|
|
@@ -87,6 +87,7 @@ public class WebSocketServer {
|
|
|
private final static long MAX_QUEUE_SIZE_BYTES = 200L * 1024L * 1024L; // 200MB
|
|
|
// 上下线消息采样率:10%
|
|
|
private final static double ENTRY_EXIT_SAMPLE_RATE = 0.1;
|
|
|
+ private static final String CLEANUP_DONE = "wsCleanupDone";
|
|
|
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
@@ -116,10 +117,36 @@ public class WebSocketServer {
|
|
|
//建立连接成功调用
|
|
|
@OnOpen
|
|
|
public void onOpen(Session session) {
|
|
|
-
|
|
|
+ session.getUserProperties().remove(CLEANUP_DONE);
|
|
|
Map<String, Object> userProperties = session.getUserProperties();
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
long userId = (long) userProperties.get("userId");
|
|
|
+ try {
|
|
|
+ openSession(session, userProperties, liveId, userId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("WebSocket onOpen失败, liveId={}, userId={}, sessionId={}", liveId, userId, session.getId(), e);
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ if (room != null) {
|
|
|
+ Session current = room.get(userId);
|
|
|
+ if (current != null && Objects.equals(current.getId(), session.getId())) {
|
|
|
+ room.remove(userId);
|
|
|
+ if (room.isEmpty()) {
|
|
|
+ rooms.remove(liveId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ heartbeatCache.remove(session.getId());
|
|
|
+ sessionLocks.remove(session.getId());
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "连接建立失败"));
|
|
|
+ }
|
|
|
+ } catch (IOException ignored) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void openSession(Session session, Map<String, Object> userProperties, long liveId, long userId) {
|
|
|
long userType = (long) userProperties.get("userType");
|
|
|
long qwUserId = -1;
|
|
|
long externalContactId = -1;
|
|
|
@@ -150,88 +177,87 @@ public class WebSocketServer {
|
|
|
|
|
|
// 记录连接信息 管理员不记录
|
|
|
if (userType == 0) {
|
|
|
- // 缓存用户信息,过期时间4小时
|
|
|
- String userCacheKey = "fs:user:" + userId;
|
|
|
- FsUserScrm fsUser = redisCache.getCacheObject(userCacheKey);
|
|
|
- if (fsUser == null) {
|
|
|
- fsUser = fsUserService.selectFsUserById(userId);
|
|
|
- if (fsUser != null) {
|
|
|
- redisCache.setCacheObject(userCacheKey, fsUser, 4, TimeUnit.HOURS);
|
|
|
- }
|
|
|
- }
|
|
|
+ FsUserScrm fsUser = getFsUserWithCache(userId);
|
|
|
if (Objects.isNull(fsUser)) {
|
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
|
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
|
+ Session oldSession = room.get(userId);
|
|
|
+ boolean isReconnect = oldSession != null && !Objects.equals(oldSession.getId(), session.getId());
|
|
|
room.put(userId, session);
|
|
|
+ if (isReconnect) {
|
|
|
+ log.info("用户重连WebSocket, liveId={}, userId={}, oldSessionId={}, newSessionId={}",
|
|
|
+ liveId, userId, oldSession.getId(), session.getId());
|
|
|
+ try {
|
|
|
+ if (oldSession.isOpen()) {
|
|
|
+ oldSession.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "replaced by new connection"));
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.warn("关闭旧WebSocket连接失败, liveId={}, userId={}", liveId, userId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
// 存储用户进入直播间的时间到 Redis(用于计算在线时长)
|
|
|
// 如果已经存在进入时间,说明是重连,不应该覆盖,保持原来的进入时间
|
|
|
String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
- Long existingEntryTime = redisCache.getCacheObject(entryTimeKey);
|
|
|
+ Long existingEntryTime = safeGetCacheLong(entryTimeKey);
|
|
|
if (existingEntryTime == null) {
|
|
|
// 首次连接,记录进入时间
|
|
|
redisCache.setCacheObject(entryTimeKey, System.currentTimeMillis(), 24, TimeUnit.HOURS);
|
|
|
}
|
|
|
// 如果是重连,不覆盖进入时间,保持原来的进入时间以便正确计算总时长
|
|
|
|
|
|
- // 直播间浏览量 +1
|
|
|
- redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
-
|
|
|
- // 累计观看人次 +1
|
|
|
- redisCache.incr(TOTAL_VIEWS_KEY + liveId, 1);
|
|
|
-
|
|
|
- // 记录在线人数
|
|
|
- redisCache.incr(ONLINE_USERS_KEY + liveId, 1);
|
|
|
- // 将用户ID添加到在线用户Set中
|
|
|
- String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
- redisCache.redisTemplate.opsForSet().add(onlineUsersSetKey, String.valueOf(userId));
|
|
|
- // 获取Set的大小作为当前在线人数
|
|
|
- Long currentOnlineCount = redisCache.redisTemplate.opsForSet().size(onlineUsersSetKey);
|
|
|
- //最大同时在线人数 - 使用Set大小来判断
|
|
|
- Integer maxOnline = redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveId);
|
|
|
- int currentOnline = currentOnlineCount != null ? currentOnlineCount.intValue() : 0;
|
|
|
- if (maxOnline == null || currentOnline > maxOnline) {
|
|
|
- redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
|
|
|
- }
|
|
|
-
|
|
|
- // 判断是否是该直播间的首次访客(独立访客统计)
|
|
|
- boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
- if (isFirstVisit) {
|
|
|
- redisCache.incr(UNIQUE_VISITORS_KEY + liveId, 1);
|
|
|
- }
|
|
|
+ if (!isReconnect) {
|
|
|
+ // 直播间浏览量 +1
|
|
|
+ redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
+
|
|
|
+ // 累计观看人次 +1
|
|
|
+ redisCache.incr(TOTAL_VIEWS_KEY + liveId, 1);
|
|
|
+
|
|
|
+ // 记录在线人数
|
|
|
+ redisCache.incr(ONLINE_USERS_KEY + liveId, 1);
|
|
|
+ // 将用户ID添加到在线用户Set中
|
|
|
+ String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
+ redisCache.redisTemplate.opsForSet().add(onlineUsersSetKey, String.valueOf(userId));
|
|
|
+ // 获取Set的大小作为当前在线人数
|
|
|
+ Long currentOnlineCount = redisCache.redisTemplate.opsForSet().size(onlineUsersSetKey);
|
|
|
+ //最大同时在线人数 - 使用Set大小来判断
|
|
|
+ Integer maxOnline = safeGetCacheInteger(MAX_ONLINE_USERS_KEY + liveId);
|
|
|
+ int currentOnline = currentOnlineCount != null ? currentOnlineCount.intValue() : 0;
|
|
|
+ if (maxOnline == null || currentOnline > maxOnline) {
|
|
|
+ redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
|
|
|
+ }
|
|
|
|
|
|
- // 判断是否是首次进入直播间的观众
|
|
|
- boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
- if (isFirstViewer) {
|
|
|
- redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
- }
|
|
|
- liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
- // 上线消息采样10%进入队列
|
|
|
- if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
- sendMsgVo.setCmd("entry");
|
|
|
- sendMsgVo.setMsg("用户进入");
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 将上线消息加入队列
|
|
|
- enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
- }
|
|
|
+ // 判断是否是该直播间的首次访客(独立访客统计)
|
|
|
+ boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
+ if (isFirstVisit) {
|
|
|
+ redisCache.incr(UNIQUE_VISITORS_KEY + liveId, 1);
|
|
|
+ }
|
|
|
|
|
|
- // 缓存用户首次进入记录,过期时间4小时
|
|
|
- String liveUserFirstEntryCacheKey = "live:userFirstEntry:" + liveId + ":" + userId;
|
|
|
- LiveUserFirstEntry liveUserFirstEntry = redisCache.getCacheObject(liveUserFirstEntryCacheKey);
|
|
|
- if (liveUserFirstEntry == null) {
|
|
|
- liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
- if (liveUserFirstEntry != null) {
|
|
|
- redisCache.setCacheObject(liveUserFirstEntryCacheKey, liveUserFirstEntry, 4, TimeUnit.HOURS);
|
|
|
+ // 判断是否是首次进入直播间的观众
|
|
|
+ boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
+ if (isFirstViewer) {
|
|
|
+ redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
+ }
|
|
|
+ liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
+ // 上线消息采样10%进入队列
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("entry");
|
|
|
+ sendMsgVo.setMsg("用户进入");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ // 将上线消息加入队列
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ LiveUserFirstEntry liveUserFirstEntry = getLiveUserFirstEntryWithCache(liveId, userId);
|
|
|
// 如果用户连上了 socket,并且公司ID和销售ID大于0,更新 LiveWatchLog 的 logType
|
|
|
|
|
|
if ((qwUserId > 0 && externalContactId > 0) || (liveUserFirstEntry != null && liveUserFirstEntry.getCompanyId() > 0 && liveUserFirstEntry.getCompanyUserId() > 0 )) {
|
|
|
@@ -262,15 +288,7 @@ public class WebSocketServer {
|
|
|
} else {
|
|
|
// 这个用户A邀请用户b,b的业绩算a的销售的
|
|
|
if (companyId == -2L) {
|
|
|
- // 缓存用户首次进入记录,过期时间4小时
|
|
|
- String clientBCacheKey = "live:userFirstEntry:" + liveId + ":" + companyUserId;
|
|
|
- LiveUserFirstEntry clientB = redisCache.getCacheObject(clientBCacheKey);
|
|
|
- if (clientB == null) {
|
|
|
- clientB = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, companyUserId);
|
|
|
- if (clientB != null) {
|
|
|
- redisCache.setCacheObject(clientBCacheKey, clientB, 4, TimeUnit.HOURS);
|
|
|
- }
|
|
|
- }
|
|
|
+ LiveUserFirstEntry clientB = getLiveUserFirstEntryWithCache(liveId, companyUserId);
|
|
|
if (clientB != null) {
|
|
|
companyId = clientB.getCompanyId();
|
|
|
companyUserId = clientB.getCompanyUserId();
|
|
|
@@ -324,98 +342,113 @@ public class WebSocketServer {
|
|
|
//关闭连接时调用
|
|
|
@OnClose
|
|
|
public void onClose(Session session) {
|
|
|
- Map<String, Object> userProperties = session.getUserProperties();
|
|
|
- // 获取公司ID和销售ID
|
|
|
- long companyId = -1L;
|
|
|
- long companyUserId = -1L;
|
|
|
- if (!Objects.isNull(userProperties.get("companyId"))) {
|
|
|
- companyId = (long) userProperties.get("companyId");
|
|
|
- }
|
|
|
- if (!Objects.isNull(userProperties.get("companyUserId"))) {
|
|
|
- companyUserId = (long) userProperties.get("companyUserId");
|
|
|
+ if (Boolean.TRUE.equals(session.getUserProperties().get(CLEANUP_DONE))) {
|
|
|
+ return;
|
|
|
}
|
|
|
+ session.getUserProperties().put(CLEANUP_DONE, Boolean.TRUE);
|
|
|
|
|
|
- long liveId = (long) userProperties.get("liveId");
|
|
|
- long userId = (long) userProperties.get("userId");
|
|
|
- long userType = (long) userProperties.get("userType");
|
|
|
-
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
- List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
- if (userType == 0) {
|
|
|
- // 缓存用户信息,过期时间4小时
|
|
|
- String userCacheKey = "fs:user:" + userId;
|
|
|
- FsUserScrm fsUser = redisCache.getCacheObject(userCacheKey);
|
|
|
- if (fsUser == null) {
|
|
|
- fsUser = fsUserService.selectFsUserById(userId);
|
|
|
- if (fsUser != null) {
|
|
|
- redisCache.setCacheObject(userCacheKey, fsUser, 4, TimeUnit.HOURS);
|
|
|
- }
|
|
|
- }
|
|
|
- if (Objects.isNull(fsUser)) {
|
|
|
- throw new BaseException("用户信息错误");
|
|
|
- }
|
|
|
- // 计算并更新用户在线时长
|
|
|
- room.remove(userId);
|
|
|
- if (room.isEmpty()) {
|
|
|
- rooms.remove(liveId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> userProperties = session.getUserProperties();
|
|
|
+ if (userProperties.get("liveId") == null || userProperties.get("userId") == null) {
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
+ long liveId = (long) userProperties.get("liveId");
|
|
|
+ long userId = (long) userProperties.get("userId");
|
|
|
+ long userType = userProperties.get("userType") != null ? (long) userProperties.get("userType") : 0L;
|
|
|
+
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
+ if (userType == 0) {
|
|
|
+ Session currentSession = room.get(userId);
|
|
|
+ if (currentSession != null && !Objects.equals(currentSession.getId(), session.getId())) {
|
|
|
+ log.debug("旧WebSocket连接关闭, 跳过房间清理, liveId={}, userId={}, sessionId={}",
|
|
|
+ liveId, userId, session.getId());
|
|
|
+ } else {
|
|
|
+ room.remove(userId);
|
|
|
+ if (room.isEmpty()) {
|
|
|
+ rooms.remove(liveId);
|
|
|
+ }
|
|
|
|
|
|
- // 直播间在线人数 -1
|
|
|
- redisCache.incr(ONLINE_USERS_KEY + liveId, -1);
|
|
|
- // 从在线用户Set中移除用户ID
|
|
|
- String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
- redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
-
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
-
|
|
|
-
|
|
|
- // 下线消息采样10%进入队列
|
|
|
- if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
- sendMsgVo.setCmd("out");
|
|
|
- sendMsgVo.setMsg("用户离开");
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 将下线消息加入队列
|
|
|
- enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
- }
|
|
|
+ FsUserScrm fsUser = getFsUserWithCache(userId);
|
|
|
+ if (Objects.isNull(fsUser)) {
|
|
|
+ log.warn("WebSocket关闭时未找到用户信息, liveId={}, userId={}", liveId, userId);
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ // 直播间在线人数 -1
|
|
|
+ redisCache.incr(ONLINE_USERS_KEY + liveId, -1);
|
|
|
+ // 从在线用户Set中移除用户ID
|
|
|
+ String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
+ redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
+
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser, liveId, userId);
|
|
|
+
|
|
|
+ // 下线消息采样10%进入队列
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("out");
|
|
|
+ sendMsgVo.setMsg("用户离开");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("WebSocket下线处理失败, liveId={}, userId={}", liveId, userId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- } else {
|
|
|
- adminRoom.remove(session);
|
|
|
- // 如果admin房间为空,关闭并清理执行器
|
|
|
- if (adminRoom.isEmpty()) {
|
|
|
- ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
- if (executor != null) {
|
|
|
- executor.shutdown();
|
|
|
+ } else {
|
|
|
+ adminRoom.remove(session);
|
|
|
+ if (adminRoom.isEmpty()) {
|
|
|
+ ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
+ if (executor != null) {
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+ adminRooms.remove(liveId);
|
|
|
}
|
|
|
- adminRooms.remove(liveId);
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- // 清理Session相关资源
|
|
|
- heartbeatCache.remove(session.getId());
|
|
|
- sessionLocks.remove(session.getId());
|
|
|
-
|
|
|
- // 检查并清理空的直播间资源
|
|
|
- cleanupEmptyRoom(liveId);
|
|
|
+ heartbeatCache.remove(session.getId());
|
|
|
+ sessionLocks.remove(session.getId());
|
|
|
+ cleanupEmptyRoom(liveId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("WebSocket onClose处理异常, sessionId={}", session.getId(), e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//收到客户端信息
|
|
|
@OnMessage
|
|
|
- public void onMessage(Session session,String message) throws IOException {
|
|
|
+ public void onMessage(Session session, String message) {
|
|
|
+ if (!session.isOpen()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
Map<String, Object> userProperties = session.getUserProperties();
|
|
|
+ if (userProperties.get("liveId") == null || userProperties.get("userId") == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
- long userType = (long) userProperties.get("userType");
|
|
|
- boolean isAdmin = false;
|
|
|
+ long userId = (long) userProperties.get("userId");
|
|
|
+ long userType = userProperties.get("userType") != null ? (long) userProperties.get("userType") : 0L;
|
|
|
+
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
+ ensureSessionInRoom(session, liveId, userId, userType);
|
|
|
|
|
|
- SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
- if(msg.isOn()) return;
|
|
|
+ SendMsgVo msg = parseIncomingMessage(message, liveId, userId, userType);
|
|
|
+ if (msg == null || msg.isOn()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(msg.getCmd())) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isAdmin = false;
|
|
|
LiveMsg liveMsg;
|
|
|
try {
|
|
|
switch (msg.getCmd()) {
|
|
|
@@ -734,12 +767,159 @@ public class WebSocketServer {
|
|
|
//错误时调用
|
|
|
@OnError
|
|
|
public void onError(Session session, Throwable throwable) {
|
|
|
+ if (throwable instanceof EOFException) {
|
|
|
+ log.debug("WebSocket连接关闭: sessionId={}", session.getId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.error("WebSocket异常, sessionId={}, error={}", session.getId(), throwable.getMessage(), throwable);
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- this.onClose(session);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("webSocket 错误处理失败", e);
|
|
|
+ private SendMsgVo parseIncomingMessage(String raw, long liveId, long userId, long userType) {
|
|
|
+ if (StringUtils.isEmpty(raw)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ String text = raw.trim();
|
|
|
+ if (text.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (text.startsWith("{")) {
|
|
|
+ try {
|
|
|
+ SendMsgVo msg = JSONObject.parseObject(text, SendMsgVo.class);
|
|
|
+ if (msg != null) {
|
|
|
+ fillMessageDefaults(msg, liveId, userId, userType);
|
|
|
+ if (StringUtils.isEmpty(msg.getCmd()) && StringUtils.isNotEmpty(msg.getMsg())) {
|
|
|
+ msg.setCmd("sendMsg");
|
|
|
+ }
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.debug("WebSocket JSON解析失败, 按纯文本处理, message={}", text);
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ return buildPlainTextMessage(text, liveId, userId, userType);
|
|
|
+ }
|
|
|
+
|
|
|
+ private SendMsgVo buildPlainTextMessage(String text, long liveId, long userId, long userType) {
|
|
|
+ SendMsgVo msg = new SendMsgVo();
|
|
|
+ msg.setLiveId(liveId);
|
|
|
+ msg.setUserId(userId);
|
|
|
+ msg.setUserType(userType);
|
|
|
+ if ("heartbeat".equalsIgnoreCase(text)) {
|
|
|
+ msg.setCmd("heartbeat");
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+ msg.setCmd("sendMsg");
|
|
|
+ msg.setMsg(text);
|
|
|
+ fillMessageDefaults(msg, liveId, userId, userType);
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void fillMessageDefaults(SendMsgVo msg, long liveId, long userId, long userType) {
|
|
|
+ if (msg.getLiveId() == null) {
|
|
|
+ msg.setLiveId(liveId);
|
|
|
+ }
|
|
|
+ if (msg.getUserId() == null) {
|
|
|
+ msg.setUserId(userId);
|
|
|
+ }
|
|
|
+ if (msg.getUserType() == null) {
|
|
|
+ msg.setUserType(userType);
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(msg.getNickName()) || StringUtils.isEmpty(msg.getAvatar())) {
|
|
|
+ FsUserScrm user = getFsUserWithCache(userId);
|
|
|
+ if (user != null) {
|
|
|
+ if (StringUtils.isEmpty(msg.getNickName())) {
|
|
|
+ msg.setNickName(user.getNickname());
|
|
|
+ }
|
|
|
+ if (StringUtils.isEmpty(msg.getAvatar())) {
|
|
|
+ msg.setAvatar(user.getAvatar());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private FsUserScrm getFsUserWithCache(long userId) {
|
|
|
+ String userCacheKey = "fs:user:" + userId;
|
|
|
+ Object cached = redisCache.getCacheObject(userCacheKey);
|
|
|
+ FsUserScrm fsUser = RedisCache.convertCacheObject(cached, FsUserScrm.class);
|
|
|
+ if (fsUser == null) {
|
|
|
+ fsUser = fsUserService.selectFsUserById(userId);
|
|
|
+ if (fsUser != null) {
|
|
|
+ redisCache.setCacheObject(userCacheKey, fsUser, 4, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return fsUser;
|
|
|
+ }
|
|
|
+
|
|
|
+ private LiveUserFirstEntry getLiveUserFirstEntryWithCache(long liveId, long userId) {
|
|
|
+ String cacheKey = "live:userFirstEntry:" + liveId + ":" + userId;
|
|
|
+ Object cached = redisCache.getCacheObject(cacheKey);
|
|
|
+ LiveUserFirstEntry entry = RedisCache.convertCacheObject(cached, LiveUserFirstEntry.class);
|
|
|
+ if (entry == null) {
|
|
|
+ entry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
+ if (entry != null) {
|
|
|
+ redisCache.setCacheObject(cacheKey, entry, 4, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return entry;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Long safeGetCacheLong(String key) {
|
|
|
+ Object value = redisCache.getCacheObject(key);
|
|
|
+ if (value == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (value instanceof Number) {
|
|
|
+ return ((Number) value).longValue();
|
|
|
+ }
|
|
|
+ if (value instanceof String) {
|
|
|
+ try {
|
|
|
+ return Long.parseLong(((String) value).trim());
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Integer safeGetCacheInteger(String key) {
|
|
|
+ Object value = redisCache.getCacheObject(key);
|
|
|
+ if (value == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (value instanceof Number) {
|
|
|
+ return ((Number) value).intValue();
|
|
|
+ }
|
|
|
+ if (value instanceof String) {
|
|
|
+ try {
|
|
|
+ return Integer.parseInt(((String) value).trim());
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void ensureSessionInRoom(Session session, long liveId, long userId, long userType) {
|
|
|
+ if (userType != 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session currentSession = room.get(userId);
|
|
|
+ if (currentSession != null && Objects.equals(currentSession.getId(), session.getId())) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Session oldSession = room.put(userId, session);
|
|
|
+ if (oldSession != null && !Objects.equals(oldSession.getId(), session.getId()) && oldSession.isOpen()) {
|
|
|
+ try {
|
|
|
+ oldSession.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "replaced by active connection"));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.warn("关闭被替换的WebSocket连接失败, liveId={}, userId={}", liveId, userId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ startConsumerThread(liveId);
|
|
|
+ log.info("WebSocket会话重新注册到房间, liveId={}, userId={}, sessionId={}", liveId, userId, session.getId());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -760,25 +940,20 @@ public class WebSocketServer {
|
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
|
}
|
|
|
|
|
|
- //发送消息(带锁机制,避免并发发送)
|
|
|
- public void sendMessage(Session session, String message) throws IOException {
|
|
|
+ //发送消息(带锁机制,避免并发发送;使用同步发送,避免在 onMessage 线程中异步发送导致断连)
|
|
|
+ public void sendMessage(Session session, String message) {
|
|
|
if (session == null || !session.isOpen()) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // 获取Session锁
|
|
|
- Lock lock = sessionLocks.get(session.getId());
|
|
|
- if (lock == null) {
|
|
|
- // 如果锁不存在,创建一个新锁
|
|
|
- lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
- }
|
|
|
-
|
|
|
- // 使用锁保证同一Session的消息串行发送
|
|
|
+ Lock lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
lock.lock();
|
|
|
try {
|
|
|
if (session.isOpen()) {
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
+ session.getBasicRemote().sendText(message);
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("发送WebSocket消息失败: sessionId={}, error={}", session.getId(), e.getMessage());
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
|
@@ -795,16 +970,11 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
sendMsgVo.setUserType(0L);
|
|
|
sendMsgVo.setCmd("Integral");
|
|
|
- sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
|
|
|
+ sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "积分");
|
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
|
|
|
|
- if(Objects.isNull( session)) return;
|
|
|
- // 使用带锁的sendMessage方法,保证线程安全
|
|
|
- try {
|
|
|
- sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("发送积分消息失败: liveId={}, userId={}, error={}", liveId, userId, e.getMessage(), e);
|
|
|
- }
|
|
|
+ if(Objects.isNull(session)) return;
|
|
|
+ sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -818,8 +988,8 @@ public class WebSocketServer {
|
|
|
}
|
|
|
try {
|
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
- } catch (IOException e) {
|
|
|
- log.error(e.getMessage());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("发送完课积分消息失败: liveId={}, userId={}", liveId, userId, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -839,13 +1009,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setMsg("账号已被停用");
|
|
|
sendMsgVo.setData(null);
|
|
|
|
|
|
- if(Objects.isNull( session)) return;
|
|
|
- // 使用带锁的sendMessage方法,保证线程安全
|
|
|
- try {
|
|
|
- sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("发送封禁消息失败: liveId={}, userId={}, error={}", liveId, userId, e.getMessage(), e);
|
|
|
- }
|
|
|
+ if(Objects.isNull(session)) return;
|
|
|
+ sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -989,20 +1154,20 @@ public class WebSocketServer {
|
|
|
}
|
|
|
|
|
|
// 检查普通用户会话(使用快照遍历,避免并发修改异常)
|
|
|
- List<Long> toRemove = new ArrayList<>();
|
|
|
+ Map<Long, Session> toRemove = new HashMap<>();
|
|
|
// 创建快照,避免在遍历过程中修改原集合
|
|
|
for (Map.Entry<Long, Session> userEntry : room.entrySet()) {
|
|
|
Long userId = userEntry.getKey();
|
|
|
Session session = userEntry.getValue();
|
|
|
|
|
|
if (session == null) {
|
|
|
- toRemove.add(userId);
|
|
|
+ toRemove.put(userId, null);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
- toRemove.add(userId);
|
|
|
+ toRemove.put(userId, session);
|
|
|
try {
|
|
|
if (session.isOpen()) {
|
|
|
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
@@ -1018,10 +1183,16 @@ public class WebSocketServer {
|
|
|
// 移除超时的会话
|
|
|
if (!toRemove.isEmpty()) {
|
|
|
String hashKey = String.format(LiveKeysConstant.LIVE_WATCH_USERS, liveId);
|
|
|
- for (Long userId : toRemove) {
|
|
|
- room.remove(userId);
|
|
|
- // 从 Redis hash 中删除无效用户
|
|
|
- redisCache.hashDelete(hashKey, String.valueOf(userId));
|
|
|
+ for (Map.Entry<Long, Session> entry : toRemove.entrySet()) {
|
|
|
+ Long userId = entry.getKey();
|
|
|
+ Session timeoutSession = entry.getValue();
|
|
|
+ Session roomSession = room.get(userId);
|
|
|
+ if (timeoutSession == null || (roomSession != null
|
|
|
+ && Objects.equals(roomSession.getId(), timeoutSession.getId()))) {
|
|
|
+ room.remove(userId);
|
|
|
+ // 从 Redis hash 中删除无效用户
|
|
|
+ redisCache.hashDelete(hashKey, String.valueOf(userId));
|
|
|
+ }
|
|
|
}
|
|
|
cleanedCount += toRemove.size();
|
|
|
}
|
|
|
@@ -1881,9 +2052,12 @@ public class WebSocketServer {
|
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
|
|
|
|
if (!hasSession) {
|
|
|
- // 停止消费者线程
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = messageQueues.get(liveId);
|
|
|
+ if (queue != null && !queue.isEmpty()) {
|
|
|
+ startConsumerThread(liveId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
stopConsumerThread(liveId);
|
|
|
- // 清理消息队列
|
|
|
messageQueues.remove(liveId);
|
|
|
consumerRunningFlags.remove(liveId);
|
|
|
queueSizes.remove(liveId);
|