|
@@ -135,7 +135,9 @@ public class WebSocketServer {
|
|
|
|
|
|
|
|
Live live = liveService.selectLiveByLiveId(liveId);
|
|
Live live = liveService.selectLiveByLiveId(liveId);
|
|
|
if (live == null) {
|
|
if (live == null) {
|
|
|
- throw new BaseException("未找到直播间");
|
|
|
|
|
|
|
+ log.warn("WebSocket连接拒绝: 直播间不存在, liveId={}, userId={}", liveId, userId);
|
|
|
|
|
+ closeSession(session, 4004, "未找到直播间");
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
long companyId = -1L;
|
|
long companyId = -1L;
|
|
|
long companyUserId = -1L;
|
|
long companyUserId = -1L;
|
|
@@ -168,7 +170,9 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if (Objects.isNull(fsUser)) {
|
|
if (Objects.isNull(fsUser)) {
|
|
|
- throw new BaseException("用户信息错误");
|
|
|
|
|
|
|
+ log.warn("WebSocket连接拒绝: 用户信息错误, liveId={}, userId={}", liveId, userId);
|
|
|
|
|
+ closeSession(session, 4003, "用户信息错误");
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
@@ -334,7 +338,9 @@ public class WebSocketServer {
|
|
|
//关闭连接时调用
|
|
//关闭连接时调用
|
|
|
@OnClose
|
|
@OnClose
|
|
|
public void onClose(Session session) {
|
|
public void onClose(Session session) {
|
|
|
|
|
+
|
|
|
Map<String, Object> userProperties = session.getUserProperties();
|
|
Map<String, Object> userProperties = session.getUserProperties();
|
|
|
|
|
+
|
|
|
// 获取公司ID和销售ID
|
|
// 获取公司ID和销售ID
|
|
|
long companyId = -1L;
|
|
long companyId = -1L;
|
|
|
long companyUserId = -1L;
|
|
long companyUserId = -1L;
|
|
@@ -349,10 +355,17 @@ public class WebSocketServer {
|
|
|
long userId = (long) userProperties.get("userId");
|
|
long userId = (long) userProperties.get("userId");
|
|
|
long userType = (long) userProperties.get("userType");
|
|
long userType = (long) userProperties.get("userType");
|
|
|
|
|
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
|
|
- List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ log.info("关闭连接: sessionId={}, uri={}, liveId={}, userId={}, userType={}, companyId={}, companyUserId={}",
|
|
|
|
|
+ session.getId(), session.getRequestURI(), liveId, userId, userType, companyId, companyUserId);
|
|
|
|
|
+
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
if (userType == 0) {
|
|
if (userType == 0) {
|
|
|
- // 缓存用户信息,过期时间4小时
|
|
|
|
|
|
|
+ if (room == null || !room.containsKey(userId)) {
|
|
|
|
|
+ log.info("连接未成功建立,跳过清理: liveId={}, userId={}", liveId, userId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
String userCacheKey = "fs:user:" + userId;
|
|
String userCacheKey = "fs:user:" + userId;
|
|
|
FsUserScrm fsUser = redisCache.getCacheObject(userCacheKey);
|
|
FsUserScrm fsUser = redisCache.getCacheObject(userCacheKey);
|
|
|
if (fsUser == null) {
|
|
if (fsUser == null) {
|
|
@@ -362,7 +375,12 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if (Objects.isNull(fsUser)) {
|
|
if (Objects.isNull(fsUser)) {
|
|
|
- throw new BaseException("用户信息错误");
|
|
|
|
|
|
|
+ log.error("用户信息错误: userId={}", userId);
|
|
|
|
|
+ room.remove(userId);
|
|
|
|
|
+ if (room.isEmpty()) {
|
|
|
|
|
+ rooms.remove(liveId);
|
|
|
|
|
+ }
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
// 计算并更新用户在线时长
|
|
// 计算并更新用户在线时长
|
|
|
room.remove(userId);
|
|
room.remove(userId);
|
|
@@ -377,11 +395,9 @@ public class WebSocketServer {
|
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
|
|
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
|
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser, liveId, userId);
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- // 下线消息采样10%进入队列
|
|
|
|
|
- if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
|
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE && liveWatchUserVO != null) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
sendMsgVo.setUserId(userId);
|
|
@@ -396,14 +412,15 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
} else {
|
|
|
- adminRoom.remove(session);
|
|
|
|
|
- // 如果admin房间为空,关闭并清理执行器
|
|
|
|
|
- if (adminRoom.isEmpty()) {
|
|
|
|
|
- ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
|
|
- if (executor != null) {
|
|
|
|
|
- executor.shutdown();
|
|
|
|
|
|
|
+ if (adminRoom != null) {
|
|
|
|
|
+ adminRoom.remove(session);
|
|
|
|
|
+ if (adminRoom.isEmpty()) {
|
|
|
|
|
+ ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
|
|
+ if (executor != null) {
|
|
|
|
|
+ executor.shutdown();
|
|
|
|
|
+ }
|
|
|
|
|
+ adminRooms.remove(liveId);
|
|
|
}
|
|
}
|
|
|
- adminRooms.remove(liveId);
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -809,10 +826,30 @@ public class WebSocketServer {
|
|
|
@OnError
|
|
@OnError
|
|
|
public void onError(Session session, Throwable throwable) {
|
|
public void onError(Session session, Throwable throwable) {
|
|
|
|
|
|
|
|
|
|
+ log.error("WebSocket发生错误: sessionId={}, error={}",
|
|
|
|
|
+ session != null ? session.getId() : "null",
|
|
|
|
|
+ throwable != null ? throwable.getMessage() : "null", throwable);
|
|
|
|
|
+
|
|
|
|
|
+// try {
|
|
|
|
|
+// this.onClose(session);
|
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
|
+// log.error("webSocket 错误处理失败", e.getMessage());
|
|
|
|
|
+// }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void closeSession(Session session, int code, String reason) {
|
|
|
try {
|
|
try {
|
|
|
- this.onClose(session);
|
|
|
|
|
|
|
+ if (session != null && session.isOpen()) {
|
|
|
|
|
+ CloseReason.CloseCode closeCode;
|
|
|
|
|
+ try {
|
|
|
|
|
+ closeCode = CloseReason.CloseCodes.getCloseCode(code);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ closeCode = CloseReason.CloseCodes.NORMAL_CLOSURE;
|
|
|
|
|
+ }
|
|
|
|
|
+ session.close(new CloseReason(closeCode, reason != null ? reason : ""));
|
|
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("webSocket 错误处理失败", e);
|
|
|
|
|
|
|
+ log.error("关闭WebSocket会话失败: sessionId={}, error={}", session != null ? session.getId() : "null", e.getMessage());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1709,7 +1746,7 @@ public class WebSocketServer {
|
|
|
return priorityCompare;
|
|
return priorityCompare;
|
|
|
}
|
|
}
|
|
|
// 相同优先级按序列号排序(FIFO)
|
|
// 相同优先级按序列号排序(FIFO)
|
|
|
- return Long.compare(this.sequence, other.sequence);
|
|
|
|
|
|
|
+ return java.lang.Long.compare(this.sequence, other.sequence);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|