|
|
@@ -22,6 +22,7 @@ import com.fs.live.service.*;
|
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
@@ -30,10 +31,9 @@ import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
-import java.util.concurrent.ThreadLocalRandom;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
|
|
|
@@ -49,6 +49,16 @@ public class WebSocketServer {
|
|
|
private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
|
|
|
// 管理端连接
|
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // Session发送锁,避免同一会话并发发送消息
|
|
|
+ private final static ConcurrentHashMap<String, Lock> sessionLocks = new ConcurrentHashMap<>();
|
|
|
+ // 心跳超时缓存:key=sessionId,value=最后心跳时间戳
|
|
|
+ private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
|
|
|
+ // 心跳超时时间(毫秒):3分钟无心跳则认为超时
|
|
|
+ private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
|
|
|
+ // admin房间消息发送线程池(单线程,保证串行化)
|
|
|
+ private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
|
@@ -101,7 +111,7 @@ public class WebSocketServer {
|
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
|
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.join(liveId, userId, location);
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
|
room.put(userId, session);
|
|
|
// 直播间浏览量 +1
|
|
|
redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
@@ -177,7 +187,14 @@ public class WebSocketServer {
|
|
|
|
|
|
} else {
|
|
|
adminRoom.add(session);
|
|
|
+ // 为admin房间创建单线程执行器,保证串行化发送
|
|
|
+ adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
|
|
|
}
|
|
|
+
|
|
|
+ // 初始化Session锁
|
|
|
+ sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
+ // 初始化心跳时间
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
|
|
|
log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
}
|
|
|
@@ -209,7 +226,7 @@ public class WebSocketServer {
|
|
|
// 从在线用户Set中移除用户ID
|
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -224,7 +241,19 @@ public class WebSocketServer {
|
|
|
broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
} else {
|
|
|
adminRoom.remove(session);
|
|
|
+ // 如果admin房间为空,关闭并清理执行器
|
|
|
+ if (adminRoom.isEmpty()) {
|
|
|
+ ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
+ if (executor != null) {
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+ adminRooms.remove(liveId);
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ // 清理Session相关资源
|
|
|
+ heartbeatCache.remove(session.getId());
|
|
|
+ sessionLocks.remove(session.getId());
|
|
|
|
|
|
log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
}
|
|
|
@@ -243,6 +272,8 @@ public class WebSocketServer {
|
|
|
try {
|
|
|
switch (msg.getCmd()) {
|
|
|
case "heartbeat":
|
|
|
+ // 更新心跳时间
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
break;
|
|
|
case "sendMsg":
|
|
|
@@ -498,12 +529,38 @@ public class WebSocketServer {
|
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
|
}
|
|
|
|
|
|
- //发送消息
|
|
|
+ //发送消息(带锁机制,避免并发发送)
|
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取Session锁
|
|
|
+ Lock lock = sessionLocks.get(session.getId());
|
|
|
+ if (lock == null) {
|
|
|
+ // 如果锁不存在,创建一个新锁
|
|
|
+ lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 使用锁保证同一Session的消息串行发送
|
|
|
+ lock.lock();
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.getAsyncRemote().sendText(message);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session session = room.get(userId);
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
+ return;
|
|
|
+ }
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -511,13 +568,20 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setCmd("Integral");
|
|
|
sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
|
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
- Session session = room.get(userId);
|
|
|
+
|
|
|
if(Objects.isNull( session)) return;
|
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
|
|
|
|
private void sendBlockMessage(Long liveId, Long userId) {
|
|
|
+
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session session = room.get(userId);
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -525,8 +589,7 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setCmd("blockUser");
|
|
|
sendMsgVo.setMsg("账号已被停用");
|
|
|
sendMsgVo.setData(null);
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
- Session session = room.get(userId);
|
|
|
+
|
|
|
if(Objects.isNull( session)) return;
|
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
|
@@ -540,16 +603,33 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
|
|
|
+ // 普通用户房间:并行发送
|
|
|
room.forEach((k, v) -> {
|
|
|
if (v.isOpen()) {
|
|
|
- sendWithRetry(v,message,7);
|
|
|
+ sendWithRetry(v,message,1);
|
|
|
}
|
|
|
});
|
|
|
- adminRoom.forEach(v -> {
|
|
|
- if (v.isOpen()) {
|
|
|
- sendWithRetry(v,message,7);
|
|
|
+
|
|
|
+ // admin房间:串行发送,使用单线程执行器
|
|
|
+ if (!adminRoom.isEmpty()) {
|
|
|
+ ExecutorService executor = adminExecutors.get(liveId);
|
|
|
+ if (executor != null && !executor.isShutdown()) {
|
|
|
+ executor.submit(() -> {
|
|
|
+ for (Session session : adminRoom) {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ // 如果执行器不存在或已关闭,直接发送
|
|
|
+ adminRoom.forEach(v -> {
|
|
|
+ if (v.isOpen()) {
|
|
|
+ sendWithRetry(v, message, 1);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void removeLikeCountCache(Long liveId) {
|
|
|
@@ -586,6 +666,101 @@ public class WebSocketServer {
|
|
|
}
|
|
|
|
|
|
|
|
|
+ @Scheduled(fixedRate = 2000)// 每2秒执行一次
|
|
|
+ public void broadcastUserNumMessage() {
|
|
|
+ // 遍历每个直播间
|
|
|
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> entry : rooms.entrySet()) {
|
|
|
+ Long liveId = entry.getKey();
|
|
|
+ ConcurrentHashMap<Long, Session> room = entry.getValue();
|
|
|
+
|
|
|
+ // 统计当前直播间的在线人数
|
|
|
+ int onlineCount = room.size();
|
|
|
+
|
|
|
+ // 构造消息
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setCmd("userCount");
|
|
|
+ sendMsgVo.setData(String.valueOf(onlineCount));
|
|
|
+
|
|
|
+ // 广播当前直播间的在线人数
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+
|
|
|
+ log.debug("广播直播间在线人数: liveId={}, onlineCount={}", liveId, onlineCount);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定时清理无效会话(每分钟执行一次)
|
|
|
+ * 检查心跳超时的会话并关闭
|
|
|
+ */
|
|
|
+ @Scheduled(fixedRate = 60000) // 每分钟执行一次
|
|
|
+ public void cleanInactiveSessions() {
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
+ int cleanedCount = 0;
|
|
|
+
|
|
|
+ // 遍历所有直播间
|
|
|
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
|
|
|
+ Long liveId = roomEntry.getKey();
|
|
|
+ ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
|
|
|
+
|
|
|
+ // 检查普通用户会话
|
|
|
+ List<Long> toRemove = new ArrayList<>();
|
|
|
+ room.forEach((userId, session) -> {
|
|
|
+ Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
+ if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
+ log.warn("会话心跳超时,即将关闭: sessionId={}, liveId={}, userId={}, 超时时长={}ms",
|
|
|
+ session.getId(), liveId, userId, currentTime - lastHeartbeat);
|
|
|
+ toRemove.add(userId);
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}",
|
|
|
+ session.getId(), liveId, userId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // 移除超时的会话
|
|
|
+ toRemove.forEach(room::remove);
|
|
|
+ cleanedCount += toRemove.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查admin房间
|
|
|
+ for (Map.Entry<Long, CopyOnWriteArrayList<Session>> adminEntry : adminRooms.entrySet()) {
|
|
|
+ Long liveId = adminEntry.getKey();
|
|
|
+ CopyOnWriteArrayList<Session> adminRoom = adminEntry.getValue();
|
|
|
+
|
|
|
+ List<Session> toRemoveAdmin = new ArrayList<>();
|
|
|
+ for (Session session : adminRoom) {
|
|
|
+ Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
+ if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
+ log.warn("admin会话心跳超时,即将关闭: sessionId={}, liveId={}, 超时时长={}ms",
|
|
|
+ session.getId(), liveId, currentTime - lastHeartbeat);
|
|
|
+ toRemoveAdmin.add(session);
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("关闭admin超时会话失败: sessionId={}, liveId={}",
|
|
|
+ session.getId(), liveId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 移除超时的admin会话
|
|
|
+ toRemoveAdmin.forEach(adminRoom::remove);
|
|
|
+ cleanedCount += toRemoveAdmin.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cleanedCount > 0) {
|
|
|
+ log.info("清理无效会话完成,共清理 {} 个超时会话", cleanedCount);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* 广播点赞消息
|
|
|
* @param liveId 直播间ID
|
|
|
@@ -601,16 +776,21 @@ public class WebSocketServer {
|
|
|
}
|
|
|
|
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ log.warn("WebSocket 会话已关闭,跳过发送");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
int attempts = 0;
|
|
|
while (attempts < maxRetries) {
|
|
|
try {
|
|
|
- if(session.isOpen()) {
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
- }
|
|
|
+ // 使用带锁的sendMessage方法,避免并发发送
|
|
|
+ sendMessage(session, message);
|
|
|
return; // 发送成功,退出
|
|
|
} catch (Exception e) {
|
|
|
if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
|
|
|
attempts++;
|
|
|
+ log.warn("发送消息遇到TEXT_FULL_WRITING错误,第{}次重试, sessionId={}", attempts, session.getId());
|
|
|
try {
|
|
|
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, 100));
|
|
|
} catch (InterruptedException ie) {
|
|
|
@@ -618,11 +798,15 @@ public class WebSocketServer {
|
|
|
break;
|
|
|
}
|
|
|
} else {
|
|
|
- throw e;
|
|
|
+ log.error("发送消息失败: sessionId={}, error={}", session.getId(), e.getMessage(), e);
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- log.info("超过重试次数, 消息 {}",message);
|
|
|
+
|
|
|
+ if (attempts >= maxRetries) {
|
|
|
+ log.warn("超过重试次数({}),放弃发送消息: sessionId={}", maxRetries, session.getId());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -728,4 +912,6 @@ public class WebSocketServer {
|
|
|
String key = "live:auto_task:";
|
|
|
redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
+
|