|
@@ -25,6 +25,7 @@ import com.fs.common.utils.spring.SpringUtils;
|
|
|
import com.fs.live.domain.*;
|
|
import com.fs.live.domain.*;
|
|
|
import com.fs.live.service.*;
|
|
import com.fs.live.service.*;
|
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
|
|
|
+import com.fs.newAdv.service.ILeadService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -71,7 +72,7 @@ public class WebSocketServer {
|
|
|
private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
|
|
private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
|
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 消息队列系统
|
|
// 消息队列系统
|
|
|
// 每个直播间的消息队列,使用优先级队列支持管理员消息插队
|
|
// 每个直播间的消息队列,使用优先级队列支持管理员消息插队
|
|
|
private final static ConcurrentHashMap<Long, PriorityBlockingQueue<QueueMessage>> messageQueues = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<Long, PriorityBlockingQueue<QueueMessage>> messageQueues = new ConcurrentHashMap<>();
|
|
@@ -104,6 +105,7 @@ public class WebSocketServer {
|
|
|
private final ILiveWatchLogService liveWatchLogService = SpringUtils.getBean(ILiveWatchLogService.class);
|
|
private final ILiveWatchLogService liveWatchLogService = SpringUtils.getBean(ILiveWatchLogService.class);
|
|
|
private final ILiveVideoService liveVideoService = SpringUtils.getBean(ILiveVideoService.class);
|
|
private final ILiveVideoService liveVideoService = SpringUtils.getBean(ILiveVideoService.class);
|
|
|
private final ILiveCompletionPointsRecordService completionPointsRecordService = SpringUtils.getBean(ILiveCompletionPointsRecordService.class);
|
|
private final ILiveCompletionPointsRecordService completionPointsRecordService = SpringUtils.getBean(ILiveCompletionPointsRecordService.class);
|
|
|
|
|
+ private final ILeadService leadService = SpringUtils.getBean(ILeadService.class);
|
|
|
private static Random random = new Random();
|
|
private static Random random = new Random();
|
|
|
|
|
|
|
|
// Redis key 前缀:用户进入直播间时间
|
|
// Redis key 前缀:用户进入直播间时间
|
|
@@ -292,6 +294,8 @@ public class WebSocketServer {
|
|
|
liveUserFirstEntry.setExternalContactId(externalContactId);
|
|
liveUserFirstEntry.setExternalContactId(externalContactId);
|
|
|
}
|
|
}
|
|
|
liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
|
|
liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
|
|
|
|
|
+ // 第一次进入直播间 发送广告线索
|
|
|
|
|
+ leadService.enterLive(userId, liveId);
|
|
|
}
|
|
}
|
|
|
redisCache.setCacheObject( "live:user:first:entry:" + liveId + ":" + userId, liveUserFirstEntry, 4, TimeUnit.HOURS);
|
|
redisCache.setCacheObject( "live:user:first:entry:" + liveId + ":" + userId, liveUserFirstEntry, 4, TimeUnit.HOURS);
|
|
|
|
|
|
|
@@ -309,7 +313,7 @@ public class WebSocketServer {
|
|
|
sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
// 初始化心跳时间
|
|
// 初始化心跳时间
|
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 如果有session,启动消费者线程
|
|
// 如果有session,启动消费者线程
|
|
|
ConcurrentHashMap<Long, Session> tempRoom = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> tempRoom = getRoom(liveId);
|
|
|
List<Session> tempAdminRoom = getAdminRoom(liveId);
|
|
List<Session> tempAdminRoom = getAdminRoom(liveId);
|
|
@@ -400,7 +404,7 @@ public class WebSocketServer {
|
|
|
// 清理Session相关资源
|
|
// 清理Session相关资源
|
|
|
heartbeatCache.remove(session.getId());
|
|
heartbeatCache.remove(session.getId());
|
|
|
sessionLocks.remove(session.getId());
|
|
sessionLocks.remove(session.getId());
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 检查并清理空的直播间资源
|
|
// 检查并清理空的直播间资源
|
|
|
cleanupEmptyRoom(liveId);
|
|
cleanupEmptyRoom(liveId);
|
|
|
}
|
|
}
|
|
@@ -1621,7 +1625,7 @@ public class WebSocketServer {
|
|
|
private void startConsumerThread(Long liveId) {
|
|
private void startConsumerThread(Long liveId) {
|
|
|
consumerRunningFlags.computeIfAbsent(liveId, k -> new AtomicBoolean(false));
|
|
consumerRunningFlags.computeIfAbsent(liveId, k -> new AtomicBoolean(false));
|
|
|
AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 如果线程已经在运行,直接返回
|
|
// 如果线程已经在运行,直接返回
|
|
|
if (runningFlag.get()) {
|
|
if (runningFlag.get()) {
|
|
|
return;
|
|
return;
|
|
@@ -1633,16 +1637,16 @@ public class WebSocketServer {
|
|
|
Thread consumerThread = new Thread(() -> {
|
|
Thread consumerThread = new Thread(() -> {
|
|
|
PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
log.info("[消息队列] 启动消费者线程, liveId={}", liveId);
|
|
log.info("[消息队列] 启动消费者线程, liveId={}", liveId);
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
while (runningFlag.get()) {
|
|
while (runningFlag.get()) {
|
|
|
try {
|
|
try {
|
|
|
// 检查是否还有session,如果没有则退出
|
|
// 检查是否还有session,如果没有则退出
|
|
|
ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
List<Session> adminRoom = adminRooms.get(liveId);
|
|
List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
-
|
|
|
|
|
- boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
|
|
|
|
+
|
|
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
if (!hasSession) {
|
|
if (!hasSession) {
|
|
|
log.info("[消息队列] 直播间无session,停止消费者线程, liveId={}", liveId);
|
|
log.info("[消息队列] 直播间无session,停止消费者线程, liveId={}", liveId);
|
|
|
break;
|
|
break;
|
|
@@ -1667,13 +1671,13 @@ public class WebSocketServer {
|
|
|
log.error("[消息队列] 消费消息异常, liveId={}", liveId, e);
|
|
log.error("[消息队列] 消费消息异常, liveId={}", liveId, e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 清理资源
|
|
// 清理资源
|
|
|
runningFlag.set(false);
|
|
runningFlag.set(false);
|
|
|
consumerThreads.remove(liveId);
|
|
consumerThreads.remove(liveId);
|
|
|
log.info("[消息队列] 消费者线程已停止, liveId={}", liveId);
|
|
log.info("[消息队列] 消费者线程已停止, liveId={}", liveId);
|
|
|
}, "MessageConsumer-" + liveId);
|
|
}, "MessageConsumer-" + liveId);
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
consumerThread.setDaemon(true);
|
|
consumerThread.setDaemon(true);
|
|
|
consumerThread.start();
|
|
consumerThread.start();
|
|
|
consumerThreads.put(liveId, consumerThread);
|
|
consumerThreads.put(liveId, consumerThread);
|
|
@@ -1705,22 +1709,22 @@ public class WebSocketServer {
|
|
|
private boolean enqueueMessage(Long liveId, String message, boolean isAdmin) {
|
|
private boolean enqueueMessage(Long liveId, String message, boolean isAdmin) {
|
|
|
PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
AtomicLong currentSize = queueSizes.computeIfAbsent(liveId, k -> new AtomicLong(0));
|
|
AtomicLong currentSize = queueSizes.computeIfAbsent(liveId, k -> new AtomicLong(0));
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 计算新消息的大小
|
|
// 计算新消息的大小
|
|
|
long messageSize = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
long messageSize = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 检查队列条数限制
|
|
// 检查队列条数限制
|
|
|
if (!isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
if (!isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
log.warn("[消息队列] 队列条数已满,丢弃消息, liveId={}, queueSize={}", liveId, queue.size());
|
|
log.warn("[消息队列] 队列条数已满,丢弃消息, liveId={}, queueSize={}", liveId, queue.size());
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 检查队列大小限制(200MB)
|
|
// 检查队列大小限制(200MB)
|
|
|
long newTotalSize = currentSize.get() + messageSize;
|
|
long newTotalSize = currentSize.get() + messageSize;
|
|
|
if (newTotalSize > MAX_QUEUE_SIZE_BYTES) {
|
|
if (newTotalSize > MAX_QUEUE_SIZE_BYTES) {
|
|
|
if (!isAdmin) {
|
|
if (!isAdmin) {
|
|
|
// 普通消息超过大小限制,直接丢弃
|
|
// 普通消息超过大小限制,直接丢弃
|
|
|
- log.warn("[消息队列] 队列大小超过限制,丢弃普通消息, liveId={}, currentSize={}MB, messageSize={}KB",
|
|
|
|
|
|
|
+ log.warn("[消息队列] 队列大小超过限制,丢弃普通消息, liveId={}, currentSize={}MB, messageSize={}KB",
|
|
|
liveId, currentSize.get() / (1024.0 * 1024.0), messageSize / 1024.0);
|
|
liveId, currentSize.get() / (1024.0 * 1024.0), messageSize / 1024.0);
|
|
|
return false;
|
|
return false;
|
|
|
} else {
|
|
} else {
|
|
@@ -1728,13 +1732,13 @@ public class WebSocketServer {
|
|
|
long needToFree = newTotalSize - MAX_QUEUE_SIZE_BYTES;
|
|
long needToFree = newTotalSize - MAX_QUEUE_SIZE_BYTES;
|
|
|
long freedSize = removeMessagesToFreeSpace(queue, currentSize, needToFree, true);
|
|
long freedSize = removeMessagesToFreeSpace(queue, currentSize, needToFree, true);
|
|
|
if (freedSize < needToFree) {
|
|
if (freedSize < needToFree) {
|
|
|
- log.warn("[消息队列] 无法释放足够空间,管理员消息可能无法入队, liveId={}, needToFree={}KB, freed={}KB",
|
|
|
|
|
|
|
+ log.warn("[消息队列] 无法释放足够空间,管理员消息可能无法入队, liveId={}, needToFree={}KB, freed={}KB",
|
|
|
liveId, needToFree / 1024.0, freedSize / 1024.0);
|
|
liveId, needToFree / 1024.0, freedSize / 1024.0);
|
|
|
// 即使空间不足,也尝试入队(可能会超过限制,但管理员消息优先级高)
|
|
// 即使空间不足,也尝试入队(可能会超过限制,但管理员消息优先级高)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 如果是管理员消息且队列条数已满,移除一个普通消息
|
|
// 如果是管理员消息且队列条数已满,移除一个普通消息
|
|
|
if (isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
if (isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
// 由于是优先级队列,普通消息(priority=0)会在队列末尾
|
|
// 由于是优先级队列,普通消息(priority=0)会在队列末尾
|
|
@@ -1758,21 +1762,21 @@ public class WebSocketServer {
|
|
|
log.warn("[消息队列] 队列条数已满且无普通消息可移除, liveId={}", liveId);
|
|
log.warn("[消息队列] 队列条数已满且无普通消息可移除, liveId={}", liveId);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
QueueMessage queueMessage = new QueueMessage(message, isAdmin);
|
|
QueueMessage queueMessage = new QueueMessage(message, isAdmin);
|
|
|
queue.offer(queueMessage);
|
|
queue.offer(queueMessage);
|
|
|
currentSize.addAndGet(messageSize);
|
|
currentSize.addAndGet(messageSize);
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 如果有session,确保消费者线程在运行
|
|
// 如果有session,确保消费者线程在运行
|
|
|
ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
List<Session> adminRoom = adminRooms.get(liveId);
|
|
List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
- boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
|
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
if (hasSession) {
|
|
if (hasSession) {
|
|
|
startConsumerThread(liveId);
|
|
startConsumerThread(liveId);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return true;
|
|
return true;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1784,13 +1788,13 @@ public class WebSocketServer {
|
|
|
* @param onlyRemoveNormal 是否只移除普通消息(true=只移除普通消息,false=可以移除任何消息)
|
|
* @param onlyRemoveNormal 是否只移除普通消息(true=只移除普通消息,false=可以移除任何消息)
|
|
|
* @return 实际释放的空间(字节数)
|
|
* @return 实际释放的空间(字节数)
|
|
|
*/
|
|
*/
|
|
|
- private long removeMessagesToFreeSpace(PriorityBlockingQueue<QueueMessage> queue,
|
|
|
|
|
- AtomicLong currentSize,
|
|
|
|
|
- long needToFree,
|
|
|
|
|
|
|
+ private long removeMessagesToFreeSpace(PriorityBlockingQueue<QueueMessage> queue,
|
|
|
|
|
+ AtomicLong currentSize,
|
|
|
|
|
+ long needToFree,
|
|
|
boolean onlyRemoveNormal) {
|
|
boolean onlyRemoveNormal) {
|
|
|
long freedSize = 0;
|
|
long freedSize = 0;
|
|
|
List<QueueMessage> toRemove = new ArrayList<>();
|
|
List<QueueMessage> toRemove = new ArrayList<>();
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 收集需要移除的消息(优先移除普通消息)
|
|
// 收集需要移除的消息(优先移除普通消息)
|
|
|
Iterator<QueueMessage> iterator = queue.iterator();
|
|
Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
while (iterator.hasNext() && freedSize < needToFree) {
|
|
while (iterator.hasNext() && freedSize < needToFree) {
|
|
@@ -1800,7 +1804,7 @@ public class WebSocketServer {
|
|
|
freedSize += msg.getSizeBytes();
|
|
freedSize += msg.getSizeBytes();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 如果只移除普通消息但空间还不够,可以移除管理员消息
|
|
// 如果只移除普通消息但空间还不够,可以移除管理员消息
|
|
|
if (onlyRemoveNormal && freedSize < needToFree) {
|
|
if (onlyRemoveNormal && freedSize < needToFree) {
|
|
|
iterator = queue.iterator();
|
|
iterator = queue.iterator();
|
|
@@ -1812,19 +1816,19 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 移除消息并更新大小
|
|
// 移除消息并更新大小
|
|
|
for (QueueMessage msg : toRemove) {
|
|
for (QueueMessage msg : toRemove) {
|
|
|
if (queue.remove(msg)) {
|
|
if (queue.remove(msg)) {
|
|
|
currentSize.addAndGet(-msg.getSizeBytes());
|
|
currentSize.addAndGet(-msg.getSizeBytes());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
if (freedSize > 0) {
|
|
if (freedSize > 0) {
|
|
|
- log.info("[消息队列] 释放队列空间, removedCount={}, freedSize={}KB",
|
|
|
|
|
|
|
+ log.info("[消息队列] 释放队列空间, removedCount={}, freedSize={}KB",
|
|
|
toRemove.size(), freedSize / 1024.0);
|
|
toRemove.size(), freedSize / 1024.0);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return freedSize;
|
|
return freedSize;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1841,10 +1845,10 @@ public class WebSocketServer {
|
|
|
private void cleanupEmptyRoom(Long liveId) {
|
|
private void cleanupEmptyRoom(Long liveId) {
|
|
|
ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
List<Session> adminRoom = adminRooms.get(liveId);
|
|
List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
-
|
|
|
|
|
- boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
|
|
|
|
+
|
|
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
(adminRoom != null && !adminRoom.isEmpty());
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
if (!hasSession) {
|
|
if (!hasSession) {
|
|
|
// 停止消费者线程
|
|
// 停止消费者线程
|
|
|
stopConsumerThread(liveId);
|
|
stopConsumerThread(liveId);
|