|
@@ -22,6 +22,7 @@ import com.fs.live.service.*;
|
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
@@ -30,10 +31,9 @@ import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.EOFException;
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
-import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
|
|
-import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
|
|
|
|
@@ -49,6 +49,16 @@ public class WebSocketServer {
|
|
|
private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
|
|
|
// 管理端连接
|
|
// 管理端连接
|
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
|
|
|
+
|
|
|
|
|
+ // Session发送锁,避免同一会话并发发送消息
|
|
|
|
|
+ private final static ConcurrentHashMap<String, Lock> sessionLocks = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 心跳超时缓存:key=sessionId,value=最后心跳时间戳
|
|
|
|
|
+ private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 心跳超时时间(毫秒):3分钟无心跳则认为超时
|
|
|
|
|
+ private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
|
|
|
|
|
+ // admin房间消息发送线程池(单线程,保证串行化)
|
|
|
|
|
+ private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
|
|
+
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
@@ -62,6 +72,7 @@ public class WebSocketServer {
|
|
|
private final ILiveUserFirstEntryService liveUserFirstEntryService = SpringUtils.getBean(ILiveUserFirstEntryService.class);
|
|
private final ILiveUserFirstEntryService liveUserFirstEntryService = SpringUtils.getBean(ILiveUserFirstEntryService.class);
|
|
|
private final ILiveCouponIssueService liveCouponIssueService = SpringUtils.getBean(ILiveCouponIssueService.class);
|
|
private final ILiveCouponIssueService liveCouponIssueService = SpringUtils.getBean(ILiveCouponIssueService.class);
|
|
|
private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
|
|
private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
|
|
|
|
|
+ private static Random random = new Random();
|
|
|
|
|
|
|
|
// 直播间在线用户缓存
|
|
// 直播间在线用户缓存
|
|
|
// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
@@ -101,7 +112,7 @@ public class WebSocketServer {
|
|
|
throw new BaseException("用户信息错误");
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.join(liveId, userId, location);
|
|
|
|
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
|
room.put(userId, session);
|
|
room.put(userId, session);
|
|
|
// 直播间浏览量 +1
|
|
// 直播间浏览量 +1
|
|
|
redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
@@ -135,17 +146,19 @@ public class WebSocketServer {
|
|
|
redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
}
|
|
}
|
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
|
|
- sendMsgVo.setCmd("entry");
|
|
|
|
|
- sendMsgVo.setMsg("用户进入");
|
|
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
|
|
- // 广播连接消息
|
|
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ if (1 == random.nextInt(4)) {
|
|
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
|
|
+ sendMsgVo.setCmd("entry");
|
|
|
|
|
+ sendMsgVo.setMsg("用户进入");
|
|
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
|
|
+ // 广播连接消息
|
|
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
if (liveUserFirstEntry != null) {
|
|
if (liveUserFirstEntry != null) {
|
|
@@ -177,9 +190,15 @@ public class WebSocketServer {
|
|
|
|
|
|
|
|
} else {
|
|
} else {
|
|
|
adminRoom.add(session);
|
|
adminRoom.add(session);
|
|
|
|
|
+ // 为admin房间创建单线程执行器,保证串行化发送
|
|
|
|
|
+ adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
|
|
|
|
+ // 初始化Session锁
|
|
|
|
|
+ sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
|
|
+ // 初始化心跳时间
|
|
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
|
|
+
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//关闭连接时调用
|
|
//关闭连接时调用
|
|
@@ -209,24 +228,38 @@ public class WebSocketServer {
|
|
|
// 从在线用户Set中移除用户ID
|
|
// 从在线用户Set中移除用户ID
|
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
|
|
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
|
|
- sendMsgVo.setCmd("out");
|
|
|
|
|
- sendMsgVo.setMsg("用户离开");
|
|
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
|
|
-
|
|
|
|
|
- // 广播离开消息
|
|
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
|
|
|
|
|
+ if (1 == new Random().nextInt(4)) {
|
|
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
|
|
+ sendMsgVo.setCmd("out");
|
|
|
|
|
+ sendMsgVo.setMsg("用户离开");
|
|
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
} else {
|
|
} else {
|
|
|
adminRoom.remove(session);
|
|
adminRoom.remove(session);
|
|
|
|
|
+ // 如果admin房间为空,关闭并清理执行器
|
|
|
|
|
+ if (adminRoom.isEmpty()) {
|
|
|
|
|
+ ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
|
|
+ if (executor != null) {
|
|
|
|
|
+ executor.shutdown();
|
|
|
|
|
+ }
|
|
|
|
|
+ adminRooms.remove(liveId);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
|
|
|
|
+ // 清理Session相关资源
|
|
|
|
|
+ heartbeatCache.remove(session.getId());
|
|
|
|
|
+ sessionLocks.remove(session.getId());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//收到客户端信息
|
|
//收到客户端信息
|
|
@@ -243,6 +276,8 @@ public class WebSocketServer {
|
|
|
try {
|
|
try {
|
|
|
switch (msg.getCmd()) {
|
|
switch (msg.getCmd()) {
|
|
|
case "heartbeat":
|
|
case "heartbeat":
|
|
|
|
|
+ // 更新心跳时间
|
|
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
break;
|
|
break;
|
|
|
case "sendMsg":
|
|
case "sendMsg":
|
|
@@ -436,7 +471,6 @@ public class WebSocketServer {
|
|
|
* 处理红包变动消息
|
|
* 处理红包变动消息
|
|
|
*/
|
|
*/
|
|
|
private void processRed(Long liveId, SendMsgVo msg) {
|
|
private void processRed(Long liveId, SendMsgVo msg) {
|
|
|
- log.debug("redData: {}", msg);
|
|
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
msg.setStatus( status);
|
|
msg.setStatus( status);
|
|
@@ -452,7 +486,6 @@ public class WebSocketServer {
|
|
|
* 处理抽奖变动消息
|
|
* 处理抽奖变动消息
|
|
|
*/
|
|
*/
|
|
|
private void processLottery(Long liveId, SendMsgVo msg) {
|
|
private void processLottery(Long liveId, SendMsgVo msg) {
|
|
|
- log.debug("lotteryData: {}", msg);
|
|
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
msg.setStatus( status);
|
|
msg.setStatus( status);
|
|
@@ -471,12 +504,7 @@ public class WebSocketServer {
|
|
|
try {
|
|
try {
|
|
|
this.onClose(session);
|
|
this.onClose(session);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("webSocket 错误 onError", e);
|
|
|
|
|
- }
|
|
|
|
|
- if (throwable instanceof EOFException) {
|
|
|
|
|
- log.info("WebSocket连接被客户端正常关闭(EOF),sessionId: {}", session.getId());
|
|
|
|
|
- } else {
|
|
|
|
|
- log.error("WebSocket连接错误", throwable);
|
|
|
|
|
|
|
+ log.error("webSocket 错误处理失败", e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -498,12 +526,36 @@ public class WebSocketServer {
|
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- //发送消息
|
|
|
|
|
|
|
+ //发送消息(带锁机制,避免并发发送)
|
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
|
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 获取Session锁
|
|
|
|
|
+ Lock lock = sessionLocks.get(session.getId());
|
|
|
|
|
+ if (lock == null) {
|
|
|
|
|
+ // 如果锁不存在,创建一个新锁
|
|
|
|
|
+ lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 使用锁保证同一Session的消息串行发送
|
|
|
|
|
+ lock.lock();
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (session.isOpen()) {
|
|
|
|
|
+ session.getAsyncRemote().sendText(message);
|
|
|
|
|
+ }
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ lock.unlock();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
|
|
+ Session session = room.get(userId);
|
|
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
sendMsgVo.setUserId(userId);
|
|
@@ -511,13 +563,19 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setCmd("Integral");
|
|
sendMsgVo.setCmd("Integral");
|
|
|
sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
|
|
sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
|
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
|
|
- Session session = room.get(userId);
|
|
|
|
|
|
|
+
|
|
|
if(Objects.isNull( session)) return;
|
|
if(Objects.isNull( session)) return;
|
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private void sendBlockMessage(Long liveId, Long userId) {
|
|
private void sendBlockMessage(Long liveId, Long userId) {
|
|
|
|
|
+
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
|
|
+ Session session = room.get(userId);
|
|
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
sendMsgVo.setUserId(userId);
|
|
@@ -525,8 +583,7 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setCmd("blockUser");
|
|
sendMsgVo.setCmd("blockUser");
|
|
|
sendMsgVo.setMsg("账号已被停用");
|
|
sendMsgVo.setMsg("账号已被停用");
|
|
|
sendMsgVo.setData(null);
|
|
sendMsgVo.setData(null);
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
|
|
- Session session = room.get(userId);
|
|
|
|
|
|
|
+
|
|
|
if(Objects.isNull( session)) return;
|
|
if(Objects.isNull( session)) return;
|
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
}
|
|
@@ -540,16 +597,33 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
|
|
|
|
|
|
|
+ // 普通用户房间:并行发送
|
|
|
room.forEach((k, v) -> {
|
|
room.forEach((k, v) -> {
|
|
|
if (v.isOpen()) {
|
|
if (v.isOpen()) {
|
|
|
- sendWithRetry(v,message,7);
|
|
|
|
|
|
|
+ sendWithRetry(v,message,1);
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
- adminRoom.forEach(v -> {
|
|
|
|
|
- if (v.isOpen()) {
|
|
|
|
|
- sendWithRetry(v,message,7);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // admin房间:串行发送,使用单线程执行器
|
|
|
|
|
+ if (!adminRoom.isEmpty()) {
|
|
|
|
|
+ ExecutorService executor = adminExecutors.get(liveId);
|
|
|
|
|
+ if (executor != null && !executor.isShutdown()) {
|
|
|
|
|
+ executor.submit(() -> {
|
|
|
|
|
+ for (Session session : adminRoom) {
|
|
|
|
|
+ if (session.isOpen()) {
|
|
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 如果执行器不存在或已关闭,直接发送
|
|
|
|
|
+ adminRoom.forEach(v -> {
|
|
|
|
|
+ if (v.isOpen()) {
|
|
|
|
|
+ sendWithRetry(v, message, 1);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
- });
|
|
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void removeLikeCountCache(Long liveId) {
|
|
public void removeLikeCountCache(Long liveId) {
|
|
@@ -569,7 +643,6 @@ public class WebSocketServer {
|
|
|
String valueStr = cacheObject.toString().trim();
|
|
String valueStr = cacheObject.toString().trim();
|
|
|
current = Integer.parseInt(valueStr);
|
|
current = Integer.parseInt(valueStr);
|
|
|
} catch (NumberFormatException e) {
|
|
} catch (NumberFormatException e) {
|
|
|
- log.error("点赞数格式错误,liveId: {}, value: {}", liveId, cacheObject, e);
|
|
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
Integer last = lastLikeCountCache.getOrDefault(liveId, 0);
|
|
Integer last = lastLikeCountCache.getOrDefault(liveId, 0);
|
|
@@ -586,6 +659,97 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ @Scheduled(fixedRate = 2000)// 每2秒执行一次
|
|
|
|
|
+ public void broadcastUserNumMessage() {
|
|
|
|
|
+ // 遍历每个直播间
|
|
|
|
|
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> entry : rooms.entrySet()) {
|
|
|
|
|
+ Long liveId = entry.getKey();
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = entry.getValue();
|
|
|
|
|
+
|
|
|
|
|
+ // 统计当前直播间的在线人数
|
|
|
|
|
+ int onlineCount = room.size();
|
|
|
|
|
+
|
|
|
|
|
+ // 构造消息
|
|
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
|
|
+ sendMsgVo.setCmd("userCount");
|
|
|
|
|
+ sendMsgVo.setData(String.valueOf(onlineCount));
|
|
|
|
|
+
|
|
|
|
|
+ // 广播当前直播间的在线人数
|
|
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 定时清理无效会话(每分钟执行一次)
|
|
|
|
|
+ * 检查心跳超时的会话并关闭
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(fixedRate = 60000) // 每分钟执行一次
|
|
|
|
|
+ public void cleanInactiveSessions() {
|
|
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
|
|
+ int cleanedCount = 0;
|
|
|
|
|
+
|
|
|
|
|
+ // 遍历所有直播间
|
|
|
|
|
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
|
|
|
|
|
+ Long liveId = roomEntry.getKey();
|
|
|
|
|
+ ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
|
|
|
|
|
+
|
|
|
|
|
+ // 检查普通用户会话
|
|
|
|
|
+ List<Long> toRemove = new ArrayList<>();
|
|
|
|
|
+ room.forEach((userId, session) -> {
|
|
|
|
|
+ Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
|
|
+ if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
|
|
+ toRemove.add(userId);
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (session.isOpen()) {
|
|
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}",
|
|
|
|
|
+ session.getId(), liveId, userId, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 移除超时的会话
|
|
|
|
|
+ toRemove.forEach(room::remove);
|
|
|
|
|
+ cleanedCount += toRemove.size();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 检查admin房间
|
|
|
|
|
+ for (Map.Entry<Long, CopyOnWriteArrayList<Session>> adminEntry : adminRooms.entrySet()) {
|
|
|
|
|
+ Long liveId = adminEntry.getKey();
|
|
|
|
|
+ CopyOnWriteArrayList<Session> adminRoom = adminEntry.getValue();
|
|
|
|
|
+
|
|
|
|
|
+ List<Session> toRemoveAdmin = new ArrayList<>();
|
|
|
|
|
+ for (Session session : adminRoom) {
|
|
|
|
|
+ Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
|
|
+ if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
|
|
+ toRemoveAdmin.add(session);
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (session.isOpen()) {
|
|
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("关闭admin超时会话失败: sessionId={}, liveId={}",
|
|
|
|
|
+ session.getId(), liveId, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 移除超时的admin会话
|
|
|
|
|
+ toRemoveAdmin.forEach(adminRoom::remove);
|
|
|
|
|
+ cleanedCount += toRemoveAdmin.size();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (cleanedCount > 0) {
|
|
|
|
|
+ if (random.nextInt(10) == 1) {
|
|
|
|
|
+ log.info("已清理 {} 个无效会话", cleanedCount);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 广播点赞消息
|
|
* 广播点赞消息
|
|
|
* @param liveId 直播间ID
|
|
* @param liveId 直播间ID
|
|
@@ -601,12 +765,15 @@ public class WebSocketServer {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
int attempts = 0;
|
|
int attempts = 0;
|
|
|
while (attempts < maxRetries) {
|
|
while (attempts < maxRetries) {
|
|
|
try {
|
|
try {
|
|
|
- if(session.isOpen()) {
|
|
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // 使用带锁的sendMessage方法,避免并发发送
|
|
|
|
|
+ sendMessage(session, message);
|
|
|
return; // 发送成功,退出
|
|
return; // 发送成功,退出
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
|
|
if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
|
|
@@ -618,11 +785,15 @@ public class WebSocketServer {
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
- throw e;
|
|
|
|
|
|
|
+ log.error("发送消息失败: sessionId={}, error={}", session.getId(), e.getMessage(), e);
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- log.info("超过重试次数, 消息 {}",message);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if (attempts >= maxRetries) {
|
|
|
|
|
+ log.warn("超过重试次数({}),放弃发送消息: sessionId={}", maxRetries, session.getId());
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@@ -728,4 +899,6 @@ public class WebSocketServer {
|
|
|
String key = "live:auto_task:";
|
|
String key = "live:auto_task:";
|
|
|
redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
|
|
redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
}
|
|
}
|
|
|
|
|
+
|