|
|
@@ -36,12 +36,15 @@ import javax.websocket.*;
|
|
|
import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
import java.time.LocalDate;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
@@ -68,6 +71,22 @@ public class WebSocketServer {
|
|
|
private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
|
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 消息队列系统
|
|
|
+ // 每个直播间的消息队列,使用优先级队列支持管理员消息插队
|
|
|
+ private final static ConcurrentHashMap<Long, PriorityBlockingQueue<QueueMessage>> messageQueues = new ConcurrentHashMap<>();
|
|
|
+ // 每个直播间的消费者线程
|
|
|
+ private final static ConcurrentHashMap<Long, Thread> consumerThreads = new ConcurrentHashMap<>();
|
|
|
+ // 每个直播间的消费者线程控制标志
|
|
|
+ private final static ConcurrentHashMap<Long, AtomicBoolean> consumerRunningFlags = new ConcurrentHashMap<>();
|
|
|
+ // 每个直播间队列的总大小(字节数)
|
|
|
+ private final static ConcurrentHashMap<Long, AtomicLong> queueSizes = new ConcurrentHashMap<>();
|
|
|
+ // 消息队列最大容量:10000
|
|
|
+ private final static int MAX_QUEUE_SIZE = 10000;
|
|
|
+ // 消息队列最大大小:200MB
|
|
|
+ private final static long MAX_QUEUE_SIZE_BYTES = 200L * 1024L * 1024L; // 200MB
|
|
|
+ // 上下线消息采样率:10%
|
|
|
+ private final static double ENTRY_EXIT_SAMPLE_RATE = 0.1;
|
|
|
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
@@ -110,7 +129,7 @@ public class WebSocketServer {
|
|
|
if (live == null) {
|
|
|
throw new BaseException("未找到直播间");
|
|
|
}
|
|
|
- long companyId = live.getCompanyId() == null ? -1L : live.getCompanyId();
|
|
|
+ long companyId = -1L;
|
|
|
long companyUserId = -1L;
|
|
|
if (!Objects.isNull(userProperties.get("companyId"))) {
|
|
|
companyId = (long) userProperties.get("companyId");
|
|
|
@@ -131,7 +150,15 @@ public class WebSocketServer {
|
|
|
|
|
|
// 记录连接信息 管理员不记录
|
|
|
if (userType == 0) {
|
|
|
- FsUserScrm fsUser = fsUserService.selectFsUserById(userId);
|
|
|
+ // 缓存用户信息,过期时间4小时
|
|
|
+ String userCacheKey = "fs:user:" + userId;
|
|
|
+ FsUserScrm fsUser = redisCache.getCacheObject(userCacheKey);
|
|
|
+ if (fsUser == null) {
|
|
|
+ fsUser = fsUserService.selectFsUserById(userId);
|
|
|
+ if (fsUser != null) {
|
|
|
+ redisCache.setCacheObject(userCacheKey, fsUser, 4, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
if (Objects.isNull(fsUser)) {
|
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
|
@@ -181,7 +208,8 @@ public class WebSocketServer {
|
|
|
redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
}
|
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
- if (1 == random.nextInt(10)) {
|
|
|
+ // 上线消息采样10%进入队列
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -191,11 +219,19 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 广播连接消息
|
|
|
- broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 将上线消息加入队列
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
|
|
|
|
- LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
+ // 缓存用户首次进入记录,过期时间4小时
|
|
|
+ String liveUserFirstEntryCacheKey = "live:userFirstEntry:" + liveId + ":" + userId;
|
|
|
+ LiveUserFirstEntry liveUserFirstEntry = redisCache.getCacheObject(liveUserFirstEntryCacheKey);
|
|
|
+ if (liveUserFirstEntry == null) {
|
|
|
+ liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
+ if (liveUserFirstEntry != null) {
|
|
|
+ redisCache.setCacheObject(liveUserFirstEntryCacheKey, liveUserFirstEntry, 4, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
// 如果用户连上了 socket,并且公司ID和销售ID大于0,更新 LiveWatchLog 的 logType
|
|
|
|
|
|
if ((qwUserId > 0 && externalContactId > 0) || (liveUserFirstEntry != null && liveUserFirstEntry.getCompanyId() > 0 && liveUserFirstEntry.getCompanyUserId() > 0 )) {
|
|
|
@@ -226,9 +262,19 @@ public class WebSocketServer {
|
|
|
} else {
|
|
|
// 这个用户A邀请用户b,b的业绩算a的销售的
|
|
|
if (companyId == -2L) {
|
|
|
- LiveUserFirstEntry clientB = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, companyUserId);
|
|
|
- companyId = clientB.getCompanyId();
|
|
|
- companyUserId = clientB.getCompanyUserId();
|
|
|
+ // 缓存用户首次进入记录,过期时间4小时
|
|
|
+ String clientBCacheKey = "live:userFirstEntry:" + liveId + ":" + companyUserId;
|
|
|
+ LiveUserFirstEntry clientB = redisCache.getCacheObject(clientBCacheKey);
|
|
|
+ if (clientB == null) {
|
|
|
+ clientB = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, companyUserId);
|
|
|
+ if (clientB != null) {
|
|
|
+ redisCache.setCacheObject(clientBCacheKey, clientB, 4, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (clientB != null) {
|
|
|
+ companyId = clientB.getCompanyId();
|
|
|
+ companyUserId = clientB.getCompanyUserId();
|
|
|
+ }
|
|
|
}
|
|
|
Date date = new Date();
|
|
|
liveUserFirstEntry = new LiveUserFirstEntry();
|
|
|
@@ -247,10 +293,10 @@ public class WebSocketServer {
|
|
|
}
|
|
|
liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
|
|
|
}
|
|
|
- redisCache.setCacheObject( "live:user:first:entry:" + liveId + ":" + userId, liveUserFirstEntry,1, TimeUnit.HOURS);
|
|
|
+ redisCache.setCacheObject( "live:user:first:entry:" + liveId + ":" + userId, liveUserFirstEntry, 4, TimeUnit.HOURS);
|
|
|
|
|
|
// 推送完课积分倒计时配置信息给前端
|
|
|
- sendCompletionPointsConfigToUser(session, liveId, userId, live);
|
|
|
+// sendCompletionPointsConfigToUser(session, liveId, userId, live);
|
|
|
|
|
|
|
|
|
} else {
|
|
|
@@ -263,6 +309,15 @@ public class WebSocketServer {
|
|
|
sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
// 初始化心跳时间
|
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 如果有session,启动消费者线程
|
|
|
+ ConcurrentHashMap<Long, Session> tempRoom = getRoom(liveId);
|
|
|
+ List<Session> tempAdminRoom = getAdminRoom(liveId);
|
|
|
+ boolean hasSession = (tempRoom != null && !tempRoom.isEmpty()) ||
|
|
|
+ (tempAdminRoom != null && !tempAdminRoom.isEmpty());
|
|
|
+ if (hasSession) {
|
|
|
+ startConsumerThread(liveId);
|
|
|
+ }
|
|
|
|
|
|
}
|
|
|
|
|
|
@@ -287,7 +342,15 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
if (userType == 0) {
|
|
|
- FsUserScrm fsUser = fsUserService.selectFsUserById(userId);
|
|
|
+ // 缓存用户信息,过期时间4小时
|
|
|
+ String userCacheKey = "fs:user:" + userId;
|
|
|
+ FsUserScrm fsUser = redisCache.getCacheObject(userCacheKey);
|
|
|
+ if (fsUser == null) {
|
|
|
+ fsUser = fsUserService.selectFsUserById(userId);
|
|
|
+ if (fsUser != null) {
|
|
|
+ redisCache.setCacheObject(userCacheKey, fsUser, 4, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
if (Objects.isNull(fsUser)) {
|
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
|
@@ -307,8 +370,8 @@ public class WebSocketServer {
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
|
|
|
|
|
|
- // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
|
|
|
- if (1 == new Random().nextInt(10)) {
|
|
|
+ // 下线消息采样10%进入队列
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -318,7 +381,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 将下线消息加入队列
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
@@ -336,6 +400,9 @@ public class WebSocketServer {
|
|
|
// 清理Session相关资源
|
|
|
heartbeatCache.remove(session.getId());
|
|
|
sessionLocks.remove(session.getId());
|
|
|
+
|
|
|
+ // 检查并清理空的直播间资源
|
|
|
+ cleanupEmptyRoom(liveId);
|
|
|
}
|
|
|
|
|
|
//收到客户端信息
|
|
|
@@ -345,6 +412,7 @@ public class WebSocketServer {
|
|
|
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
long userType = (long) userProperties.get("userType");
|
|
|
+ boolean isAdmin = false;
|
|
|
|
|
|
SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
if(msg.isOn()) return;
|
|
|
@@ -360,55 +428,55 @@ public class WebSocketServer {
|
|
|
|
|
|
|
|
|
|
|
|
- if (msg.getData() != null && !msg.getData().isEmpty()) {
|
|
|
- try {
|
|
|
- Long currentDuration = Long.parseLong(msg.getData());
|
|
|
-
|
|
|
- Live currentLive = liveService.selectLiveByLiveId(liveId);
|
|
|
- if (currentLive == null) {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- // 判断直播是否已开始:status=2(直播中) 或 当前时间 >= 开播时间
|
|
|
- boolean isLiveStarted = false;
|
|
|
- if (currentLive.getStatus() != null && currentLive.getStatus() == 2) {
|
|
|
- // status=2 表示直播中
|
|
|
- isLiveStarted = true;
|
|
|
- } else if (currentLive.getStartTime() != null) {
|
|
|
- // 判断当前时间是否已超过开播时间
|
|
|
- LocalDateTime now = LocalDateTime.now();
|
|
|
- isLiveStarted = now.isAfter(currentLive.getStartTime()) || now.isEqual(currentLive.getStartTime());
|
|
|
- }
|
|
|
-
|
|
|
- // 使用Hash结构存储:一个直播间一个Hash,包含所有用户的时长
|
|
|
- String hashKey = "live:watch:duration:hash:" + liveId;
|
|
|
- String userIdField = String.valueOf(watchUserId);
|
|
|
-
|
|
|
- if (!isLiveStarted) {
|
|
|
- redisCache.hashDelete(hashKey, userIdField);
|
|
|
- log.debug("[心跳-观看时长] 直播未开始,清除预播时长, liveId={}, userId={}", liveId, watchUserId);
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- // 直播已开始,记录观看时长
|
|
|
- // 获取现有时长
|
|
|
- Object existingDuration = redisCache.hashGet(hashKey, userIdField);
|
|
|
- // 只有当新的时长更大时才更新
|
|
|
- if (existingDuration == null || currentDuration > Long.parseLong(existingDuration.toString())) {
|
|
|
- // 更新Hash中的用户时长
|
|
|
- redisCache.hashPut(hashKey, userIdField, currentDuration.toString());
|
|
|
- // 设置过期时间(2小时)
|
|
|
- redisCache.expire(hashKey, 2, TimeUnit.HOURS);
|
|
|
-
|
|
|
- checkAndSendCompletionPointsInRealTime(liveId, watchUserId, currentDuration);
|
|
|
-
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("[心跳-观看时长] 更新失败, liveId={}, userId={}, data={}",
|
|
|
- liveId, watchUserId, msg.getData(), e);
|
|
|
- }
|
|
|
- }
|
|
|
+// if (msg.getData() != null && !msg.getData().isEmpty()) {
|
|
|
+// try {
|
|
|
+// Long currentDuration = Long.parseLong(msg.getData());
|
|
|
+//
|
|
|
+// Live currentLive = liveService.selectLiveByLiveId(liveId);
|
|
|
+// if (currentLive == null) {
|
|
|
+// break;
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// // 判断直播是否已开始:status=2(直播中) 或 当前时间 >= 开播时间
|
|
|
+// boolean isLiveStarted = false;
|
|
|
+// if (currentLive.getStatus() != null && currentLive.getStatus() == 2) {
|
|
|
+// // status=2 表示直播中
|
|
|
+// isLiveStarted = true;
|
|
|
+// } else if (currentLive.getStartTime() != null) {
|
|
|
+// // 判断当前时间是否已超过开播时间
|
|
|
+// LocalDateTime now = LocalDateTime.now();
|
|
|
+// isLiveStarted = now.isAfter(currentLive.getStartTime()) || now.isEqual(currentLive.getStartTime());
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 使用Hash结构存储:一个直播间一个Hash,包含所有用户的时长
|
|
|
+// String hashKey = "live:watch:duration:hash:" + liveId;
|
|
|
+// String userIdField = String.valueOf(watchUserId);
|
|
|
+//
|
|
|
+// if (!isLiveStarted) {
|
|
|
+// redisCache.hashDelete(hashKey, userIdField);
|
|
|
+// log.debug("[心跳-观看时长] 直播未开始,清除预播时长, liveId={}, userId={}", liveId, watchUserId);
|
|
|
+// break;
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 直播已开始,记录观看时长
|
|
|
+// // 获取现有时长
|
|
|
+// Object existingDuration = redisCache.hashGet(hashKey, userIdField);
|
|
|
+// // 只有当新的时长更大时才更新
|
|
|
+// if (existingDuration == null || currentDuration > Long.parseLong(existingDuration.toString())) {
|
|
|
+// // 更新Hash中的用户时长
|
|
|
+// redisCache.hashPut(hashKey, userIdField, currentDuration.toString());
|
|
|
+// // 设置过期时间(2小时)
|
|
|
+// redisCache.expire(hashKey, 2, TimeUnit.HOURS);
|
|
|
+//
|
|
|
+// checkAndSendCompletionPointsInRealTime(liveId, watchUserId, currentDuration);
|
|
|
+//
|
|
|
+// }
|
|
|
+// } catch (Exception e) {
|
|
|
+// log.error("[心跳-观看时长] 更新失败, liveId={}, userId={}, data={}",
|
|
|
+// liveId, watchUserId, msg.getData(), e);
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
break;
|
|
|
@@ -442,8 +510,9 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 将消息加入队列(普通用户消息)
|
|
|
+ isAdmin = (userType == 1);
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), isAdmin);
|
|
|
break;
|
|
|
case "sendNormalMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
@@ -476,8 +545,7 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
msg.setCmd("sendMsg");
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "sendPopMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
@@ -490,8 +558,7 @@ public class WebSocketServer {
|
|
|
liveMsg.setMsg(msg.getMsg());
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "sendTopMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
@@ -505,8 +572,7 @@ public class WebSocketServer {
|
|
|
liveMsg.setEndTime(DateUtils.addMinutes(new Date(),msg.getDuration()).toString());
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
// 放在当前活动里面
|
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG));
|
|
|
redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), JSONObject.toJSONString(liveMsg));
|
|
|
@@ -516,13 +582,13 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
|
liveWatchUserService.updateGlobalVisible(liveId, msg.getStatus());
|
|
|
liveService.updateGlobalVisible(liveId, msg.getStatus());
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "singleVisible":
|
|
|
liveWatchUserService.updateSingleVisible(liveId, msg.getStatus(),msg.getUserId());
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "sendGift":
|
|
|
break;
|
|
|
@@ -561,7 +627,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setUserType(0L);
|
|
|
sendMsgVo.setCmd("deleteMsg");
|
|
|
sendMsgVo.setMsg(msg.getMsg());
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), true);
|
|
|
}
|
|
|
|
|
|
private void processCoupon(long liveId, SendMsgVo msg) {
|
|
|
@@ -581,7 +648,8 @@ public class WebSocketServer {
|
|
|
} else {
|
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
|
|
|
}
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -596,7 +664,8 @@ public class WebSocketServer {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setLiveId(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveGoods));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -610,7 +679,8 @@ public class WebSocketServer {
|
|
|
if (Objects.nonNull(liveRedConf)) {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveRedConf));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -625,7 +695,8 @@ public class WebSocketServer {
|
|
|
if (Objects.nonNull(liveLotteryConf)) {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveLotteryConf));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1105,7 +1176,8 @@ public class WebSocketServer {
|
|
|
// }
|
|
|
}
|
|
|
msg.setStatus(1);
|
|
|
- broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 定时任务消息作为管理员消息插队
|
|
|
+ enqueueMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
} catch (Exception e) {
|
|
|
log.error("定时任务执行异常:{}", e.getMessage());
|
|
|
}
|
|
|
@@ -1491,5 +1563,294 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 消息队列包装类,支持优先级(管理员消息优先级更高)
|
|
|
+ */
|
|
|
+ private static class QueueMessage implements Comparable<QueueMessage> {
|
|
|
+ private final String message;
|
|
|
+ private final long timestamp;
|
|
|
+ private final int priority; // 0=普通消息, 1=管理员消息(优先级更高)
|
|
|
+ private final long sequence; // 序列号,用于相同优先级消息的FIFO排序
|
|
|
+ private final long sizeBytes; // 消息大小(字节数)
|
|
|
+
|
|
|
+ private static final AtomicLong sequenceGenerator = new AtomicLong(0);
|
|
|
+
|
|
|
+ public QueueMessage(String message, boolean isAdmin) {
|
|
|
+ this.message = message;
|
|
|
+ this.timestamp = System.currentTimeMillis();
|
|
|
+ this.priority = isAdmin ? 1 : 0;
|
|
|
+ this.sequence = sequenceGenerator.getAndIncrement();
|
|
|
+ // 计算消息大小(UTF-8编码)
|
|
|
+ this.sizeBytes = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getMessage() {
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getSizeBytes() {
|
|
|
+ return sizeBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int compareTo(QueueMessage other) {
|
|
|
+ // 优先级高的先处理(管理员消息)
|
|
|
+ int priorityCompare = Integer.compare(other.priority, this.priority);
|
|
|
+ if (priorityCompare != 0) {
|
|
|
+ return priorityCompare;
|
|
|
+ }
|
|
|
+ // 相同优先级按序列号排序(FIFO)
|
|
|
+ return Long.compare(this.sequence, other.sequence);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取或创建消息队列
|
|
|
+ */
|
|
|
+ private PriorityBlockingQueue<QueueMessage> getMessageQueue(Long liveId) {
|
|
|
+ return messageQueues.computeIfAbsent(liveId, k -> new PriorityBlockingQueue<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动消费者线程(如果还没有启动)
|
|
|
+ */
|
|
|
+ private void startConsumerThread(Long liveId) {
|
|
|
+ consumerRunningFlags.computeIfAbsent(liveId, k -> new AtomicBoolean(false));
|
|
|
+ AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
+
|
|
|
+ // 如果线程已经在运行,直接返回
|
|
|
+ if (runningFlag.get()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 尝试启动消费者线程
|
|
|
+ synchronized (consumerRunningFlags) {
|
|
|
+ if (runningFlag.compareAndSet(false, true)) {
|
|
|
+ Thread consumerThread = new Thread(() -> {
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
+ log.info("[消息队列] 启动消费者线程, liveId={}", liveId);
|
|
|
+
|
|
|
+ while (runningFlag.get()) {
|
|
|
+ try {
|
|
|
+ // 检查是否还有session,如果没有则退出
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
+
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
+
|
|
|
+ if (!hasSession) {
|
|
|
+ log.info("[消息队列] 直播间无session,停止消费者线程, liveId={}", liveId);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从队列中取消息,最多等待1秒
|
|
|
+ QueueMessage queueMessage = queue.poll(1, TimeUnit.SECONDS);
|
|
|
+ if (queueMessage != null) {
|
|
|
+ // 更新队列大小(减少)
|
|
|
+ AtomicLong currentSize = queueSizes.get(liveId);
|
|
|
+ if (currentSize != null) {
|
|
|
+ currentSize.addAndGet(-queueMessage.getSizeBytes());
|
|
|
+ }
|
|
|
+ // 广播消息
|
|
|
+ broadcastMessageFromQueue(liveId, queueMessage.getMessage());
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.info("[消息队列] 消费者线程被中断, liveId={}", liveId);
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[消息队列] 消费消息异常, liveId={}", liveId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 清理资源
|
|
|
+ runningFlag.set(false);
|
|
|
+ consumerThreads.remove(liveId);
|
|
|
+ log.info("[消息队列] 消费者线程已停止, liveId={}", liveId);
|
|
|
+ }, "MessageConsumer-" + liveId);
|
|
|
+
|
|
|
+ consumerThread.setDaemon(true);
|
|
|
+ consumerThread.start();
|
|
|
+ consumerThreads.put(liveId, consumerThread);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 停止消费者线程
|
|
|
+ */
|
|
|
+ private void stopConsumerThread(Long liveId) {
|
|
|
+ AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
+ if (runningFlag != null) {
|
|
|
+ runningFlag.set(false);
|
|
|
+ }
|
|
|
+ Thread consumerThread = consumerThreads.remove(liveId);
|
|
|
+ if (consumerThread != null) {
|
|
|
+ consumerThread.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将消息加入队列
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param message 消息内容
|
|
|
+ * @param isAdmin 是否是管理员消息(管理员消息会插队)
|
|
|
+ * @return 是否成功加入队列
|
|
|
+ */
|
|
|
+ private boolean enqueueMessage(Long liveId, String message, boolean isAdmin) {
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
+ AtomicLong currentSize = queueSizes.computeIfAbsent(liveId, k -> new AtomicLong(0));
|
|
|
+
|
|
|
+ // 计算新消息的大小
|
|
|
+ long messageSize = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
+
|
|
|
+ // 检查队列条数限制
|
|
|
+ if (!isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
+ log.warn("[消息队列] 队列条数已满,丢弃消息, liveId={}, queueSize={}", liveId, queue.size());
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查队列大小限制(200MB)
|
|
|
+ long newTotalSize = currentSize.get() + messageSize;
|
|
|
+ if (newTotalSize > MAX_QUEUE_SIZE_BYTES) {
|
|
|
+ if (!isAdmin) {
|
|
|
+ // 普通消息超过大小限制,直接丢弃
|
|
|
+ log.warn("[消息队列] 队列大小超过限制,丢弃普通消息, liveId={}, currentSize={}MB, messageSize={}KB",
|
|
|
+ liveId, currentSize.get() / (1024.0 * 1024.0), messageSize / 1024.0);
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
+ // 管理员消息:需要移除一些普通消息以腾出空间
|
|
|
+ long needToFree = newTotalSize - MAX_QUEUE_SIZE_BYTES;
|
|
|
+ long freedSize = removeMessagesToFreeSpace(queue, currentSize, needToFree, true);
|
|
|
+ if (freedSize < needToFree) {
|
|
|
+ log.warn("[消息队列] 无法释放足够空间,管理员消息可能无法入队, liveId={}, needToFree={}KB, freed={}KB",
|
|
|
+ liveId, needToFree / 1024.0, freedSize / 1024.0);
|
|
|
+ // 即使空间不足,也尝试入队(可能会超过限制,但管理员消息优先级高)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果是管理员消息且队列条数已满,移除一个普通消息
|
|
|
+ if (isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
+ // 由于是优先级队列,普通消息(priority=0)会在队列末尾
|
|
|
+ // 尝试移除一个普通消息,为管理员消息腾出空间
|
|
|
+ QueueMessage removed = null;
|
|
|
+ Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
+ while (iterator.hasNext()) {
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
+ if (msg.priority == 0) {
|
|
|
+ removed = msg;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (removed != null) {
|
|
|
+ queue.remove(removed);
|
|
|
+ currentSize.addAndGet(-removed.getSizeBytes());
|
|
|
+ log.debug("[消息队列] 管理员消息插队,移除普通消息, liveId={}", liveId);
|
|
|
+ } else {
|
|
|
+ // 如果没有普通消息,移除队列末尾的消息(可能是最早的管理员消息)
|
|
|
+ // 这种情况很少发生,因为管理员消息通常较少
|
|
|
+ log.warn("[消息队列] 队列条数已满且无普通消息可移除, liveId={}", liveId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ QueueMessage queueMessage = new QueueMessage(message, isAdmin);
|
|
|
+ queue.offer(queueMessage);
|
|
|
+ currentSize.addAndGet(messageSize);
|
|
|
+
|
|
|
+ // 如果有session,确保消费者线程在运行
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
+
|
|
|
+ if (hasSession) {
|
|
|
+ startConsumerThread(liveId);
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 移除消息以释放空间
|
|
|
+ * @param queue 消息队列
|
|
|
+ * @param currentSize 当前队列大小(原子变量)
|
|
|
+ * @param needToFree 需要释放的空间(字节数)
|
|
|
+ * @param onlyRemoveNormal 是否只移除普通消息(true=只移除普通消息,false=可以移除任何消息)
|
|
|
+ * @return 实际释放的空间(字节数)
|
|
|
+ */
|
|
|
+ private long removeMessagesToFreeSpace(PriorityBlockingQueue<QueueMessage> queue,
|
|
|
+ AtomicLong currentSize,
|
|
|
+ long needToFree,
|
|
|
+ boolean onlyRemoveNormal) {
|
|
|
+ long freedSize = 0;
|
|
|
+ List<QueueMessage> toRemove = new ArrayList<>();
|
|
|
+
|
|
|
+ // 收集需要移除的消息(优先移除普通消息)
|
|
|
+ Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
+ while (iterator.hasNext() && freedSize < needToFree) {
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
+ if (!onlyRemoveNormal || msg.priority == 0) {
|
|
|
+ toRemove.add(msg);
|
|
|
+ freedSize += msg.getSizeBytes();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果只移除普通消息但空间还不够,可以移除管理员消息
|
|
|
+ if (onlyRemoveNormal && freedSize < needToFree) {
|
|
|
+ iterator = queue.iterator();
|
|
|
+ while (iterator.hasNext() && freedSize < needToFree) {
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
+ if (msg.priority == 1 && !toRemove.contains(msg)) {
|
|
|
+ toRemove.add(msg);
|
|
|
+ freedSize += msg.getSizeBytes();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 移除消息并更新大小
|
|
|
+ for (QueueMessage msg : toRemove) {
|
|
|
+ if (queue.remove(msg)) {
|
|
|
+ currentSize.addAndGet(-msg.getSizeBytes());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (freedSize > 0) {
|
|
|
+ log.info("[消息队列] 释放队列空间, removedCount={}, freedSize={}KB",
|
|
|
+ toRemove.size(), freedSize / 1024.0);
|
|
|
+ }
|
|
|
+
|
|
|
+ return freedSize;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从队列中消费消息并广播
|
|
|
+ */
|
|
|
+ private void broadcastMessageFromQueue(Long liveId, String message) {
|
|
|
+ broadcastMessage(liveId, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查并清理空的直播间资源
|
|
|
+ */
|
|
|
+ private void cleanupEmptyRoom(Long liveId) {
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
+
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
+
|
|
|
+ if (!hasSession) {
|
|
|
+ // 停止消费者线程
|
|
|
+ stopConsumerThread(liveId);
|
|
|
+ // 清理消息队列
|
|
|
+ messageQueues.remove(liveId);
|
|
|
+ consumerRunningFlags.remove(liveId);
|
|
|
+ queueSizes.remove(liveId);
|
|
|
+ log.info("[消息队列] 清理空直播间资源, liveId={}", liveId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|