Просмотр исходного кода

feat:websocket发送评论优化

caoliqin 1 день назад
Родитель
Сommit
d6311bed10

+ 7 - 1
fs-websocket/src/main/java/com/fs/websocket/FsWebSocketServiceApplication.java

@@ -4,13 +4,19 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
 
 /**
  * WebSocket服务启动程序
  *
  * @author fs
  */
-@SpringBootApplication(scanBasePackages = {"com.fs"}, exclude= {DataSourceAutoConfiguration.class})
+@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
+@ComponentScan(basePackages = {"com.fs"}, excludeFilters = {
+        @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {com.fs.common.utils.RedisUtil.class,
+                com.fs.common.core.redis.RedisCache.class, com.fs.common.core.redis.RedisCacheT.class})
+        })
 public class FsWebSocketServiceApplication extends SpringBootServletInitializer {
 
     public static void main(String[] args) {

+ 4 - 4
fs-websocket/src/main/java/com/fs/websocket/config/WebSocketConfig.java

@@ -27,13 +27,13 @@ public class WebSocketConfig {
     public ServletServerContainerFactoryBean createWebSocketContainer() {
         ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
         // 设置文本消息缓冲区大小
-        container.setMaxTextMessageBufferSize(10240000);
+        container.setMaxTextMessageBufferSize(1048576);
         // 设置二进制消息缓冲区大小
-        container.setMaxBinaryMessageBufferSize(10240000);
+        container.setMaxBinaryMessageBufferSize(1048576);
         // 设置最大会话空闲超时时间(单位:毫秒)
-        container.setMaxSessionIdleTimeout(20 * 60000L); // 15分钟
+        container.setMaxSessionIdleTimeout(60000L); // 15分钟
         // 设置异步发送超时时间(单位:毫秒)
-        container.setAsyncSendTimeout(300 * 1000L);
+        container.setAsyncSendTimeout(30 * 1000L);
         return container;
     }
 

+ 30 - 15
fs-websocket/src/main/java/com/fs/websocket/service/WebSocketServer.java

@@ -6,15 +6,12 @@ import com.fs.common.utils.StringUtils;
 import com.fs.websocket.bean.SendMsgVO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
-
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 @ServerEndpoint("/app/webSocket/{userId}")
@@ -24,25 +21,41 @@ public class WebSocketServer {
 
     //concurrent包的线程安全,用来存放每个客户端对应的WebSocketServer的会话对象
     private static final ConcurrentHashMap<Long, Session> sessionPools = new ConcurrentHashMap<>();
-//    private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
+
 
     //分发消息
     public void sendMessageToAll(String message) throws IOException {
-        Collection<Session> sessions = sessionPools.values();
-        if(!CollectionUtils.isEmpty(sessions)){
-            for (Session session : sessions) {
-                System.out.println("发送数据:" + message);
-                session.getBasicRemote().sendText(message);
-                log.info("分发消息结束,人数,{},消息内容,{}",  sessionPools.size(), message);
-            }
-        }
+//        Collection<Session> sessions = sessionPools.values();
+//        if(!CollectionUtils.isEmpty(sessions)){
+            sessionPools.forEach((userId, session) -> {
+                if (session.isOpen()) {
+                    try {
+                        // 异步发送,设置超时
+                        Future<Void> future = session.getAsyncRemote().sendText(message);
+                        System.out.println("分发消息,数据内容:" + message);
+                        try {
+                            future.get(10, TimeUnit.SECONDS);
+                        } catch (TimeoutException e) {
+                            // 超时关闭连接
+                            session.close();
+                            log.error("超时关闭连接,并移除用户:{}", userId, e);
+                            sessionPools.remove(userId);
+                        }
+                    } catch (Exception e) {
+                        log.error("分发消息失败,并移除用户: {}", userId, e);
+                        sessionPools.remove(userId);
+                    }
+                }
+            });
+        log.info("分发消息结束,人数,{}",  sessionPools.size());
+//        }
     }
 
     //指定用户发送消息
     public static void sendMessage(Session session, String message) throws IOException {
         if(session != null){
             synchronized (session) {
-                log.info("发送数据:{}", message);
+                log.info("发送心跳数据:{}", message);
                 session.getBasicRemote().sendText(message);
             }
         }
@@ -95,7 +108,9 @@ public class WebSocketServer {
     //错误时调用
     @OnError
     public void onError(Session session, Throwable throwable) {
-        log.error("webSocket连接错误,{}", throwable.getMessage());
+        Map<String, String> params = getParams(session);
+        long userId = Long.parseLong(params.get("userId"));
+        log.error("webSocket连接错误,{},移除用户,{}", throwable.getMessage(), userId);
         throwable.printStackTrace();
     }