|
|
@@ -49,7 +49,7 @@ public class WebSocketServer {
|
|
|
private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
|
|
|
// 管理端连接
|
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
+
|
|
|
// Session发送锁,避免同一会话并发发送消息
|
|
|
private final static ConcurrentHashMap<String, Lock> sessionLocks = new ConcurrentHashMap<>();
|
|
|
// 心跳超时缓存:key=sessionId,value=最后心跳时间戳
|
|
|
@@ -58,7 +58,7 @@ public class WebSocketServer {
|
|
|
private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
|
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
+
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
|
@@ -72,6 +72,7 @@ public class WebSocketServer {
|
|
|
private final ILiveUserFirstEntryService liveUserFirstEntryService = SpringUtils.getBean(ILiveUserFirstEntryService.class);
|
|
|
private final ILiveCouponIssueService liveCouponIssueService = SpringUtils.getBean(ILiveCouponIssueService.class);
|
|
|
private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
|
|
|
+ private static Random random = new Random();
|
|
|
|
|
|
// 直播间在线用户缓存
|
|
|
// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
|
@@ -145,17 +146,19 @@ public class WebSocketServer {
|
|
|
redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
}
|
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
- sendMsgVo.setCmd("entry");
|
|
|
- sendMsgVo.setMsg("用户进入");
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 广播连接消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ if (1 == random.nextInt(4)) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("entry");
|
|
|
+ sendMsgVo.setMsg("用户进入");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ // 广播连接消息
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
|
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
if (liveUserFirstEntry != null) {
|
|
|
@@ -190,13 +193,12 @@ public class WebSocketServer {
|
|
|
// 为admin房间创建单线程执行器,保证串行化发送
|
|
|
adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 初始化Session锁
|
|
|
sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
// 初始化心跳时间
|
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
|
|
|
- log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
}
|
|
|
|
|
|
//关闭连接时调用
|
|
|
@@ -227,18 +229,22 @@ public class WebSocketServer {
|
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
- sendMsgVo.setCmd("out");
|
|
|
- sendMsgVo.setMsg("用户离开");
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
-
|
|
|
- // 广播离开消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+
|
|
|
+
|
|
|
+ // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
|
|
|
+ if (1 == new Random().nextInt(4)) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("out");
|
|
|
+ sendMsgVo.setMsg("用户离开");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
+
|
|
|
} else {
|
|
|
adminRoom.remove(session);
|
|
|
// 如果admin房间为空,关闭并清理执行器
|
|
|
@@ -250,12 +256,10 @@ public class WebSocketServer {
|
|
|
adminRooms.remove(liveId);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 清理Session相关资源
|
|
|
heartbeatCache.remove(session.getId());
|
|
|
sessionLocks.remove(session.getId());
|
|
|
-
|
|
|
- log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
}
|
|
|
|
|
|
//收到客户端信息
|
|
|
@@ -467,7 +471,6 @@ public class WebSocketServer {
|
|
|
* 处理红包变动消息
|
|
|
*/
|
|
|
private void processRed(Long liveId, SendMsgVo msg) {
|
|
|
- log.debug("redData: {}", msg);
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
msg.setStatus( status);
|
|
|
@@ -483,7 +486,6 @@ public class WebSocketServer {
|
|
|
* 处理抽奖变动消息
|
|
|
*/
|
|
|
private void processLottery(Long liveId, SendMsgVo msg) {
|
|
|
- log.debug("lotteryData: {}", msg);
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
msg.setStatus( status);
|
|
|
@@ -502,12 +504,7 @@ public class WebSocketServer {
|
|
|
try {
|
|
|
this.onClose(session);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("webSocket 错误 onError", e);
|
|
|
- }
|
|
|
- if (throwable instanceof EOFException) {
|
|
|
- log.info("WebSocket连接被客户端正常关闭(EOF),sessionId: {}", session.getId());
|
|
|
- } else {
|
|
|
- log.error("WebSocket连接错误", throwable);
|
|
|
+ log.error("webSocket 错误处理失败", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -532,17 +529,16 @@ public class WebSocketServer {
|
|
|
//发送消息(带锁机制,避免并发发送)
|
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
|
if (session == null || !session.isOpen()) {
|
|
|
- log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 获取Session锁
|
|
|
Lock lock = sessionLocks.get(session.getId());
|
|
|
if (lock == null) {
|
|
|
// 如果锁不存在,创建一个新锁
|
|
|
lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 使用锁保证同一Session的消息串行发送
|
|
|
lock.lock();
|
|
|
try {
|
|
|
@@ -558,7 +554,6 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
Session session = room.get(userId);
|
|
|
if (session == null || !session.isOpen()) {
|
|
|
- log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
return;
|
|
|
}
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
@@ -578,7 +573,6 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
Session session = room.get(userId);
|
|
|
if (session == null || !session.isOpen()) {
|
|
|
- log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -609,7 +603,7 @@ public class WebSocketServer {
|
|
|
sendWithRetry(v,message,1);
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
+
|
|
|
// admin房间:串行发送,使用单线程执行器
|
|
|
if (!adminRoom.isEmpty()) {
|
|
|
ExecutorService executor = adminExecutors.get(liveId);
|
|
|
@@ -649,7 +643,6 @@ public class WebSocketServer {
|
|
|
String valueStr = cacheObject.toString().trim();
|
|
|
current = Integer.parseInt(valueStr);
|
|
|
} catch (NumberFormatException e) {
|
|
|
- log.error("点赞数格式错误,liveId: {}, value: {}", liveId, cacheObject, e);
|
|
|
continue;
|
|
|
}
|
|
|
Integer last = lastLikeCountCache.getOrDefault(liveId, 0);
|
|
|
@@ -684,8 +677,6 @@ public class WebSocketServer {
|
|
|
|
|
|
// 广播当前直播间的在线人数
|
|
|
broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
-
|
|
|
- log.debug("广播直播间在线人数: liveId={}, onlineCount={}", liveId, onlineCount);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -697,66 +688,64 @@ public class WebSocketServer {
|
|
|
public void cleanInactiveSessions() {
|
|
|
long currentTime = System.currentTimeMillis();
|
|
|
int cleanedCount = 0;
|
|
|
-
|
|
|
+
|
|
|
// 遍历所有直播间
|
|
|
for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
|
|
|
Long liveId = roomEntry.getKey();
|
|
|
ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
|
|
|
-
|
|
|
+
|
|
|
// 检查普通用户会话
|
|
|
List<Long> toRemove = new ArrayList<>();
|
|
|
room.forEach((userId, session) -> {
|
|
|
Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
- log.warn("会话心跳超时,即将关闭: sessionId={}, liveId={}, userId={}, 超时时长={}ms",
|
|
|
- session.getId(), liveId, userId, currentTime - lastHeartbeat);
|
|
|
toRemove.add(userId);
|
|
|
try {
|
|
|
if (session.isOpen()) {
|
|
|
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}",
|
|
|
+ log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}",
|
|
|
session.getId(), liveId, userId, e);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
+
|
|
|
// 移除超时的会话
|
|
|
toRemove.forEach(room::remove);
|
|
|
cleanedCount += toRemove.size();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 检查admin房间
|
|
|
for (Map.Entry<Long, CopyOnWriteArrayList<Session>> adminEntry : adminRooms.entrySet()) {
|
|
|
Long liveId = adminEntry.getKey();
|
|
|
CopyOnWriteArrayList<Session> adminRoom = adminEntry.getValue();
|
|
|
-
|
|
|
+
|
|
|
List<Session> toRemoveAdmin = new ArrayList<>();
|
|
|
for (Session session : adminRoom) {
|
|
|
Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
- log.warn("admin会话心跳超时,即将关闭: sessionId={}, liveId={}, 超时时长={}ms",
|
|
|
- session.getId(), liveId, currentTime - lastHeartbeat);
|
|
|
toRemoveAdmin.add(session);
|
|
|
try {
|
|
|
if (session.isOpen()) {
|
|
|
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("关闭admin超时会话失败: sessionId={}, liveId={}",
|
|
|
+ log.error("关闭admin超时会话失败: sessionId={}, liveId={}",
|
|
|
session.getId(), liveId, e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 移除超时的admin会话
|
|
|
toRemoveAdmin.forEach(adminRoom::remove);
|
|
|
cleanedCount += toRemoveAdmin.size();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (cleanedCount > 0) {
|
|
|
- log.info("清理无效会话完成,共清理 {} 个超时会话", cleanedCount);
|
|
|
+ if (random.nextInt(10) == 1) {
|
|
|
+ log.info("已清理 {} 个无效会话", cleanedCount);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -777,10 +766,9 @@ public class WebSocketServer {
|
|
|
|
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
|
if (session == null || !session.isOpen()) {
|
|
|
- log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
int attempts = 0;
|
|
|
while (attempts < maxRetries) {
|
|
|
try {
|
|
|
@@ -790,7 +778,6 @@ public class WebSocketServer {
|
|
|
} catch (Exception e) {
|
|
|
if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
|
|
|
attempts++;
|
|
|
- log.warn("发送消息遇到TEXT_FULL_WRITING错误,第{}次重试, sessionId={}", attempts, session.getId());
|
|
|
try {
|
|
|
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, 100));
|
|
|
} catch (InterruptedException ie) {
|
|
|
@@ -803,7 +790,7 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (attempts >= maxRetries) {
|
|
|
log.warn("超过重试次数({}),放弃发送消息: sessionId={}", maxRetries, session.getId());
|
|
|
}
|