|
|
@@ -36,12 +36,15 @@ import javax.websocket.*;
|
|
|
import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
import java.time.LocalDate;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
@@ -68,6 +71,22 @@ public class WebSocketServer {
|
|
|
private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
|
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 消息队列系统
|
|
|
+ // 每个直播间的消息队列,使用优先级队列支持管理员消息插队
|
|
|
+ private final static ConcurrentHashMap<Long, PriorityBlockingQueue<QueueMessage>> messageQueues = new ConcurrentHashMap<>();
|
|
|
+ // 每个直播间的消费者线程
|
|
|
+ private final static ConcurrentHashMap<Long, Thread> consumerThreads = new ConcurrentHashMap<>();
|
|
|
+ // 每个直播间的消费者线程控制标志
|
|
|
+ private final static ConcurrentHashMap<Long, AtomicBoolean> consumerRunningFlags = new ConcurrentHashMap<>();
|
|
|
+ // 每个直播间队列的总大小(字节数)
|
|
|
+ private final static ConcurrentHashMap<Long, AtomicLong> queueSizes = new ConcurrentHashMap<>();
|
|
|
+ // 消息队列最大容量:10000
|
|
|
+ private final static int MAX_QUEUE_SIZE = 10000;
|
|
|
+ // 消息队列最大大小:200MB
|
|
|
+ private final static long MAX_QUEUE_SIZE_BYTES = 200L * 1024L * 1024L; // 200MB
|
|
|
+ // 上下线消息采样率:10%
|
|
|
+ private final static double ENTRY_EXIT_SAMPLE_RATE = 0.1;
|
|
|
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
@@ -189,7 +208,8 @@ public class WebSocketServer {
|
|
|
redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
}
|
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
- if (1 == random.nextInt(10)) {
|
|
|
+ // 上线消息采样10%进入队列
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -199,8 +219,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 广播连接消息
|
|
|
- broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 将上线消息加入队列
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
|
|
|
|
// 缓存用户首次进入记录,过期时间4小时
|
|
|
@@ -289,6 +309,15 @@ public class WebSocketServer {
|
|
|
sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
// 初始化心跳时间
|
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 如果有session,启动消费者线程
|
|
|
+ ConcurrentHashMap<Long, Session> tempRoom = getRoom(liveId);
|
|
|
+ List<Session> tempAdminRoom = getAdminRoom(liveId);
|
|
|
+ boolean hasSession = (tempRoom != null && !tempRoom.isEmpty()) ||
|
|
|
+ (tempAdminRoom != null && !tempAdminRoom.isEmpty());
|
|
|
+ if (hasSession) {
|
|
|
+ startConsumerThread(liveId);
|
|
|
+ }
|
|
|
|
|
|
}
|
|
|
|
|
|
@@ -341,8 +370,8 @@ public class WebSocketServer {
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
|
|
|
|
|
|
- // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
|
|
|
- if (1 == new Random().nextInt(10)) {
|
|
|
+ // 下线消息采样10%进入队列
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -352,7 +381,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 将下线消息加入队列
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
@@ -370,6 +400,9 @@ public class WebSocketServer {
|
|
|
// 清理Session相关资源
|
|
|
heartbeatCache.remove(session.getId());
|
|
|
sessionLocks.remove(session.getId());
|
|
|
+
|
|
|
+ // 检查并清理空的直播间资源
|
|
|
+ cleanupEmptyRoom(liveId);
|
|
|
}
|
|
|
|
|
|
//收到客户端信息
|
|
|
@@ -379,6 +412,7 @@ public class WebSocketServer {
|
|
|
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
long userType = (long) userProperties.get("userType");
|
|
|
+ boolean isAdmin = false;
|
|
|
|
|
|
SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
if(msg.isOn()) return;
|
|
|
@@ -476,8 +510,9 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 将消息加入队列(普通用户消息)
|
|
|
+ isAdmin = (userType == 1);
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), isAdmin);
|
|
|
break;
|
|
|
case "sendNormalMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
@@ -510,8 +545,7 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
msg.setCmd("sendMsg");
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "sendPopMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
@@ -524,8 +558,7 @@ public class WebSocketServer {
|
|
|
liveMsg.setMsg(msg.getMsg());
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "sendTopMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
@@ -539,8 +572,7 @@ public class WebSocketServer {
|
|
|
liveMsg.setEndTime(DateUtils.addMinutes(new Date(),msg.getDuration()).toString());
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
// 放在当前活动里面
|
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG));
|
|
|
redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), JSONObject.toJSONString(liveMsg));
|
|
|
@@ -550,13 +582,13 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
|
liveWatchUserService.updateGlobalVisible(liveId, msg.getStatus());
|
|
|
liveService.updateGlobalVisible(liveId, msg.getStatus());
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "singleVisible":
|
|
|
liveWatchUserService.updateSingleVisible(liveId, msg.getStatus(),msg.getUserId());
|
|
|
- // 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
|
case "sendGift":
|
|
|
break;
|
|
|
@@ -595,7 +627,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setUserType(0L);
|
|
|
sendMsgVo.setCmd("deleteMsg");
|
|
|
sendMsgVo.setMsg(msg.getMsg());
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), true);
|
|
|
}
|
|
|
|
|
|
private void processCoupon(long liveId, SendMsgVo msg) {
|
|
|
@@ -615,7 +648,8 @@ public class WebSocketServer {
|
|
|
} else {
|
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
|
|
|
}
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -630,7 +664,8 @@ public class WebSocketServer {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setLiveId(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveGoods));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -644,7 +679,8 @@ public class WebSocketServer {
|
|
|
if (Objects.nonNull(liveRedConf)) {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveRedConf));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -659,7 +695,8 @@ public class WebSocketServer {
|
|
|
if (Objects.nonNull(liveLotteryConf)) {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveLotteryConf));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 管理员消息插队
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1139,7 +1176,8 @@ public class WebSocketServer {
|
|
|
// }
|
|
|
}
|
|
|
msg.setStatus(1);
|
|
|
- broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 定时任务消息作为管理员消息插队
|
|
|
+ enqueueMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
} catch (Exception e) {
|
|
|
log.error("定时任务执行异常:{}", e.getMessage());
|
|
|
}
|
|
|
@@ -1525,5 +1563,294 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 消息队列包装类,支持优先级(管理员消息优先级更高)
|
|
|
+ */
|
|
|
+ private static class QueueMessage implements Comparable<QueueMessage> {
|
|
|
+ private final String message;
|
|
|
+ private final long timestamp;
|
|
|
+ private final int priority; // 0=普通消息, 1=管理员消息(优先级更高)
|
|
|
+ private final long sequence; // 序列号,用于相同优先级消息的FIFO排序
|
|
|
+ private final long sizeBytes; // 消息大小(字节数)
|
|
|
+
|
|
|
+ private static final AtomicLong sequenceGenerator = new AtomicLong(0);
|
|
|
+
|
|
|
+ public QueueMessage(String message, boolean isAdmin) {
|
|
|
+ this.message = message;
|
|
|
+ this.timestamp = System.currentTimeMillis();
|
|
|
+ this.priority = isAdmin ? 1 : 0;
|
|
|
+ this.sequence = sequenceGenerator.getAndIncrement();
|
|
|
+ // 计算消息大小(UTF-8编码)
|
|
|
+ this.sizeBytes = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getMessage() {
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getSizeBytes() {
|
|
|
+ return sizeBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int compareTo(QueueMessage other) {
|
|
|
+ // 优先级高的先处理(管理员消息)
|
|
|
+ int priorityCompare = Integer.compare(other.priority, this.priority);
|
|
|
+ if (priorityCompare != 0) {
|
|
|
+ return priorityCompare;
|
|
|
+ }
|
|
|
+ // 相同优先级按序列号排序(FIFO)
|
|
|
+ return Long.compare(this.sequence, other.sequence);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取或创建消息队列
|
|
|
+ */
|
|
|
+ private PriorityBlockingQueue<QueueMessage> getMessageQueue(Long liveId) {
|
|
|
+ return messageQueues.computeIfAbsent(liveId, k -> new PriorityBlockingQueue<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动消费者线程(如果还没有启动)
|
|
|
+ */
|
|
|
+ private void startConsumerThread(Long liveId) {
|
|
|
+ consumerRunningFlags.computeIfAbsent(liveId, k -> new AtomicBoolean(false));
|
|
|
+ AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
+
|
|
|
+ // 如果线程已经在运行,直接返回
|
|
|
+ if (runningFlag.get()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 尝试启动消费者线程
|
|
|
+ synchronized (consumerRunningFlags) {
|
|
|
+ if (runningFlag.compareAndSet(false, true)) {
|
|
|
+ Thread consumerThread = new Thread(() -> {
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
+ log.info("[消息队列] 启动消费者线程, liveId={}", liveId);
|
|
|
+
|
|
|
+ while (runningFlag.get()) {
|
|
|
+ try {
|
|
|
+ // 检查是否还有session,如果没有则退出
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
+
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
+
|
|
|
+ if (!hasSession) {
|
|
|
+ log.info("[消息队列] 直播间无session,停止消费者线程, liveId={}", liveId);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从队列中取消息,最多等待1秒
|
|
|
+ QueueMessage queueMessage = queue.poll(1, TimeUnit.SECONDS);
|
|
|
+ if (queueMessage != null) {
|
|
|
+ // 更新队列大小(减少)
|
|
|
+ AtomicLong currentSize = queueSizes.get(liveId);
|
|
|
+ if (currentSize != null) {
|
|
|
+ currentSize.addAndGet(-queueMessage.getSizeBytes());
|
|
|
+ }
|
|
|
+ // 广播消息
|
|
|
+ broadcastMessageFromQueue(liveId, queueMessage.getMessage());
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.info("[消息队列] 消费者线程被中断, liveId={}", liveId);
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[消息队列] 消费消息异常, liveId={}", liveId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 清理资源
|
|
|
+ runningFlag.set(false);
|
|
|
+ consumerThreads.remove(liveId);
|
|
|
+ log.info("[消息队列] 消费者线程已停止, liveId={}", liveId);
|
|
|
+ }, "MessageConsumer-" + liveId);
|
|
|
+
|
|
|
+ consumerThread.setDaemon(true);
|
|
|
+ consumerThread.start();
|
|
|
+ consumerThreads.put(liveId, consumerThread);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 停止消费者线程
|
|
|
+ */
|
|
|
+ private void stopConsumerThread(Long liveId) {
|
|
|
+ AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
+ if (runningFlag != null) {
|
|
|
+ runningFlag.set(false);
|
|
|
+ }
|
|
|
+ Thread consumerThread = consumerThreads.remove(liveId);
|
|
|
+ if (consumerThread != null) {
|
|
|
+ consumerThread.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将消息加入队列
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param message 消息内容
|
|
|
+ * @param isAdmin 是否是管理员消息(管理员消息会插队)
|
|
|
+ * @return 是否成功加入队列
|
|
|
+ */
|
|
|
+ private boolean enqueueMessage(Long liveId, String message, boolean isAdmin) {
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
+ AtomicLong currentSize = queueSizes.computeIfAbsent(liveId, k -> new AtomicLong(0));
|
|
|
+
|
|
|
+ // 计算新消息的大小
|
|
|
+ long messageSize = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
+
|
|
|
+ // 检查队列条数限制
|
|
|
+ if (!isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
+ log.warn("[消息队列] 队列条数已满,丢弃消息, liveId={}, queueSize={}", liveId, queue.size());
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查队列大小限制(200MB)
|
|
|
+ long newTotalSize = currentSize.get() + messageSize;
|
|
|
+ if (newTotalSize > MAX_QUEUE_SIZE_BYTES) {
|
|
|
+ if (!isAdmin) {
|
|
|
+ // 普通消息超过大小限制,直接丢弃
|
|
|
+ log.warn("[消息队列] 队列大小超过限制,丢弃普通消息, liveId={}, currentSize={}MB, messageSize={}KB",
|
|
|
+ liveId, currentSize.get() / (1024.0 * 1024.0), messageSize / 1024.0);
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
+ // 管理员消息:需要移除一些普通消息以腾出空间
|
|
|
+ long needToFree = newTotalSize - MAX_QUEUE_SIZE_BYTES;
|
|
|
+ long freedSize = removeMessagesToFreeSpace(queue, currentSize, needToFree, true);
|
|
|
+ if (freedSize < needToFree) {
|
|
|
+ log.warn("[消息队列] 无法释放足够空间,管理员消息可能无法入队, liveId={}, needToFree={}KB, freed={}KB",
|
|
|
+ liveId, needToFree / 1024.0, freedSize / 1024.0);
|
|
|
+ // 即使空间不足,也尝试入队(可能会超过限制,但管理员消息优先级高)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果是管理员消息且队列条数已满,移除一个普通消息
|
|
|
+ if (isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
+ // 由于是优先级队列,普通消息(priority=0)会在队列末尾
|
|
|
+ // 尝试移除一个普通消息,为管理员消息腾出空间
|
|
|
+ QueueMessage removed = null;
|
|
|
+ Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
+ while (iterator.hasNext()) {
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
+ if (msg.priority == 0) {
|
|
|
+ removed = msg;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (removed != null) {
|
|
|
+ queue.remove(removed);
|
|
|
+ currentSize.addAndGet(-removed.getSizeBytes());
|
|
|
+ log.debug("[消息队列] 管理员消息插队,移除普通消息, liveId={}", liveId);
|
|
|
+ } else {
|
|
|
+ // 如果没有普通消息,移除队列末尾的消息(可能是最早的管理员消息)
|
|
|
+ // 这种情况很少发生,因为管理员消息通常较少
|
|
|
+ log.warn("[消息队列] 队列条数已满且无普通消息可移除, liveId={}", liveId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ QueueMessage queueMessage = new QueueMessage(message, isAdmin);
|
|
|
+ queue.offer(queueMessage);
|
|
|
+ currentSize.addAndGet(messageSize);
|
|
|
+
|
|
|
+ // 如果有session,确保消费者线程在运行
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
+
|
|
|
+ if (hasSession) {
|
|
|
+ startConsumerThread(liveId);
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 移除消息以释放空间
|
|
|
+ * @param queue 消息队列
|
|
|
+ * @param currentSize 当前队列大小(原子变量)
|
|
|
+ * @param needToFree 需要释放的空间(字节数)
|
|
|
+ * @param onlyRemoveNormal 是否只移除普通消息(true=只移除普通消息,false=可以移除任何消息)
|
|
|
+ * @return 实际释放的空间(字节数)
|
|
|
+ */
|
|
|
+ private long removeMessagesToFreeSpace(PriorityBlockingQueue<QueueMessage> queue,
|
|
|
+ AtomicLong currentSize,
|
|
|
+ long needToFree,
|
|
|
+ boolean onlyRemoveNormal) {
|
|
|
+ long freedSize = 0;
|
|
|
+ List<QueueMessage> toRemove = new ArrayList<>();
|
|
|
+
|
|
|
+ // 收集需要移除的消息(优先移除普通消息)
|
|
|
+ Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
+ while (iterator.hasNext() && freedSize < needToFree) {
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
+ if (!onlyRemoveNormal || msg.priority == 0) {
|
|
|
+ toRemove.add(msg);
|
|
|
+ freedSize += msg.getSizeBytes();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果只移除普通消息但空间还不够,可以移除管理员消息
|
|
|
+ if (onlyRemoveNormal && freedSize < needToFree) {
|
|
|
+ iterator = queue.iterator();
|
|
|
+ while (iterator.hasNext() && freedSize < needToFree) {
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
+ if (msg.priority == 1 && !toRemove.contains(msg)) {
|
|
|
+ toRemove.add(msg);
|
|
|
+ freedSize += msg.getSizeBytes();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 移除消息并更新大小
|
|
|
+ for (QueueMessage msg : toRemove) {
|
|
|
+ if (queue.remove(msg)) {
|
|
|
+ currentSize.addAndGet(-msg.getSizeBytes());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (freedSize > 0) {
|
|
|
+ log.info("[消息队列] 释放队列空间, removedCount={}, freedSize={}KB",
|
|
|
+ toRemove.size(), freedSize / 1024.0);
|
|
|
+ }
|
|
|
+
|
|
|
+ return freedSize;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从队列中消费消息并广播
|
|
|
+ */
|
|
|
+ private void broadcastMessageFromQueue(Long liveId, String message) {
|
|
|
+ broadcastMessage(liveId, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查并清理空的直播间资源
|
|
|
+ */
|
|
|
+ private void cleanupEmptyRoom(Long liveId) {
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
+
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
+
|
|
|
+ if (!hasSession) {
|
|
|
+ // 停止消费者线程
|
|
|
+ stopConsumerThread(liveId);
|
|
|
+ // 清理消息队列
|
|
|
+ messageQueues.remove(liveId);
|
|
|
+ consumerRunningFlags.remove(liveId);
|
|
|
+ queueSizes.remove(liveId);
|
|
|
+ log.info("[消息队列] 清理空直播间资源, liveId={}", liveId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|