|
@@ -18,6 +18,7 @@ import com.fs.live.service.*;
|
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
|
import com.fs.store.domain.FsUser;
|
|
import com.fs.store.domain.FsUser;
|
|
|
import com.fs.store.service.IFsUserService;
|
|
import com.fs.store.service.IFsUserService;
|
|
|
|
|
+import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
@@ -25,12 +26,15 @@ import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
import javax.websocket.*;
|
|
import javax.websocket.*;
|
|
|
import javax.websocket.server.ServerEndpoint;
|
|
import javax.websocket.server.ServerEndpoint;
|
|
|
-import java.io.EOFException;
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
import java.util.concurrent.*;
|
|
|
-import java.util.concurrent.locks.Lock;
|
|
|
|
|
-import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
|
|
+import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
|
+import java.util.concurrent.locks.StampedLock;
|
|
|
|
|
|
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
import static com.fs.common.constant.LiveKeysConstant.TOP_MSG;
|
|
import static com.fs.common.constant.LiveKeysConstant.TOP_MSG;
|
|
@@ -47,20 +51,90 @@ public class WebSocketServer {
|
|
|
// 管理端连接
|
|
// 管理端连接
|
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
- // Session发送锁,避免同一会话并发发送消息
|
|
|
|
|
- private final static ConcurrentHashMap<String, Lock> sessionLocks = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
+ // Session发送锁,避免同一会话并发发送消息(使用StampedLock提升性能)
|
|
|
|
|
+ private final static ConcurrentHashMap<String, StampedLock> sessionLocks = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 最大消息大小(字节):超过此大小将分片发送,默认64KB
|
|
|
|
|
+ private final static int MAX_MESSAGE_SIZE = 64 * 1024; // 64KB
|
|
|
|
|
+ // 分片大小(字节):每片大小
|
|
|
|
|
+ private final static int CHUNK_SIZE = 32 * 1024; // 32KB
|
|
|
|
|
+ // Session发送队列监控:记录每个Session的待发送消息数
|
|
|
|
|
+ private final static ConcurrentHashMap<String, AtomicLong> sessionQueueSizes = new ConcurrentHashMap<>();
|
|
|
// 心跳超时缓存:key=sessionId,value=最后心跳时间戳
|
|
// 心跳超时缓存:key=sessionId,value=最后心跳时间戳
|
|
|
private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
|
|
|
// 心跳超时时间(毫秒):3分钟无心跳则认为超时
|
|
// 心跳超时时间(毫秒):3分钟无心跳则认为超时
|
|
|
private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
|
|
private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
|
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
// admin房间消息发送线程池(单线程,保证串行化)
|
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 心跳响应专用线程池(高并发优化)
|
|
|
|
|
+ private static final ExecutorService HEARTBEAT_EXECUTOR = new ThreadPoolExecutor(
|
|
|
|
|
+ 16, // 核心线程数:根据32核CPU,设置为16
|
|
|
|
|
+ 64, // 最大线程数:高并发场景
|
|
|
|
|
+ 60L, TimeUnit.SECONDS, // 空闲线程存活时间
|
|
|
|
|
+ new LinkedBlockingQueue<>(10000), // 队列容量:10000
|
|
|
|
|
+ new ThreadFactory() {
|
|
|
|
|
+ private final AtomicLong counter = new AtomicLong(0);
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
|
|
+ Thread t = new Thread(r, "websocket-heartbeat-" + counter.incrementAndGet());
|
|
|
|
|
+ t.setDaemon(true);
|
|
|
|
|
+ return t;
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者运行
|
|
|
|
|
+ );
|
|
|
|
|
+ // 1. 新增一个数据库异步操作线程池(在类中定义)
|
|
|
|
|
+ private static final ExecutorService DB_ASYNC_EXECUTOR = new ThreadPoolExecutor(
|
|
|
|
|
+ 16,
|
|
|
|
|
+ 64,
|
|
|
|
|
+ 60L, TimeUnit.SECONDS,
|
|
|
|
|
+ new LinkedBlockingQueue<>(10000),
|
|
|
|
|
+ new ThreadFactory() {
|
|
|
|
|
+ private final AtomicLong counter = new AtomicLong(0);
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
|
|
+ Thread t = new Thread(r, "websocket-db-async-" + counter.incrementAndGet());
|
|
|
|
|
+ t.setDaemon(true);
|
|
|
|
|
+ return t;
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // 消息队列系统
|
|
|
|
|
+ // 每个直播间的消息队列,使用优先级队列支持管理员消息插队
|
|
|
|
|
+ private final static ConcurrentHashMap<Long, PriorityBlockingQueue<QueueMessage>> messageQueues = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 每个直播间的消费者线程
|
|
|
|
|
+ private final static ConcurrentHashMap<Long, Thread> consumerThreads = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 每个直播间的消费者线程控制标志
|
|
|
|
|
+ private final static ConcurrentHashMap<Long, AtomicBoolean> consumerRunningFlags = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 每个直播间队列的总大小(字节数)
|
|
|
|
|
+ private final static ConcurrentHashMap<Long, AtomicLong> queueSizes = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 消息队列最大容量:10000
|
|
|
|
|
+ private final static int MAX_QUEUE_SIZE = 50000;
|
|
|
|
|
+ // 消息队列最大大小:200MB
|
|
|
|
|
+ private final static long MAX_QUEUE_SIZE_BYTES = 500L * 1024L * 1024L; // 200MB
|
|
|
|
|
+ // 上下线消息采样率:10%
|
|
|
|
|
+ private final static double ENTRY_EXIT_SAMPLE_RATE = 0.1;
|
|
|
|
|
+ // 聊天消息批量插入队列
|
|
|
|
|
+ private final static BlockingQueue<LiveMsg> liveMsgQueue = new LinkedBlockingQueue<>();
|
|
|
|
|
+ // 聊天消息批量插入阈值:500条
|
|
|
|
|
+ private final static int LIVE_MSG_BATCH_SIZE = 500;
|
|
|
|
|
+ // 聊天消息批量插入定时器间隔:10秒
|
|
|
|
|
+ private final static long LIVE_MSG_BATCH_INTERVAL = 10000; // 10秒
|
|
|
|
|
+ // Redis key:被禁言用户Set(按直播间)
|
|
|
|
|
+ private final static String BLOCKED_USERS_KEY = "live:blocked:users:%s";
|
|
|
|
|
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
|
|
+ private final StringRedisTemplate stringRedisTemplate = SpringUtils.getBean(StringRedisTemplate.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
|
private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
|
|
private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
|
|
|
private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
|
|
private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
|
|
|
|
|
+
|
|
|
|
|
+ // 用户信息缓存Key前缀
|
|
|
|
|
+ private static final String USER_INFO_CACHE_KEY = "ws:user:info:%s";
|
|
|
|
|
+ // 用户信息缓存过期时间:12小时
|
|
|
|
|
+ private static final long USER_INFO_CACHE_EXPIRE_HOURS = 12;
|
|
|
private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
|
|
private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
|
|
|
private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
|
|
private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
|
|
|
private final ILiveRedConfService liveRedConfService = SpringUtils.getBean(ILiveRedConfService.class);
|
|
private final ILiveRedConfService liveRedConfService = SpringUtils.getBean(ILiveRedConfService.class);
|
|
@@ -71,6 +145,9 @@ public class WebSocketServer {
|
|
|
private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
|
|
private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
|
|
|
private static Random random = new Random();
|
|
private static Random random = new Random();
|
|
|
|
|
|
|
|
|
|
+ // Redis key 前缀:用户进入直播间时间
|
|
|
|
|
+ public static final String USER_ENTRY_TIME_KEY = "live:user:entry:time:%s:%s";
|
|
|
|
|
+
|
|
|
// 直播间在线用户缓存
|
|
// 直播间在线用户缓存
|
|
|
// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
@@ -103,48 +180,28 @@ public class WebSocketServer {
|
|
|
|
|
|
|
|
// 记录连接信息 管理员不记录
|
|
// 记录连接信息 管理员不记录
|
|
|
if (userType == 0) {
|
|
if (userType == 0) {
|
|
|
- FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
|
|
|
|
+ // 使用缓存获取轻量级用户信息(只包含必要字段)
|
|
|
|
|
+// LightweightUserInfo userInfo = getLightweightUserInfo(userId);
|
|
|
|
|
+// FsUser fsUser = toFsUser(userInfo);
|
|
|
|
|
+ FsUser fsUser = fsUserService.selectWsFsUserById(userId);
|
|
|
if (Objects.isNull(fsUser)) {
|
|
if (Objects.isNull(fsUser)) {
|
|
|
throw new BaseException("用户信息错误");
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
|
room.put(userId, session);
|
|
room.put(userId, session);
|
|
|
- // 直播间浏览量 +1
|
|
|
|
|
- redisCache.increment(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
|
|
-
|
|
|
|
|
- // 累计观看人次 +1
|
|
|
|
|
- redisCache.increment(TOTAL_VIEWS_KEY + liveId, 1);
|
|
|
|
|
-
|
|
|
|
|
- // 记录在线人数
|
|
|
|
|
- redisCache.increment(ONLINE_USERS_KEY + liveId, 1);
|
|
|
|
|
- // 将用户ID添加到在线用户Set中
|
|
|
|
|
- String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
|
|
- redisCache.redisTemplate.opsForSet().add(onlineUsersSetKey, String.valueOf(userId));
|
|
|
|
|
- // 获取Set的大小作为当前在线人数
|
|
|
|
|
- Long currentOnlineCount = redisCache.redisTemplate.opsForSet().size(onlineUsersSetKey);
|
|
|
|
|
- //最大同时在线人数 - 使用Set大小来判断
|
|
|
|
|
- Integer maxOnline = redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveId);
|
|
|
|
|
- int currentOnline = currentOnlineCount != null ? currentOnlineCount.intValue() : 0;
|
|
|
|
|
- if (maxOnline == null || currentOnline > maxOnline) {
|
|
|
|
|
- redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 判断是否是该直播间的首次访客(独立访客统计)
|
|
|
|
|
- boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
|
|
- if (isFirstVisit) {
|
|
|
|
|
|
|
|
|
|
- redisCache.increment(UNIQUE_VISITORS_KEY + liveId, 1);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 判断是否是首次进入直播间的观众
|
|
|
|
|
- boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
|
|
- if (isFirstViewer) {
|
|
|
|
|
- redisCache.increment(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // 存储用户进入直播间的时间到 Redis(用于计算在线时长)
|
|
|
|
|
+ // 如果已经存在进入时间,说明是重连,不应该覆盖,保持原来的进入时间
|
|
|
|
|
+ String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
|
|
+ Long existingEntryTime = redisCache.getCacheObject(entryTimeKey);
|
|
|
|
|
+
|
|
|
|
|
+ // 使用 Pipeline 批量执行 Redis 操作,减少网络交互次数
|
|
|
|
|
+ PipelineResult pipelineResult = executeUserJoinPipeline(liveId, userId, entryTimeKey, existingEntryTime);
|
|
|
|
|
|
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
- if (1 == random.nextInt(10)) {
|
|
|
|
|
|
|
+ // 上线消息采样10%进入队列
|
|
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
sendMsgVo.setUserId(userId);
|
|
@@ -154,8 +211,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 广播连接消息
|
|
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ // 将上线消息加入队列
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
@@ -193,10 +250,19 @@ public class WebSocketServer {
|
|
|
adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
|
|
adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 初始化Session锁
|
|
|
|
|
- sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
|
|
|
|
+ // 初始化Session锁(使用StampedLock提升性能)
|
|
|
|
|
+ sessionLocks.putIfAbsent(session.getId(), new StampedLock());
|
|
|
// 初始化心跳时间
|
|
// 初始化心跳时间
|
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
|
|
+
|
|
|
|
|
+ // 如果有session,启动消费者线程
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> tempRoom = getRoom(liveId);
|
|
|
|
|
+ List<Session> tempAdminRoom = getAdminRoom(liveId);
|
|
|
|
|
+ boolean hasSession = (tempRoom != null && !tempRoom.isEmpty()) ||
|
|
|
|
|
+ (tempAdminRoom != null && !tempAdminRoom.isEmpty());
|
|
|
|
|
+ if (hasSession) {
|
|
|
|
|
+ startConsumerThread(liveId);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//关闭连接时调用
|
|
//关闭连接时调用
|
|
@@ -211,11 +277,7 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
if (userType == 0) {
|
|
if (userType == 0) {
|
|
|
- FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
|
|
- if (Objects.isNull(fsUser)) {
|
|
|
|
|
- throw new BaseException("用户信息错误");
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+ // close方法不需要fsUser参数,直接调用即可
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
|
|
|
room.remove(userId);
|
|
room.remove(userId);
|
|
|
|
|
|
|
@@ -224,13 +286,13 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
- // 直播间在线人数 -1
|
|
|
|
|
- redisCache.increment(ONLINE_USERS_KEY + liveId, -1);
|
|
|
|
|
- // 从在线用户Set中移除用户ID
|
|
|
|
|
- String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
|
|
- redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
|
|
- // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
|
|
|
|
|
- if (1 == new Random().nextInt(10)) {
|
|
|
|
|
|
|
+ // 使用 Pipeline 批量执行 Redis 操作,减少网络交互次数
|
|
|
|
|
+ executeUserClosePipeline(liveId, userId);
|
|
|
|
|
+ // 下线消息采样10%进入队列
|
|
|
|
|
+ if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
|
|
|
|
|
+ // 从缓存获取用户信息(轻量级,只包含必要字段)
|
|
|
|
|
+ FsUser fsUser = fsUserService.selectWsFsUserById(userId);
|
|
|
|
|
+
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
sendMsgVo.setUserId(userId);
|
|
@@ -240,7 +302,8 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ // 将下线消息加入队列
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
adminRoom.remove(session);
|
|
adminRoom.remove(session);
|
|
@@ -254,9 +317,28 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
// 清理Session相关资源
|
|
// 清理Session相关资源
|
|
|
- heartbeatCache.remove(session.getId());
|
|
|
|
|
- sessionLocks.remove(session.getId());
|
|
|
|
|
|
|
+ String sessionId = session.getId();
|
|
|
|
|
+ heartbeatCache.remove(sessionId);
|
|
|
|
|
+
|
|
|
|
|
+ // 清理Session锁(确保锁被移除,避免内存泄漏)
|
|
|
|
|
+ StampedLock lock = sessionLocks.remove(sessionId);
|
|
|
|
|
+ if (lock != null) {
|
|
|
|
|
+ // 尝试获取写锁并立即释放,确保没有线程持有该锁
|
|
|
|
|
+ try {
|
|
|
|
|
+ long stamp = lock.tryWriteLock(10, TimeUnit.MILLISECONDS);
|
|
|
|
|
+ if (stamp != 0) {
|
|
|
|
|
+ lock.unlockWrite(stamp);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 清理发送队列监控
|
|
|
|
|
+ sessionQueueSizes.remove(sessionId);
|
|
|
|
|
|
|
|
|
|
+ // 检查并清理空的直播间资源
|
|
|
|
|
+ cleanupEmptyRoom(liveId);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//收到客户端信息
|
|
//收到客户端信息
|
|
@@ -266,6 +348,7 @@ public class WebSocketServer {
|
|
|
|
|
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
long userType = (long) userProperties.get("userType");
|
|
long userType = (long) userProperties.get("userType");
|
|
|
|
|
+ boolean isAdmin = false;
|
|
|
|
|
|
|
|
SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
if(msg.isOn()) return;
|
|
if(msg.isOn()) return;
|
|
@@ -273,9 +356,22 @@ public class WebSocketServer {
|
|
|
try {
|
|
try {
|
|
|
switch (msg.getCmd()) {
|
|
switch (msg.getCmd()) {
|
|
|
case "heartbeat":
|
|
case "heartbeat":
|
|
|
- // 更新心跳时间
|
|
|
|
|
- heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
|
|
- sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ // 更新心跳时间(使用putIfAbsent优化,减少不必要的更新)
|
|
|
|
|
+ String sessionId = session.getId();
|
|
|
|
|
+ heartbeatCache.put(sessionId, System.currentTimeMillis());
|
|
|
|
|
+
|
|
|
|
|
+ // 异步发送心跳响应,避免阻塞主线程
|
|
|
|
|
+ HEARTBEAT_EXECUTOR.submit(() -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (session.isOpen()) {
|
|
|
|
|
+ String response = JSONObject.toJSONString(R.ok().put("data", msg));
|
|
|
|
|
+ // 使用异步发送,不阻塞
|
|
|
|
|
+ sendMessage(session,response);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception ignored) {
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
break;
|
|
break;
|
|
|
case "sendMsg":
|
|
case "sendMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
@@ -289,8 +385,10 @@ public class WebSocketServer {
|
|
|
liveMsg.setCreateTime(new Date());
|
|
liveMsg.setCreateTime(new Date());
|
|
|
|
|
|
|
|
if (userType == 0) {
|
|
if (userType == 0) {
|
|
|
- List<LiveWatchUser> liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
|
|
|
|
|
- if(!liveWatchUser.isEmpty() && liveWatchUser.get(0).getMsgStatus() == 1){
|
|
|
|
|
|
|
+ // 使用Redis Set检查用户是否被禁言
|
|
|
|
|
+ String blockedUsersKey = String.format(BLOCKED_USERS_KEY, msg.getLiveId());
|
|
|
|
|
+ boolean isBlocked = redisCache.redisTemplate.opsForSet().isMember(blockedUsersKey, String.valueOf(msg.getUserId()));
|
|
|
|
|
+ if (isBlocked) {
|
|
|
sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
|
|
sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
@@ -299,14 +397,25 @@ public class WebSocketServer {
|
|
|
Integer replayFlag = flagMap.get("replayFlag");
|
|
Integer replayFlag = flagMap.get("replayFlag");
|
|
|
liveMsg.setLiveFlag(liveFlag);
|
|
liveMsg.setLiveFlag(liveFlag);
|
|
|
liveMsg.setReplayFlag(replayFlag);
|
|
liveMsg.setReplayFlag(replayFlag);
|
|
|
- liveMsgService.insertLiveMsg(liveMsg);
|
|
|
|
|
|
|
+ // 将消息加入批量插入队列
|
|
|
|
|
+ try {
|
|
|
|
|
+ liveMsgQueue.offer(liveMsg);
|
|
|
|
|
+ // 如果队列超过阈值,立即触发批量插入
|
|
|
|
|
+ if (liveMsgQueue.size() >= LIVE_MSG_BATCH_SIZE) {
|
|
|
|
|
+ flushLiveMsgQueue();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("[聊天消息队列] 添加消息失败, liveId: {}, userId: {}, error: {}",
|
|
|
|
|
+ liveMsg.getLiveId(), liveMsg.getUserId(), e.getMessage());
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
msg.setOn(true);
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
|
|
|
|
|
- // 广播消息
|
|
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ // 将消息加入队列(普通用户消息)
|
|
|
|
|
+ isAdmin = (userType == 1);
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), isAdmin);
|
|
|
break;
|
|
break;
|
|
|
case "sendGift":
|
|
case "sendGift":
|
|
|
liveMsg = new LiveMsg();
|
|
liveMsg = new LiveMsg();
|
|
@@ -317,7 +426,7 @@ public class WebSocketServer {
|
|
|
liveMsg.setMsg(msg.getMsg());
|
|
liveMsg.setMsg(msg.getMsg());
|
|
|
msg.setOn(true);
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
break;
|
|
break;
|
|
|
case "sendTopMsg":
|
|
case "sendTopMsg":
|
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
@@ -332,7 +441,7 @@ public class WebSocketServer {
|
|
|
msg.setOn(true);
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
// 广播消息
|
|
// 广播消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
// 放在当前活动里面
|
|
// 放在当前活动里面
|
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG));
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG));
|
|
|
redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), JSONObject.toJSONString(liveMsg));
|
|
redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), JSONObject.toJSONString(liveMsg));
|
|
@@ -373,7 +482,7 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setUserType(0L);
|
|
sendMsgVo.setUserType(0L);
|
|
|
sendMsgVo.setCmd("deleteMsg");
|
|
sendMsgVo.setCmd("deleteMsg");
|
|
|
sendMsgVo.setMsg(msg.getMsg());
|
|
sendMsgVo.setMsg(msg.getMsg());
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), true);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private void processCoupon(long liveId, SendMsgVo msg) {
|
|
private void processCoupon(long liveId, SendMsgVo msg) {
|
|
@@ -393,7 +502,7 @@ public class WebSocketServer {
|
|
|
} else {
|
|
} else {
|
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
|
|
redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
|
|
|
}
|
|
}
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@@ -408,7 +517,7 @@ public class WebSocketServer {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setLiveId(liveId);
|
|
msg.setLiveId(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveGoods));
|
|
msg.setData(JSONObject.toJSONString(liveGoods));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -423,7 +532,7 @@ public class WebSocketServer {
|
|
|
if (Objects.nonNull(liveRedConf)) {
|
|
if (Objects.nonNull(liveRedConf)) {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveRedConf));
|
|
msg.setData(JSONObject.toJSONString(liveRedConf));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -439,7 +548,7 @@ public class WebSocketServer {
|
|
|
if (Objects.nonNull(liveLotteryConf)) {
|
|
if (Objects.nonNull(liveLotteryConf)) {
|
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
liveService.asyncToCacheLiveConfig(liveId);
|
|
|
msg.setData(JSONObject.toJSONString(liveLotteryConf));
|
|
msg.setData(JSONObject.toJSONString(liveLotteryConf));
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -471,28 +580,133 @@ public class WebSocketServer {
|
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- //发送消息
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 发送消息(线程安全,支持并发写入保护)
|
|
|
|
|
+ * 只对写入操作加锁,确保同一Session的消息串行发送
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param session WebSocket会话
|
|
|
|
|
+ * @param message 消息内容
|
|
|
|
|
+ * @throws IOException 发送失败时抛出异常
|
|
|
|
|
+ */
|
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
|
if (session == null || !session.isOpen()) {
|
|
if (session == null || !session.isOpen()) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 获取Session锁
|
|
|
|
|
- Lock lock = sessionLocks.get(session.getId());
|
|
|
|
|
|
|
+ String sessionId = session.getId();
|
|
|
|
|
+
|
|
|
|
|
+ // 检查消息大小,超大消息分片发送
|
|
|
|
|
+ byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8);
|
|
|
|
|
+ if (messageBytes.length > MAX_MESSAGE_SIZE) {
|
|
|
|
|
+ sendMessageInChunks(session, message, messageBytes);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 获取Session锁(使用StampedLock提升性能)
|
|
|
|
|
+ StampedLock lock = sessionLocks.get(sessionId);
|
|
|
if (lock == null) {
|
|
if (lock == null) {
|
|
|
// 如果锁不存在,创建一个新锁
|
|
// 如果锁不存在,创建一个新锁
|
|
|
- lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
|
|
|
|
+ lock = sessionLocks.computeIfAbsent(sessionId, k -> new StampedLock());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 使用锁保证同一Session的消息串行发送
|
|
|
|
|
- lock.lock();
|
|
|
|
|
|
|
+ // 使用写锁保证同一Session的消息串行发送
|
|
|
|
|
+ // 注意:只对写入操作加锁,读取操作(如session.isOpen())在锁外进行
|
|
|
|
|
+ long stamp = lock.writeLock();
|
|
|
try {
|
|
try {
|
|
|
- if (session.isOpen()) {
|
|
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
|
|
|
|
+ // 双重检查,确保连接仍然打开(在锁内再次检查,避免锁期间连接关闭)
|
|
|
|
|
+ if (!session.isOpen()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 使用同步发送,确保前一次写入完成后再发起新写入
|
|
|
|
|
+ session.getBasicRemote().sendText(message);
|
|
|
|
|
+ } catch (IllegalStateException e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ // TEXT_FULL_WRITING状态,说明前一次写入未完成
|
|
|
|
|
+ String errorMsg = e.getMessage();
|
|
|
|
|
+ if (errorMsg != null && errorMsg.contains("TEXT_FULL_WRITING")) {
|
|
|
|
|
+ // 等待一小段时间后重试
|
|
|
|
|
+ try {
|
|
|
|
|
+ Thread.sleep(10);
|
|
|
|
|
+ if (session.isOpen()) {
|
|
|
|
|
+ session.getBasicRemote().sendText(message);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ throw new IOException("发送消息被中断", ie);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ throw new IOException("WebSocket连接异常", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ // 连接已断开或其他异常
|
|
|
|
|
+ String errorMsg = e.getMessage();
|
|
|
|
|
+ if (errorMsg != null && (
|
|
|
|
|
+ errorMsg.contains("Broken pipe") ||
|
|
|
|
|
+ errorMsg.contains("Connection reset") ||
|
|
|
|
|
+ errorMsg.contains("Connection closed"))) {
|
|
|
|
|
+ // 静默处理,避免日志刷屏
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ throw new IOException("WebSocket发送失败", e);
|
|
|
}
|
|
}
|
|
|
} finally {
|
|
} finally {
|
|
|
- lock.unlock();
|
|
|
|
|
|
|
+ // 确保锁在finally中释放,避免死锁
|
|
|
|
|
+ lock.unlockWrite(stamp);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 分片发送超大消息(安全处理UTF-8字符边界)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void sendMessageInChunks(Session session, String message, byte[] messageBytes) throws IOException {
|
|
|
|
|
+ String sessionId = session.getId();
|
|
|
|
|
+ int totalSize = messageBytes.length;
|
|
|
|
|
+
|
|
|
|
|
+ log.warn("[超大消息分片] sessionId: {}, 消息大小: {} bytes, 将分片发送",
|
|
|
|
|
+ sessionId, totalSize);
|
|
|
|
|
+
|
|
|
|
|
+ int offset = 0;
|
|
|
|
|
+ int chunkIndex = 0;
|
|
|
|
|
+
|
|
|
|
|
+ while (offset < totalSize) {
|
|
|
|
|
+ // 计算当前分片的结束位置
|
|
|
|
|
+ int chunkEnd = Math.min(offset + CHUNK_SIZE, totalSize);
|
|
|
|
|
+
|
|
|
|
|
+ // 如果不在字符串末尾,需要找到UTF-8字符边界
|
|
|
|
|
+ if (chunkEnd < totalSize) {
|
|
|
|
|
+ // 从chunkEnd向前查找,找到完整的UTF-8字符边界
|
|
|
|
|
+ // UTF-8字符的第一个字节:0xxxxxxx 或 110xxxxx 或 1110xxxx 或 11110xxx
|
|
|
|
|
+ // UTF-8字符的后续字节:10xxxxxx
|
|
|
|
|
+ while (chunkEnd > offset && (messageBytes[chunkEnd] & 0xC0) == 0x80) {
|
|
|
|
|
+ chunkEnd--;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 提取分片(确保不截断UTF-8字符)
|
|
|
|
|
+ String chunk = new String(messageBytes, offset, chunkEnd - offset, StandardCharsets.UTF_8);
|
|
|
|
|
+
|
|
|
|
|
+ // 每个分片都通过sendMessage发送,确保串行化和锁保护
|
|
|
|
|
+ sendMessage(session, chunk);
|
|
|
|
|
+
|
|
|
|
|
+ offset = chunkEnd;
|
|
|
|
|
+ chunkIndex++;
|
|
|
|
|
+
|
|
|
|
|
+ // 分片之间稍作延迟,避免缓冲区溢出
|
|
|
|
|
+ if (offset < totalSize) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Thread.sleep(5);
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ throw new IOException("分片发送被中断", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ log.debug("[超大消息分片] sessionId: {}, 完成分片发送, 分片数: {}", sessionId, chunkIndex);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
@@ -505,11 +719,15 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
Session session = room.get(userId);
|
|
Session session = room.get(userId);
|
|
|
- if(Objects.isNull( session)) return;
|
|
|
|
|
- session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ if(Objects.isNull(session) || !session.isOpen()) return;
|
|
|
|
|
+ sendWithRetry(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), 1);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private void sendBlockMessage(Long liveId, Long userId) {
|
|
private void sendBlockMessage(Long liveId, Long userId) {
|
|
|
|
|
+ // 将被禁言用户添加到Redis Set中
|
|
|
|
|
+ String blockedUsersKey = String.format(BLOCKED_USERS_KEY, liveId);
|
|
|
|
|
+ redisCache.redisTemplate.opsForSet().add(blockedUsersKey, String.valueOf(userId));
|
|
|
|
|
+
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
sendMsgVo.setUserId(userId);
|
|
@@ -519,12 +737,12 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setData(null);
|
|
sendMsgVo.setData(null);
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
Session session = room.get(userId);
|
|
Session session = room.get(userId);
|
|
|
- if(Objects.isNull( session)) return;
|
|
|
|
|
- session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ if(Objects.isNull(session) || !session.isOpen()) return;
|
|
|
|
|
+ sendWithRetry(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), 1);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 广播消息
|
|
|
|
|
|
|
+ * 广播消息(优化:添加Session去重,避免同一Session重复发送)
|
|
|
* @param liveId 直播间ID
|
|
* @param liveId 直播间ID
|
|
|
* @param message 消息内容
|
|
* @param message 消息内容
|
|
|
*/
|
|
*/
|
|
@@ -532,72 +750,138 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
|
|
|
|
|
- // 普通用户房间:并行发送
|
|
|
|
|
|
|
+ // 使用Set去重,避免同一Session被多次处理
|
|
|
|
|
+ Set<Session> uniqueSessions = new HashSet<>();
|
|
|
|
|
+
|
|
|
|
|
+ // 收集普通用户房间的Session(去重)
|
|
|
room.forEach((k, v) -> {
|
|
room.forEach((k, v) -> {
|
|
|
- if (v.isOpen()) {
|
|
|
|
|
- sendWithRetry(v,message,1);
|
|
|
|
|
|
|
+ if (v != null && v.isOpen()) {
|
|
|
|
|
+ uniqueSessions.add(v);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 收集admin房间的Session(去重)
|
|
|
|
|
+ adminRoom.forEach(v -> {
|
|
|
|
|
+ if (v != null && v.isOpen()) {
|
|
|
|
|
+ uniqueSessions.add(v);
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
- // admin房间:串行发送,使用单线程执行器
|
|
|
|
|
- if (!adminRoom.isEmpty()) {
|
|
|
|
|
- ExecutorService executor = adminExecutors.get(liveId);
|
|
|
|
|
- if (executor != null && !executor.isShutdown()) {
|
|
|
|
|
- executor.submit(() -> {
|
|
|
|
|
- for (Session session : adminRoom) {
|
|
|
|
|
- if (session.isOpen()) {
|
|
|
|
|
- sendWithRetry(session, message, 1);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- } else {
|
|
|
|
|
- // 如果执行器不存在或已关闭,直接发送
|
|
|
|
|
- adminRoom.forEach(v -> {
|
|
|
|
|
- if (v.isOpen()) {
|
|
|
|
|
- sendWithRetry(v, message, 1);
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // 并行发送给所有唯一Session
|
|
|
|
|
+ uniqueSessions.parallelStream().forEach(session -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ // 单个Session发送失败不影响其他Session
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 广播点赞消息
|
|
|
|
|
|
|
+ * 广播点赞消息(优化:添加Session去重)
|
|
|
* @param liveId 直播间ID
|
|
* @param liveId 直播间ID
|
|
|
* @param message 消息内容
|
|
* @param message 消息内容
|
|
|
*/
|
|
*/
|
|
|
public void broadcastLikeMessage(Long liveId, String message) {
|
|
public void broadcastLikeMessage(Long liveId, String message) {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
|
|
+
|
|
|
|
|
+ // 使用Set去重,避免同一Session被多次处理
|
|
|
|
|
+ Set<Session> uniqueSessions = new HashSet<>();
|
|
|
room.forEach((k, v) -> {
|
|
room.forEach((k, v) -> {
|
|
|
- if (v.isOpen()) {
|
|
|
|
|
- sendWithRetry(v,message,1);
|
|
|
|
|
|
|
+ if (v != null && v.isOpen()) {
|
|
|
|
|
+ uniqueSessions.add(v);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 并行发送给所有唯一Session
|
|
|
|
|
+ uniqueSessions.parallelStream().forEach(session -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ // 单个Session发送失败不影响其他Session
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 带重试机制的消息发送(统一使用sendMessage,避免直接调用getAsyncRemote)
|
|
|
|
|
+ * 解决心跳机制与业务消息并发冲突问题
|
|
|
|
|
+ */
|
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sessionId = session.getId();
|
|
|
|
|
+
|
|
|
|
|
+ // 更新发送队列监控
|
|
|
|
|
+ sessionQueueSizes.computeIfAbsent(sessionId, k -> new AtomicLong(0)).incrementAndGet();
|
|
|
|
|
+
|
|
|
int attempts = 0;
|
|
int attempts = 0;
|
|
|
while (attempts < maxRetries) {
|
|
while (attempts < maxRetries) {
|
|
|
try {
|
|
try {
|
|
|
- if(session.isOpen()) {
|
|
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
|
|
|
|
+ // 使用sendMessage统一发送,确保锁机制生效,避免并发冲突
|
|
|
|
|
+ sendMessage(session, message);
|
|
|
|
|
+
|
|
|
|
|
+ // 发送成功,更新监控
|
|
|
|
|
+ AtomicLong queueSize = sessionQueueSizes.get(sessionId);
|
|
|
|
|
+ if (queueSize != null) {
|
|
|
|
|
+ queueSize.decrementAndGet();
|
|
|
}
|
|
}
|
|
|
return; // 发送成功,退出
|
|
return; // 发送成功,退出
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
|
|
|
|
|
- attempts++;
|
|
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ String errorMsg = e.getMessage();
|
|
|
|
|
+
|
|
|
|
|
+ // 连接已断开,清理资源并返回
|
|
|
|
|
+ if (errorMsg != null && (
|
|
|
|
|
+ errorMsg.contains("Broken pipe") ||
|
|
|
|
|
+ errorMsg.contains("Connection reset") ||
|
|
|
|
|
+ errorMsg.contains("Connection closed") ||
|
|
|
|
|
+ errorMsg.contains("连接已断开"))) {
|
|
|
|
|
+ // 清理监控
|
|
|
|
|
+ sessionQueueSizes.remove(sessionId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // TEXT_FULL_WRITING或其他可重试错误
|
|
|
|
|
+ attempts++;
|
|
|
|
|
+ if (attempts < maxRetries) {
|
|
|
try {
|
|
try {
|
|
|
- TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, 100));
|
|
|
|
|
|
|
+ // 指数退避策略
|
|
|
|
|
+ long delay = Math.min(50L * (1L << attempts), 500L);
|
|
|
|
|
+ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, (int)delay));
|
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
Thread.currentThread().interrupt();
|
|
Thread.currentThread().interrupt();
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
- } else {
|
|
|
|
|
- throw e;
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ // 其他异常,记录日志
|
|
|
|
|
+ if (attempts == 0) {
|
|
|
|
|
+ log.warn("[发送消息重试] 失败,sessionId: {}, attempts: {}, error: {}",
|
|
|
|
|
+ sessionId, attempts + 1, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ attempts++;
|
|
|
|
|
+ if (attempts >= maxRetries) {
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- log.info("超过重试次数, 消息 {}",message);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 超过重试次数,清理监控
|
|
|
|
|
+ AtomicLong queueSize = sessionQueueSizes.get(sessionId);
|
|
|
|
|
+ if (queueSize != null) {
|
|
|
|
|
+ queueSize.decrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (attempts >= maxRetries) {
|
|
|
|
|
+ log.debug("[发送消息] 超过重试次数, sessionId: {}, attempts: {}", sessionId, attempts);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void handleAutoTask(LiveAutoTask task) {
|
|
public void handleAutoTask(LiveAutoTask task) {
|
|
@@ -690,7 +974,7 @@ public class WebSocketServer {
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
msg.setStatus(1);
|
|
msg.setStatus(1);
|
|
|
- broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
|
|
|
|
+ enqueueMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)), true);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("定时任务执行异常:{}", e.getMessage());
|
|
log.error("定时任务执行异常:{}", e.getMessage());
|
|
|
}
|
|
}
|
|
@@ -741,6 +1025,7 @@ public class WebSocketServer {
|
|
|
String valueStr = cacheObject.toString().trim();
|
|
String valueStr = cacheObject.toString().trim();
|
|
|
current = Integer.parseInt(valueStr);
|
|
current = Integer.parseInt(valueStr);
|
|
|
} catch (NumberFormatException e) {
|
|
} catch (NumberFormatException e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
Integer last = lastLikeCountCache.getOrDefault(liveId, 0);
|
|
Integer last = lastLikeCountCache.getOrDefault(liveId, 0);
|
|
@@ -756,6 +1041,71 @@ public class WebSocketServer {
|
|
|
lastLikeCountCache.keySet().removeIf(liveId -> !activeLiveIds.contains(liveId));
|
|
lastLikeCountCache.keySet().removeIf(liveId -> !activeLiveIds.contains(liveId));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ @Scheduled(fixedRate = LIVE_MSG_BATCH_INTERVAL) // 每10秒执行一次
|
|
|
|
|
+ public void batchInsertLiveMsg() {
|
|
|
|
|
+ flushLiveMsgQueue();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 监控Session发送队列积压情况(每分钟执行一次)
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(fixedRate = 60000) // 每分钟执行一次
|
|
|
|
|
+ public void monitorSessionQueues() {
|
|
|
|
|
+ long totalQueued = sessionQueueSizes.values().stream()
|
|
|
|
|
+ .mapToLong(AtomicLong::get)
|
|
|
|
|
+ .sum();
|
|
|
|
|
+
|
|
|
|
|
+ // 统计积压严重的Session(队列长度>10)
|
|
|
|
|
+ long highQueueCount = sessionQueueSizes.values().stream()
|
|
|
|
|
+ .filter(size -> size.get() > 10)
|
|
|
|
|
+ .count();
|
|
|
|
|
+
|
|
|
|
|
+ if (totalQueued > 100 || highQueueCount > 0) {
|
|
|
|
|
+ log.warn("[Session队列监控] 总积压消息数: {}, 高积压Session数: {}",
|
|
|
|
|
+ totalQueued, highQueueCount);
|
|
|
|
|
+
|
|
|
|
|
+ // 输出积压最严重的5个Session
|
|
|
|
|
+ sessionQueueSizes.entrySet().stream()
|
|
|
|
|
+ .filter(entry -> entry.getValue().get() > 10)
|
|
|
|
|
+ .sorted((a, b) -> Long.compare(b.getValue().get(), a.getValue().get()))
|
|
|
|
|
+ .limit(5)
|
|
|
|
|
+ .forEach(entry -> {
|
|
|
|
|
+ log.warn("[Session队列监控] sessionId: {}, 积压消息数: {}",
|
|
|
|
|
+ entry.getKey(), entry.getValue().get());
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量插入聊天消息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ private void flushLiveMsgQueue() {
|
|
|
|
|
+ if (liveMsgQueue.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<LiveMsg> batchList = new ArrayList<>();
|
|
|
|
|
+ // 从队列中取出所有消息(最多500条)
|
|
|
|
|
+ int count = liveMsgQueue.drainTo(batchList, LIVE_MSG_BATCH_SIZE);
|
|
|
|
|
+
|
|
|
|
|
+ if (count > 0) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ int inserted = liveMsgService.insertLiveMsgBatch(batchList);
|
|
|
|
|
+ log.debug("[聊天消息批量插入] 成功插入 {} 条消息", inserted);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("[聊天消息批量插入] 插入失败, 条数: {}, error: {}", count, e.getMessage(), e);
|
|
|
|
|
+ // 插入失败,将消息重新放回队列(避免消息丢失)
|
|
|
|
|
+ batchList.forEach(msg -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ liveMsgQueue.offer(msg);
|
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
|
+ log.error("[聊天消息队列] 重新入队失败, liveId: {}, userId: {}", msg.getLiveId(), msg.getUserId());
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 定时清理无效会话(每分钟执行一次)
|
|
* 定时清理无效会话(每分钟执行一次)
|
|
|
* 检查心跳超时的会话并关闭
|
|
* 检查心跳超时的会话并关闭
|
|
@@ -824,4 +1174,469 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息队列包装类,支持优先级(管理员消息优先级更高)
|
|
|
|
|
+ */
|
|
|
|
|
+ private static class QueueMessage implements Comparable<QueueMessage> {
|
|
|
|
|
+ private final String message;
|
|
|
|
|
+ private final long timestamp;
|
|
|
|
|
+ private final int priority; // 0=普通消息, 1=管理员消息(优先级更高)
|
|
|
|
|
+ private final long sequence; // 序列号,用于相同优先级消息的FIFO排序
|
|
|
|
|
+ private final long sizeBytes; // 消息大小(字节数)
|
|
|
|
|
+
|
|
|
|
|
+ private static final AtomicLong sequenceGenerator = new AtomicLong(0);
|
|
|
|
|
+
|
|
|
|
|
+ public QueueMessage(String message, boolean isAdmin) {
|
|
|
|
|
+ this.message = message;
|
|
|
|
|
+ this.timestamp = System.currentTimeMillis();
|
|
|
|
|
+ this.priority = isAdmin ? 1 : 0;
|
|
|
|
|
+ this.sequence = sequenceGenerator.getAndIncrement();
|
|
|
|
|
+ // 计算消息大小(UTF-8编码)
|
|
|
|
|
+ this.sizeBytes = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public String getMessage() {
|
|
|
|
|
+ return message;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public long getSizeBytes() {
|
|
|
|
|
+ return sizeBytes;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public int compareTo(QueueMessage other) {
|
|
|
|
|
+ // 优先级高的先处理(管理员消息)
|
|
|
|
|
+ int priorityCompare = Integer.compare(other.priority, this.priority);
|
|
|
|
|
+ if (priorityCompare != 0) {
|
|
|
|
|
+ return priorityCompare;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 相同优先级按序列号排序(FIFO)
|
|
|
|
|
+ return Long.compare(this.sequence, other.sequence);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取或创建消息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ private PriorityBlockingQueue<QueueMessage> getMessageQueue(Long liveId) {
|
|
|
|
|
+ return messageQueues.computeIfAbsent(liveId, k -> new PriorityBlockingQueue<>());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 启动消费者线程(如果还没有启动)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void startConsumerThread(Long liveId) {
|
|
|
|
|
+ consumerRunningFlags.computeIfAbsent(liveId, k -> new AtomicBoolean(false));
|
|
|
|
|
+ AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
|
|
+
|
|
|
|
|
+ // 如果线程已经在运行,直接返回
|
|
|
|
|
+ if (runningFlag.get()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 尝试启动消费者线程
|
|
|
|
|
+ synchronized (consumerRunningFlags) {
|
|
|
|
|
+ if (runningFlag.compareAndSet(false, true)) {
|
|
|
|
|
+ Thread consumerThread = new Thread(() -> {
|
|
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
|
|
+ log.info("[消息队列] 启动消费者线程, liveId={}", liveId);
|
|
|
|
|
+
|
|
|
|
|
+ while (runningFlag.get()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 检查是否还有session,如果没有则退出
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
|
|
+
|
|
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
|
|
+
|
|
|
|
|
+ if (!hasSession) {
|
|
|
|
|
+ log.info("[消息队列] 直播间无session,停止消费者线程, liveId={}", liveId);
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 从队列中取消息,最多等待1秒
|
|
|
|
|
+ QueueMessage queueMessage = queue.poll(1, TimeUnit.SECONDS);
|
|
|
|
|
+ if (queueMessage != null) {
|
|
|
|
|
+ // 更新队列大小(减少)
|
|
|
|
|
+ AtomicLong currentSize = queueSizes.get(liveId);
|
|
|
|
|
+ if (currentSize != null) {
|
|
|
|
|
+ currentSize.addAndGet(-queueMessage.getSizeBytes());
|
|
|
|
|
+ }
|
|
|
|
|
+ // 广播消息
|
|
|
|
|
+ broadcastMessageFromQueue(liveId, queueMessage.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ break;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("[消息队列] 消费消息异常, liveId={}", liveId, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 清理资源
|
|
|
|
|
+ runningFlag.set(false);
|
|
|
|
|
+ consumerThreads.remove(liveId);
|
|
|
|
|
+ log.info("[消息队列] 消费者线程已停止, liveId={}", liveId);
|
|
|
|
|
+ }, "MessageConsumer-" + liveId);
|
|
|
|
|
+
|
|
|
|
|
+ consumerThread.setDaemon(true);
|
|
|
|
|
+ consumerThread.start();
|
|
|
|
|
+ consumerThreads.put(liveId, consumerThread);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 停止消费者线程
|
|
|
|
|
+ */
|
|
|
|
|
+ private void stopConsumerThread(Long liveId) {
|
|
|
|
|
+ AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
|
|
|
|
|
+ if (runningFlag != null) {
|
|
|
|
|
+ runningFlag.set(false);
|
|
|
|
|
+ }
|
|
|
|
|
+ Thread consumerThread = consumerThreads.remove(liveId);
|
|
|
|
|
+ if (consumerThread != null) {
|
|
|
|
|
+ consumerThread.interrupt();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 将消息加入队列
|
|
|
|
|
+ * @param liveId 直播间ID
|
|
|
|
|
+ * @param message 消息内容
|
|
|
|
|
+ * @param isAdmin 是否是管理员消息(管理员消息会插队)
|
|
|
|
|
+ * @return 是否成功加入队列
|
|
|
|
|
+ */
|
|
|
|
|
+ public boolean enqueueMessage(Long liveId, String message, boolean isAdmin) {
|
|
|
|
|
+ PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
|
|
|
|
|
+ AtomicLong currentSize = queueSizes.computeIfAbsent(liveId, k -> new AtomicLong(0));
|
|
|
|
|
+
|
|
|
|
|
+ // 计算新消息的大小
|
|
|
|
|
+ long messageSize = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
|
|
|
|
|
+
|
|
|
|
|
+ // 检查队列条数限制
|
|
|
|
|
+ if (!isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
|
|
+ log.warn("[消息队列] 队列条数已满,丢弃消息, liveId={}, queueSize={}", liveId, queue.size());
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 检查队列大小限制(200MB)
|
|
|
|
|
+ long newTotalSize = currentSize.get() + messageSize;
|
|
|
|
|
+ if (newTotalSize > MAX_QUEUE_SIZE_BYTES) {
|
|
|
|
|
+ if (!isAdmin) {
|
|
|
|
|
+ // 普通消息超过大小限制,直接丢弃
|
|
|
|
|
+ log.warn("[消息队列] 队列大小超过限制,丢弃普通消息, liveId={}, currentSize={}MB, messageSize={}KB",
|
|
|
|
|
+ liveId, currentSize.get() / (1024.0 * 1024.0), messageSize / 1024.0);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 管理员消息:需要移除一些普通消息以腾出空间
|
|
|
|
|
+ long needToFree = newTotalSize - MAX_QUEUE_SIZE_BYTES;
|
|
|
|
|
+ long freedSize = removeMessagesToFreeSpace(queue, currentSize, needToFree, true);
|
|
|
|
|
+ if (freedSize < needToFree) {
|
|
|
|
|
+ log.warn("[消息队列] 无法释放足够空间,管理员消息可能无法入队, liveId={}, needToFree={}KB, freed={}KB",
|
|
|
|
|
+ liveId, needToFree / 1024.0, freedSize / 1024.0);
|
|
|
|
|
+ // 即使空间不足,也尝试入队(可能会超过限制,但管理员消息优先级高)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 如果是管理员消息且队列条数已满,移除一个普通消息
|
|
|
|
|
+ if (isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
|
|
|
|
|
+ // 由于是优先级队列,普通消息(priority=0)会在队列末尾
|
|
|
|
|
+ // 尝试移除一个普通消息,为管理员消息腾出空间
|
|
|
|
|
+ QueueMessage removed = null;
|
|
|
|
|
+ Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
|
|
+ while (iterator.hasNext()) {
|
|
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
|
|
+ if (msg.priority == 0) {
|
|
|
|
|
+ removed = msg;
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (removed != null) {
|
|
|
|
|
+ queue.remove(removed);
|
|
|
|
|
+ currentSize.addAndGet(-removed.getSizeBytes());
|
|
|
|
|
+ log.debug("[消息队列] 管理员消息插队,移除普通消息, liveId={}", liveId);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 如果没有普通消息,移除队列末尾的消息(可能是最早的管理员消息)
|
|
|
|
|
+ // 这种情况很少发生,因为管理员消息通常较少
|
|
|
|
|
+ log.warn("[消息队列] 队列条数已满且无普通消息可移除, liveId={}", liveId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ QueueMessage queueMessage = new QueueMessage(message, isAdmin);
|
|
|
|
|
+ queue.offer(queueMessage);
|
|
|
|
|
+ currentSize.addAndGet(messageSize);
|
|
|
|
|
+
|
|
|
|
|
+ // 如果有session,确保消费者线程在运行
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
|
|
+
|
|
|
|
|
+ if (hasSession) {
|
|
|
|
|
+ startConsumerThread(liveId);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 移除消息以释放空间
|
|
|
|
|
+ * @param queue 消息队列
|
|
|
|
|
+ * @param currentSize 当前队列大小(原子变量)
|
|
|
|
|
+ * @param needToFree 需要释放的空间(字节数)
|
|
|
|
|
+ * @param onlyRemoveNormal 是否只移除普通消息(true=只移除普通消息,false=可以移除任何消息)
|
|
|
|
|
+ * @return 实际释放的空间(字节数)
|
|
|
|
|
+ */
|
|
|
|
|
+ private long removeMessagesToFreeSpace(PriorityBlockingQueue<QueueMessage> queue,
|
|
|
|
|
+ AtomicLong currentSize,
|
|
|
|
|
+ long needToFree,
|
|
|
|
|
+ boolean onlyRemoveNormal) {
|
|
|
|
|
+ long freedSize = 0;
|
|
|
|
|
+ List<QueueMessage> toRemove = new ArrayList<>();
|
|
|
|
|
+
|
|
|
|
|
+ // 收集需要移除的消息(优先移除普通消息)
|
|
|
|
|
+ Iterator<QueueMessage> iterator = queue.iterator();
|
|
|
|
|
+ while (iterator.hasNext() && freedSize < needToFree) {
|
|
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
|
|
+ if (!onlyRemoveNormal || msg.priority == 0) {
|
|
|
|
|
+ toRemove.add(msg);
|
|
|
|
|
+ freedSize += msg.getSizeBytes();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 如果只移除普通消息但空间还不够,可以移除管理员消息
|
|
|
|
|
+ if (onlyRemoveNormal && freedSize < needToFree) {
|
|
|
|
|
+ iterator = queue.iterator();
|
|
|
|
|
+ while (iterator.hasNext() && freedSize < needToFree) {
|
|
|
|
|
+ QueueMessage msg = iterator.next();
|
|
|
|
|
+ if (msg.priority == 1 && !toRemove.contains(msg)) {
|
|
|
|
|
+ toRemove.add(msg);
|
|
|
|
|
+ freedSize += msg.getSizeBytes();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 移除消息并更新大小
|
|
|
|
|
+ for (QueueMessage msg : toRemove) {
|
|
|
|
|
+ if (queue.remove(msg)) {
|
|
|
|
|
+ currentSize.addAndGet(-msg.getSizeBytes());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (freedSize > 0) {
|
|
|
|
|
+ log.info("[消息队列] 释放队列空间, removedCount={}, freedSize={}KB",
|
|
|
|
|
+ toRemove.size(), freedSize / 1024.0);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return freedSize;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从队列中消费消息并广播
|
|
|
|
|
+ */
|
|
|
|
|
+ private void broadcastMessageFromQueue(Long liveId, String message) {
|
|
|
|
|
+ broadcastMessage(liveId, message);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 检查并清理空的直播间资源
|
|
|
|
|
+ */
|
|
|
|
|
+ private void cleanupEmptyRoom(Long liveId) {
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
|
|
|
|
|
+ List<Session> adminRoom = adminRooms.get(liveId);
|
|
|
|
|
+
|
|
|
|
|
+ boolean hasSession = (room != null && !room.isEmpty()) ||
|
|
|
|
|
+ (adminRoom != null && !adminRoom.isEmpty());
|
|
|
|
|
+
|
|
|
|
|
+ if (!hasSession) {
|
|
|
|
|
+ // 停止消费者线程
|
|
|
|
|
+ stopConsumerThread(liveId);
|
|
|
|
|
+ // 清理消息队列
|
|
|
|
|
+ messageQueues.remove(liveId);
|
|
|
|
|
+ consumerRunningFlags.remove(liveId);
|
|
|
|
|
+ queueSizes.remove(liveId);
|
|
|
|
|
+ log.info("[消息队列] 清理空直播间资源, liveId={}", liveId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * Redis Pipeline 批量操作结果包装类
|
|
|
|
|
+ */
|
|
|
|
|
+ private static class PipelineResult {
|
|
|
|
|
+ Long currentOnlineCount;
|
|
|
|
|
+ Integer maxOnline;
|
|
|
|
|
+ boolean isFirstVisit;
|
|
|
|
|
+ boolean isFirstViewer;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量执行用户进入直播间的 Redis 操作(使用 Pipeline 提升性能)
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param liveId 直播间ID
|
|
|
|
|
+ * @param userId 用户ID
|
|
|
|
|
+ * @param entryTimeKey 进入时间Key
|
|
|
|
|
+ * @param existingEntryTime 已存在的进入时间
|
|
|
|
|
+ * @return PipelineResult 包含批量操作的结果
|
|
|
|
|
+ */
|
|
|
|
|
+ private PipelineResult executeUserJoinPipeline(Long liveId, Long userId, String entryTimeKey, Long existingEntryTime) {
|
|
|
|
|
+ PipelineResult result = new PipelineResult();
|
|
|
|
|
+
|
|
|
|
|
+ // 使用 Pipeline 批量执行 Redis 操作
|
|
|
|
|
+ List<Object> pipelineResults = redisCache.redisTemplate.executePipelined(
|
|
|
|
|
+ (org.springframework.data.redis.core.RedisCallback<Object>) connection -> {
|
|
|
|
|
+ // 1. 设置进入时间(如果不存在)
|
|
|
|
|
+ if (existingEntryTime == null) {
|
|
|
|
|
+ connection.setEx(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(entryTimeKey),
|
|
|
|
|
+ 24 * 3600, // 24小时
|
|
|
|
|
+ redisCache.redisTemplate.getValueSerializer().serialize(System.currentTimeMillis())
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 批量递增操作
|
|
|
|
|
+ String pageViewsKey = PAGE_VIEWS_KEY + liveId;
|
|
|
|
|
+ String totalViewsKey = TOTAL_VIEWS_KEY + liveId;
|
|
|
|
|
+ String onlineUsersKey = ONLINE_USERS_KEY + liveId;
|
|
|
|
|
+
|
|
|
|
|
+ connection.incr(redisCache.redisTemplate.getKeySerializer().serialize(pageViewsKey));
|
|
|
|
|
+ connection.incr(redisCache.redisTemplate.getKeySerializer().serialize(totalViewsKey));
|
|
|
|
|
+ connection.incr(redisCache.redisTemplate.getKeySerializer().serialize(onlineUsersKey));
|
|
|
|
|
+
|
|
|
|
|
+ // 3. Set 操作:添加用户到在线用户Set
|
|
|
|
|
+ String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
|
|
+ connection.sAdd(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(onlineUsersSetKey),
|
|
|
|
|
+ redisCache.redisTemplate.getValueSerializer().serialize(String.valueOf(userId))
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 获取Set大小
|
|
|
|
|
+ connection.sCard(redisCache.redisTemplate.getKeySerializer().serialize(onlineUsersSetKey));
|
|
|
|
|
+
|
|
|
|
|
+ // 5. 获取最大在线人数
|
|
|
|
|
+ String maxOnlineKey = MAX_ONLINE_USERS_KEY + liveId;
|
|
|
|
|
+ connection.get(redisCache.redisTemplate.getKeySerializer().serialize(maxOnlineKey));
|
|
|
|
|
+
|
|
|
|
|
+ // 6. 判断是否首次访客
|
|
|
|
|
+ String userVisitKey = USER_VISIT_KEY + liveId + ":" + userId;
|
|
|
|
|
+ connection.setNX(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(userVisitKey),
|
|
|
|
|
+ redisCache.redisTemplate.getValueSerializer().serialize(1)
|
|
|
|
|
+ );
|
|
|
|
|
+ connection.expire(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(userVisitKey),
|
|
|
|
|
+ 86400 // 1天
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // 7. 判断是否首次观众
|
|
|
|
|
+ String uniqueViewersUserKey = UNIQUE_VIEWERS_KEY + liveId + ":" + userId;
|
|
|
|
|
+ connection.setNX(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(uniqueViewersUserKey),
|
|
|
|
|
+ redisCache.redisTemplate.getValueSerializer().serialize(1)
|
|
|
|
|
+ );
|
|
|
|
|
+ connection.expire(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(uniqueViewersUserKey),
|
|
|
|
|
+ 86400 // 1天
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ return null; // Pipeline 模式下返回值会被忽略
|
|
|
|
|
+ }
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // 解析 Pipeline 结果
|
|
|
|
|
+ try {
|
|
|
|
|
+ int index = 0;
|
|
|
|
|
+ if (existingEntryTime == null) {
|
|
|
|
|
+ index++; // 跳过 setEx 结果
|
|
|
|
|
+ }
|
|
|
|
|
+ index += 3; // 跳过 3 个 incr 结果
|
|
|
|
|
+ index++; // 跳过 sAdd 结果
|
|
|
|
|
+
|
|
|
|
|
+ // 获取 Set 大小
|
|
|
|
|
+ if (index < pipelineResults.size() && pipelineResults.get(index) != null) {
|
|
|
|
|
+ result.currentOnlineCount = ((Number) pipelineResults.get(index)).longValue();
|
|
|
|
|
+ }
|
|
|
|
|
+ index++;
|
|
|
|
|
+
|
|
|
|
|
+ // 获取最大在线人数
|
|
|
|
|
+ if (index < pipelineResults.size() && pipelineResults.get(index) != null) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String maxOnlineStr = pipelineResults.get(index).toString();
|
|
|
|
|
+ result.maxOnline = Integer.parseInt(maxOnlineStr);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ // 解析失败,忽略
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ index++;
|
|
|
|
|
+
|
|
|
|
|
+ // 判断是否首次访客
|
|
|
|
|
+ if (index < pipelineResults.size() && pipelineResults.get(index) != null) {
|
|
|
|
|
+ result.isFirstVisit = Boolean.TRUE.equals(pipelineResults.get(index));
|
|
|
|
|
+ if (result.isFirstVisit) {
|
|
|
|
|
+ // 首次访客,需要递增独立访客数
|
|
|
|
|
+ redisCache.increment(UNIQUE_VISITORS_KEY + liveId, 1);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ index += 2; // 跳过 expire 结果
|
|
|
|
|
+
|
|
|
|
|
+ // 判断是否首次观众
|
|
|
|
|
+ if (index < pipelineResults.size() && pipelineResults.get(index) != null) {
|
|
|
|
|
+ result.isFirstViewer = Boolean.TRUE.equals(pipelineResults.get(index));
|
|
|
|
|
+ if (result.isFirstViewer) {
|
|
|
|
|
+ // 首次观众,需要递增独立观众数
|
|
|
|
|
+ redisCache.increment(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 更新最大在线人数(如果需要)
|
|
|
|
|
+ if (result.currentOnlineCount != null) {
|
|
|
|
|
+ int currentOnline = result.currentOnlineCount.intValue();
|
|
|
|
|
+ if (result.maxOnline == null || currentOnline > result.maxOnline) {
|
|
|
|
|
+ redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return result;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量执行用户离开直播间的 Redis 操作(使用 Pipeline 提升性能)
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param liveId 直播间ID
|
|
|
|
|
+ * @param userId 用户ID
|
|
|
|
|
+ */
|
|
|
|
|
+ private void executeUserClosePipeline(Long liveId, Long userId) {
|
|
|
|
|
+ // 使用 Pipeline 批量执行 Redis 操作
|
|
|
|
|
+ redisCache.redisTemplate.executePipelined(
|
|
|
|
|
+ (org.springframework.data.redis.core.RedisCallback<Object>) connection -> {
|
|
|
|
|
+ // 1. 在线人数 -1
|
|
|
|
|
+ String onlineUsersKey = ONLINE_USERS_KEY + liveId;
|
|
|
|
|
+ connection.incrBy(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(onlineUsersKey),
|
|
|
|
|
+ -1
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 从在线用户Set中移除用户ID
|
|
|
|
|
+ String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
|
|
+ connection.sRem(
|
|
|
|
|
+ redisCache.redisTemplate.getKeySerializer().serialize(onlineUsersSetKey),
|
|
|
|
|
+ redisCache.redisTemplate.getValueSerializer().serialize(String.valueOf(userId))
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 删除进入时间记录
|
|
|
|
|
+ String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
|
|
+ connection.del(redisCache.redisTemplate.getKeySerializer().serialize(entryTimeKey));
|
|
|
|
|
+
|
|
|
|
|
+ return null; // Pipeline 模式下返回值会被忽略
|
|
|
|
|
+ }
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|