Просмотр исходного кода

Merge remote-tracking branch 'origin/企微聊天' into 企微聊天

yh 1 неделя назад
Родитель
Сommit
d49b68fea2

+ 74 - 41
fs-qw-api-msg/src/main/java/com/fs/app/socket/QwImSocket.java

@@ -6,25 +6,19 @@ import com.fs.qw.vo.QwMessageListVO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
+import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 @Slf4j
 @ServerEndpoint(value = "/qwImSocket/{companyUserId}", configurator = QwImConfigurator.class)
 @Component
 public class QwImSocket {
 
-    private static final ConcurrentHashMap<Long, CopyOnWriteArraySet<Session>> companyUserSessions = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<Long, Set<Session>> companyUserSessions = new ConcurrentHashMap<>();
 
     /**
      * 连接建立成功调用的方法
@@ -33,8 +27,33 @@ public class QwImSocket {
      */
     @OnOpen
     public void onOpen(Session session, @PathParam("companyUserId") Long companyUserId) {
+        session.setMaxIdleTimeout(300000);
+        session.getAsyncRemote().setSendTimeout(30000);
+
         // 将当前会话加入到会话池中
-        companyUserSessions.computeIfAbsent(companyUserId, k -> new CopyOnWriteArraySet<>()).add(session);
+        // 限制单用户连接数
+        Set<Session> sessions = companyUserSessions.computeIfAbsent(
+                companyUserId,
+                k -> ConcurrentHashMap.newKeySet()
+        );
+
+        if (sessions.size() >= 10) {
+            log.warn("用户 {} 连接数超限: {}", companyUserId, sessions.size());
+            // 移除最旧的连接
+            sessions.stream().findFirst().ifPresent(oldSession -> {
+                try {
+                    oldSession.close(new CloseReason(
+                            CloseReason.CloseCodes.TRY_AGAIN_LATER,
+                            "连接数超限"
+                    ));
+                } catch (Exception e) {
+                    log.error("关闭旧连接失败", e);
+                }
+                sessions.remove(oldSession);
+            });
+        }
+
+        sessions.add(session);
     }
 
     /**
@@ -45,14 +64,7 @@ public class QwImSocket {
     @OnClose
     public void onClose(Session session, @PathParam("companyUserId") Long companyUserId) {
         // 从会话池中移除当前会话
-        CopyOnWriteArraySet<Session> sessions = companyUserSessions.get(companyUserId);
-        if (sessions != null) {
-            sessions.remove(session);
-            // 如果直播间没人了,可以移除该直播间
-            if (sessions.isEmpty()) {
-                companyUserSessions.remove(companyUserId);
-            }
-        }
+        removeSession(companyUserId, session);
     }
 
     /**
@@ -64,14 +76,16 @@ public class QwImSocket {
     @OnError
     public void onError(Session session, @PathParam("companyUserId") Long companyUserId, Throwable error) {
         log.error("发生错误!会话ID: {}", session.getId());
-        CopyOnWriteArraySet<Session> sessions = companyUserSessions.get(companyUserId);
-        if (sessions != null) {
+        removeSession(companyUserId, session);
+    }
+
+    private void removeSession(Long companyUserId, Session session) {
+        if (companyUserId == null || session == null) return;
+        companyUserSessions.computeIfPresent(companyUserId, (k, sessions) -> {
             sessions.remove(session);
-            // 如果直播间没人了,可以移除该直播间
-            if (sessions.isEmpty()) {
-                companyUserSessions.remove(companyUserId);
-            }
-        }
+            log.debug("移除会话,剩余连接数: {}", sessions.size());
+            return sessions.isEmpty() ? null : sessions;
+        });
     }
 
     /**
@@ -84,25 +98,44 @@ public class QwImSocket {
         }
 
         String msg = JSON.toJSONString(message);
-        CopyOnWriteArraySet<Session> sessions = companyUserSessions.get(message.getCompanyUserId());
-        if (sessions != null) {
-            List<Session> closeSession = new ArrayList<>();
-            for (Session session : sessions) {
-                if (!session.isOpen()) {
-                    closeSession.add(session);
-                    continue;
-                }
+        Set<Session> sessions = companyUserSessions.get(message.getCompanyUserId());
 
-                try {
-                    session.getBasicRemote().sendText(msg);
-                } catch (IOException e) {
-                    log.error("发送消息给会话[{}]失败: {}", session.getId(), e.getMessage());
-                    // 移除无效会话
-                    closeSession.add(session);
-                }
+        if (sessions == null || sessions.isEmpty()) {
+            return;
+        }
+
+        sessions.removeIf(session -> !session.isOpen());
+        for (Session session : sessions) {
+            if (session.isOpen()) {
+                session.getAsyncRemote().sendText(msg, result -> {
+                    if (!result.isOK()) {
+                        log.warn("发送失败: {}", result.getException().getMessage());
+                    }
+                });
             }
-            closeSession.forEach(sessions::remove);
         }
     }
 
+    /**
+     * 定时检查连接是否存活
+     */
+    public static void clean() {
+        companyUserSessions.entrySet().removeIf(entry -> {
+            Set<Session> sessions = entry.getValue();
+            sessions.removeIf(session -> !session.isOpen());
+            return sessions.isEmpty();
+        });
+    }
+
+    /**
+     * 获取统计信息
+     */
+    public static String getStats() {
+        int totalSessions = companyUserSessions.values().stream()
+                .mapToInt(Set::size)
+                .sum();
+        return String.format("用户数: %d, 总连接数: %d",
+                companyUserSessions.size(), totalSessions);
+    }
+
 }

+ 18 - 0
fs-qw-api-msg/src/main/java/com/fs/app/socket/task/QwImSessionCleaner.java

@@ -0,0 +1,18 @@
+package com.fs.app.socket.task;
+
+import com.fs.app.socket.QwImSocket;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class QwImSessionCleaner {
+
+    @Scheduled(fixedDelay = 60000)
+    public void clean() {
+        log.info("定时清理无效会话");
+        QwImSocket.clean();
+        log.info("当前会话数: {}", QwImSocket.getStats());
+    }
+}