|
|
@@ -4,11 +4,18 @@ package com.fs.live.websocket.service;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.fs.common.constant.LiveKeysConstant;
|
|
|
+import com.fs.common.core.redis.RedisCacheT;
|
|
|
import com.fs.common.exception.base.BaseException;
|
|
|
+import com.fs.common.utils.date.DateUtil;
|
|
|
import com.fs.his.domain.FsUser;
|
|
|
import com.fs.his.service.IFsUserService;
|
|
|
+import com.fs.hisStore.domain.FsUserScrm;
|
|
|
+import com.fs.hisStore.service.IFsUserScrmService;
|
|
|
import com.fs.live.config.ProductionWordFilter;
|
|
|
import com.fs.live.mapper.LiveCouponMapper;
|
|
|
+import com.fs.live.vo.LiveWatchUserEntry;
|
|
|
+import com.fs.live.domain.LiveWatchLog;
|
|
|
+import com.fs.live.domain.LiveVideo;
|
|
|
import com.fs.live.websocket.auth.WebSocketConfigurator;
|
|
|
import com.fs.live.websocket.bean.SendMsgVo;
|
|
|
import com.fs.common.core.domain.R;
|
|
|
@@ -20,17 +27,20 @@ import com.fs.live.service.*;
|
|
|
import com.fs.live.vo.LiveGoodsVo;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.time.DateUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.websocket.*;
|
|
|
import javax.websocket.server.ServerEndpoint;
|
|
|
+import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
+import java.time.LocalDateTime;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
-import java.util.concurrent.ThreadLocalRandom;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
|
|
|
@@ -46,11 +56,21 @@ public class WebSocketServer {
|
|
|
private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
|
|
|
// 管理端连接
|
|
|
private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // Session发送锁,避免同一会话并发发送消息
|
|
|
+ private final static ConcurrentHashMap<String, Lock> sessionLocks = new ConcurrentHashMap<>();
|
|
|
+ // 心跳超时缓存:key=sessionId,value=最后心跳时间戳
|
|
|
+ private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
|
|
|
+ // 心跳超时时间(毫秒):3分钟无心跳则认为超时
|
|
|
+ private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
|
|
|
+ // admin房间消息发送线程池(单线程,保证串行化)
|
|
|
+ private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
|
private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
|
|
|
- private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
|
|
|
+ private final IFsUserScrmService fsUserService = SpringUtils.getBean(IFsUserScrmService.class);
|
|
|
private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
|
|
|
private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
|
|
|
private final ILiveRedConfService liveRedConfService = SpringUtils.getBean(ILiveRedConfService.class);
|
|
|
@@ -59,6 +79,13 @@ public class WebSocketServer {
|
|
|
private final ILiveUserFirstEntryService liveUserFirstEntryService = SpringUtils.getBean(ILiveUserFirstEntryService.class);
|
|
|
private final ILiveCouponIssueService liveCouponIssueService = SpringUtils.getBean(ILiveCouponIssueService.class);
|
|
|
private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
|
|
|
+ private final ILiveWatchLogService liveWatchLogService = SpringUtils.getBean(ILiveWatchLogService.class);
|
|
|
+ private final ILiveVideoService liveVideoService = SpringUtils.getBean(ILiveVideoService.class);
|
|
|
+ private final ILiveCompletionPointsRecordService completionPointsRecordService = SpringUtils.getBean(ILiveCompletionPointsRecordService.class);
|
|
|
+ private static Random random = new Random();
|
|
|
+
|
|
|
+ // Redis key 前缀:用户进入直播间时间
|
|
|
+ private static final String USER_ENTRY_TIME_KEY = "live:user:entry:time:%s:%s"; // liveId:userId
|
|
|
|
|
|
// 直播间在线用户缓存
|
|
|
// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
|
@@ -72,6 +99,10 @@ public class WebSocketServer {
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
long userId = (long) userProperties.get("userId");
|
|
|
long userType = (long) userProperties.get("userType");
|
|
|
+ long qwUserId = -1;
|
|
|
+ long externalContactId = -1;
|
|
|
+ String location = (String) userProperties.get("location"); // 获取location参数
|
|
|
+
|
|
|
Live live = liveService.selectLiveByLiveId(liveId);
|
|
|
if (live == null) {
|
|
|
throw new BaseException("未找到直播间");
|
|
|
@@ -84,6 +115,12 @@ public class WebSocketServer {
|
|
|
if (!Objects.isNull(userProperties.get("companyUserId"))) {
|
|
|
companyUserId = (long) userProperties.get("companyUserId");
|
|
|
}
|
|
|
+ if (!Objects.isNull(userProperties.get("qwUserId"))) {
|
|
|
+ qwUserId = (long) userProperties.get("qwUserId");
|
|
|
+ }
|
|
|
+ if (!Objects.isNull(userProperties.get("externalContactId"))) {
|
|
|
+ externalContactId = (long) userProperties.get("externalContactId");
|
|
|
+ }
|
|
|
|
|
|
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
@@ -91,13 +128,18 @@ public class WebSocketServer {
|
|
|
|
|
|
// 记录连接信息 管理员不记录
|
|
|
if (userType == 0) {
|
|
|
- FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
+ FsUserScrm fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
if (Objects.isNull(fsUser)) {
|
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
|
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.join(liveId, userId);
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
|
room.put(userId, session);
|
|
|
+
|
|
|
+ // 存储用户进入直播间的时间到 Redis(用于计算在线时长)
|
|
|
+ String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
+ redisCache.setCacheObject(entryTimeKey, System.currentTimeMillis(), 24, TimeUnit.HOURS);
|
|
|
+
|
|
|
// 直播间浏览量 +1
|
|
|
redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
|
|
|
@@ -129,26 +171,47 @@ public class WebSocketServer {
|
|
|
if (isFirstViewer) {
|
|
|
redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
}
|
|
|
- LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(liveId, userId);
|
|
|
- liveWatchUserVO.setMsgStatus(liveWatchUser.getMsgStatus());
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
- sendMsgVo.setCmd("entry");
|
|
|
- sendMsgVo.setMsg("用户进入");
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
- // 广播连接消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
|
|
|
+ if (1 == random.nextInt(10)) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("entry");
|
|
|
+ sendMsgVo.setMsg("用户进入");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ // 广播连接消息
|
|
|
+ broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
|
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
+ // 如果用户连上了 socket,并且公司ID和销售ID大于0,更新 LiveWatchLog 的 logType
|
|
|
+
|
|
|
+ if ((qwUserId > 0 && externalContactId > 0) || (liveUserFirstEntry != null && liveUserFirstEntry.getCompanyId() > 0 && liveUserFirstEntry.getCompanyUserId() > 0 )) {
|
|
|
+ // 获取当前直播/回放状态
|
|
|
+ Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
+ Integer currentLiveFlag = flagMap.get("liveFlag");
|
|
|
+
|
|
|
+ // 如果当前是直播状态(liveFlag = 1),更新 logType
|
|
|
+ if (currentLiveFlag != null && currentLiveFlag == 1) {
|
|
|
+ updateLiveWatchLogTypeOnConnect(liveId, userId, qwUserId, externalContactId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
if (liveUserFirstEntry != null) {
|
|
|
// 处理第一次自己进入,第二次扫码销售进入
|
|
|
if (liveUserFirstEntry.getCompanyUserId() == -1L && companyUserId != -1L) {
|
|
|
liveUserFirstEntry.setCompanyId(companyId);
|
|
|
liveUserFirstEntry.setCompanyUserId(companyUserId);
|
|
|
+ if (qwUserId != -1) {
|
|
|
+ liveUserFirstEntry.setQwUserId(qwUserId);
|
|
|
+ }
|
|
|
+ if (externalContactId!= -1) {
|
|
|
+ liveUserFirstEntry.setExternalContactId(externalContactId);
|
|
|
+ }
|
|
|
liveUserFirstEntryService.updateLiveUserFirstEntry(liveUserFirstEntry);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -167,21 +230,43 @@ public class WebSocketServer {
|
|
|
liveUserFirstEntry.setEntryDate(date);
|
|
|
liveUserFirstEntry.setFirstEntryTime(date);
|
|
|
liveUserFirstEntry.setUpdateTime( date);
|
|
|
+ if (qwUserId != -1) {
|
|
|
+ liveUserFirstEntry.setQwUserId(qwUserId);
|
|
|
+ }
|
|
|
+ if (externalContactId!= -1) {
|
|
|
+ liveUserFirstEntry.setExternalContactId(externalContactId);
|
|
|
+ }
|
|
|
liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
|
|
|
}
|
|
|
+ redisCache.setCacheObject( "live:user:first:entry:" + liveId + ":" + userId, liveUserFirstEntry,1, TimeUnit.HOURS);
|
|
|
|
|
|
|
|
|
} else {
|
|
|
adminRoom.add(session);
|
|
|
+ // 为admin房间创建单线程执行器,保证串行化发送
|
|
|
+ adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
|
|
|
}
|
|
|
|
|
|
- log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
+ // 初始化Session锁
|
|
|
+ sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
|
|
|
+ // 初始化心跳时间
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
+
|
|
|
}
|
|
|
|
|
|
//关闭连接时调用
|
|
|
@OnClose
|
|
|
public void onClose(Session session) {
|
|
|
Map<String, Object> userProperties = session.getUserProperties();
|
|
|
+ // 获取公司ID和销售ID
|
|
|
+ long companyId = -1L;
|
|
|
+ long companyUserId = -1L;
|
|
|
+ if (!Objects.isNull(userProperties.get("companyId"))) {
|
|
|
+ companyId = (long) userProperties.get("companyId");
|
|
|
+ }
|
|
|
+ if (!Objects.isNull(userProperties.get("companyUserId"))) {
|
|
|
+ companyUserId = (long) userProperties.get("companyUserId");
|
|
|
+ }
|
|
|
|
|
|
long liveId = (long) userProperties.get("liveId");
|
|
|
long userId = (long) userProperties.get("userId");
|
|
|
@@ -190,14 +275,12 @@ public class WebSocketServer {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
if (userType == 0) {
|
|
|
- FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
+ FsUserScrm fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
if (Objects.isNull(fsUser)) {
|
|
|
throw new BaseException("用户信息错误");
|
|
|
}
|
|
|
-
|
|
|
- LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
|
|
|
+ // 计算并更新用户在线时长
|
|
|
room.remove(userId);
|
|
|
-
|
|
|
if (room.isEmpty()) {
|
|
|
rooms.remove(liveId);
|
|
|
}
|
|
|
@@ -209,23 +292,38 @@ public class WebSocketServer {
|
|
|
String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
|
|
|
redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
|
|
|
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setUserId(userId);
|
|
|
- sendMsgVo.setUserType(userType);
|
|
|
- sendMsgVo.setCmd("out");
|
|
|
- sendMsgVo.setMsg("用户离开");
|
|
|
- sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
- sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
- sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
-
|
|
|
- // 广播离开消息
|
|
|
- broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
|
|
|
+
|
|
|
+
|
|
|
+ // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
|
|
|
+ if (1 == new Random().nextInt(10)) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("out");
|
|
|
+ sendMsgVo.setMsg("用户离开");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
+
|
|
|
} else {
|
|
|
adminRoom.remove(session);
|
|
|
+ // 如果admin房间为空,关闭并清理执行器
|
|
|
+ if (adminRoom.isEmpty()) {
|
|
|
+ ExecutorService executor = adminExecutors.remove(liveId);
|
|
|
+ if (executor != null) {
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+ adminRooms.remove(liveId);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
+ // 清理Session相关资源
|
|
|
+ heartbeatCache.remove(session.getId());
|
|
|
+ sessionLocks.remove(session.getId());
|
|
|
}
|
|
|
|
|
|
//收到客户端信息
|
|
|
@@ -242,6 +340,64 @@ public class WebSocketServer {
|
|
|
try {
|
|
|
switch (msg.getCmd()) {
|
|
|
case "heartbeat":
|
|
|
+ // 更新心跳时间
|
|
|
+ heartbeatCache.put(session.getId(), System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 心跳时同步更新观看时长到Redis Hash
|
|
|
+ long watchUserId = (long) userProperties.get("userId");
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if (msg.getData() != null && !msg.getData().isEmpty()) {
|
|
|
+ try {
|
|
|
+ Long currentDuration = Long.parseLong(msg.getData());
|
|
|
+
|
|
|
+ Live currentLive = liveService.selectLiveByLiveId(liveId);
|
|
|
+ if (currentLive == null) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断直播是否已开始:status=2(直播中) 或 当前时间 >= 开播时间
|
|
|
+ boolean isLiveStarted = false;
|
|
|
+ if (currentLive.getStatus() != null && currentLive.getStatus() == 2) {
|
|
|
+ // status=2 表示直播中
|
|
|
+ isLiveStarted = true;
|
|
|
+ } else if (currentLive.getStartTime() != null) {
|
|
|
+ // 判断当前时间是否已超过开播时间
|
|
|
+ LocalDateTime now = java.time.LocalDateTime.now();
|
|
|
+ isLiveStarted = now.isAfter(currentLive.getStartTime()) || now.isEqual(currentLive.getStartTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!isLiveStarted) {
|
|
|
+ log.debug("[心跳-观看时长] 直播未开始(开播倒计时中),不统计观看时长, liveId={}, status={}, startTime={}",
|
|
|
+ liveId, currentLive.getStatus(), currentLive.getStartTime());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.debug("[心跳-观看时长] 直播已开始,统计观看时长, liveId={}, userId={}, duration={}秒",
|
|
|
+ liveId, watchUserId, currentDuration);
|
|
|
+
|
|
|
+ // 使用Hash结构存储:一个直播间一个Hash,包含所有用户的时长
|
|
|
+ String hashKey = "live:watch:duration:hash:" + liveId;
|
|
|
+ String userIdField = String.valueOf(watchUserId);
|
|
|
+ // 获取现有时长
|
|
|
+ Object existingDuration = redisCache.hashGet(hashKey, userIdField);
|
|
|
+ // 只有当新的时长更大时才更新
|
|
|
+ if (existingDuration == null || currentDuration > Long.parseLong(existingDuration.toString())) {
|
|
|
+ // 更新Hash中的用户时长
|
|
|
+ redisCache.hashPut(hashKey, userIdField, currentDuration.toString());
|
|
|
+ // 设置过期时间(2小时)
|
|
|
+ redisCache.expire(hashKey, 2, TimeUnit.HOURS);
|
|
|
+
|
|
|
+ checkAndSendCompletionPointsInRealTime(liveId, watchUserId, currentDuration);
|
|
|
+
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[心跳-观看时长] 更新失败, liveId={}, userId={}, data={}",
|
|
|
+ liveId, watchUserId, msg.getData(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
break;
|
|
|
case "sendMsg":
|
|
|
@@ -256,12 +412,18 @@ public class WebSocketServer {
|
|
|
liveMsg.setCreateTime(new Date());
|
|
|
|
|
|
if (userType == 0) {
|
|
|
- LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
|
|
|
- if(liveWatchUser.getMsgStatus() == 1){
|
|
|
+ List<LiveWatchUser> liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
|
|
|
+ if(!liveWatchUser.isEmpty() && liveWatchUser.get(0).getMsgStatus() == 1){
|
|
|
sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
+ Integer liveFlag = flagMap.get("liveFlag");
|
|
|
+ Integer replayFlag = flagMap.get("replayFlag");
|
|
|
+ liveMsg.setLiveFlag(liveFlag);
|
|
|
+ liveMsg.setReplayFlag(replayFlag);
|
|
|
+
|
|
|
liveMsgService.insertLiveMsg(liveMsg);
|
|
|
}
|
|
|
|
|
|
@@ -281,6 +443,23 @@ public class WebSocketServer {
|
|
|
liveMsg.setAvatar(msg.getAvatar());
|
|
|
liveMsg.setMsg(msg.getMsg());
|
|
|
liveMsg.setCreateTime(new Date());
|
|
|
+
|
|
|
+ // 根据直播状态设置live_flag或replay_flag
|
|
|
+ Live normalMsgLive = liveService.selectLiveByLiveId(msg.getLiveId());
|
|
|
+ if (normalMsgLive != null && normalMsgLive.getFinishTime() != null) {
|
|
|
+ Date finishTime = java.sql.Timestamp.valueOf(normalMsgLive.getFinishTime());
|
|
|
+ if (new Date().after(finishTime)) {
|
|
|
+ liveMsg.setReplayFlag(1);
|
|
|
+ liveMsg.setLiveFlag(0);
|
|
|
+ } else {
|
|
|
+ liveMsg.setLiveFlag(1);
|
|
|
+ liveMsg.setReplayFlag(0);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ liveMsg.setLiveFlag(1);
|
|
|
+ liveMsg.setReplayFlag(0);
|
|
|
+ }
|
|
|
+
|
|
|
liveMsgService.insertLiveMsg(liveMsg);
|
|
|
msg.setOn(true);
|
|
|
msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
@@ -302,6 +481,25 @@ public class WebSocketServer {
|
|
|
// 广播消息
|
|
|
broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
break;
|
|
|
+ case "sendTopMsg":
|
|
|
+ msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
+ if(StringUtils.isEmpty(msg.getMsg())) return;
|
|
|
+ liveMsg = new LiveMsg();
|
|
|
+ liveMsg.setLiveId(msg.getLiveId());
|
|
|
+ liveMsg.setUserId(msg.getUserId());
|
|
|
+ liveMsg.setNickName(msg.getNickName());
|
|
|
+ liveMsg.setAvatar(msg.getAvatar());
|
|
|
+ liveMsg.setMsg(msg.getMsg());
|
|
|
+ liveMsg.setEndTime(DateUtils.addMinutes(new Date(),msg.getDuration()).toString());
|
|
|
+ msg.setOn(true);
|
|
|
+ msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
+ // 广播消息
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ // 放在当前活动里面
|
|
|
+ redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG));
|
|
|
+ redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), JSONObject.toJSONString(liveMsg));
|
|
|
+ redisCache.expire(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), msg.getDuration(), TimeUnit.MINUTES);
|
|
|
+ break;
|
|
|
case "globalVisible":
|
|
|
msg.setOn(true);
|
|
|
liveWatchUserService.updateGlobalVisible(liveId, msg.getStatus());
|
|
|
@@ -322,6 +520,9 @@ public class WebSocketServer {
|
|
|
case "goods":
|
|
|
sendGoodsMessage(msg);
|
|
|
break;
|
|
|
+ case "deleteMsg":
|
|
|
+ deleteMsg(liveId,msg);
|
|
|
+ break;
|
|
|
case "red":
|
|
|
processRed(liveId, msg);
|
|
|
break;
|
|
|
@@ -342,6 +543,15 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void deleteMsg(long liveId,SendMsgVo msg) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserType(0L);
|
|
|
+ sendMsgVo.setCmd("deleteMsg");
|
|
|
+ sendMsgVo.setMsg(msg.getMsg());
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
+
|
|
|
private void processCoupon(long liveId, SendMsgVo msg) {
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
@@ -381,7 +591,6 @@ public class WebSocketServer {
|
|
|
* 处理红包变动消息
|
|
|
*/
|
|
|
private void processRed(Long liveId, SendMsgVo msg) {
|
|
|
- log.debug("redData: {}", msg);
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
msg.setStatus( status);
|
|
|
@@ -397,7 +606,6 @@ public class WebSocketServer {
|
|
|
* 处理抽奖变动消息
|
|
|
*/
|
|
|
private void processLottery(Long liveId, SendMsgVo msg) {
|
|
|
- log.debug("lotteryData: {}", msg);
|
|
|
JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
Integer status = jsonObject.getInteger("status");
|
|
|
msg.setStatus( status);
|
|
|
@@ -412,7 +620,12 @@ public class WebSocketServer {
|
|
|
//错误时调用
|
|
|
@OnError
|
|
|
public void onError(Session session, Throwable throwable) {
|
|
|
- log.error("webSocKet连接错误 msg: {}", throwable.getMessage(), throwable);
|
|
|
+
|
|
|
+ try {
|
|
|
+ this.onClose(session);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("webSocket 错误处理失败", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -433,12 +646,36 @@ public class WebSocketServer {
|
|
|
return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
|
}
|
|
|
|
|
|
- //发送消息
|
|
|
+ //发送消息(带锁机制,避免并发发送)
|
|
|
public void sendMessage(Session session, String message) throws IOException {
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取Session锁
|
|
|
+ Lock lock = sessionLocks.get(session.getId());
|
|
|
+ if (lock == null) {
|
|
|
+ // 如果锁不存在,创建一个新锁
|
|
|
+ lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 使用锁保证同一Session的消息串行发送
|
|
|
+ lock.lock();
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.getAsyncRemote().sendText(message);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session session = room.get(userId);
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -446,13 +683,36 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setCmd("Integral");
|
|
|
sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
|
|
|
sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
|
+
|
|
|
+ if(Objects.isNull( session)) return;
|
|
|
+ // 使用带锁的sendMessage方法,保证线程安全
|
|
|
+ try {
|
|
|
+ sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("发送积分消息失败: liveId={}, userId={}, error={}", liveId, userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送完课积分弹窗通知给特定用户
|
|
|
+ */
|
|
|
+ public void sendCompletionPointsMessage(Long liveId, Long userId, SendMsgVo sendMsgVo) {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
Session session = room.get(userId);
|
|
|
- if(Objects.isNull( session)) return;
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
}
|
|
|
|
|
|
private void sendBlockMessage(Long liveId, Long userId) {
|
|
|
+
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session session = room.get(userId);
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setUserId(userId);
|
|
|
@@ -460,31 +720,79 @@ public class WebSocketServer {
|
|
|
sendMsgVo.setCmd("blockUser");
|
|
|
sendMsgVo.setMsg("账号已被停用");
|
|
|
sendMsgVo.setData(null);
|
|
|
- ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
- Session session = room.get(userId);
|
|
|
+
|
|
|
if(Objects.isNull( session)) return;
|
|
|
- session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ // 使用带锁的sendMessage方法,保证线程安全
|
|
|
+ try {
|
|
|
+ sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("发送封禁消息失败: liveId={}, userId={}, error={}", liveId, userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 广播消息
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param message 消息内容
|
|
|
+ * 优化:使用快照遍历,避免在遍历过程中修改集合
|
|
|
+ */
|
|
|
+ public void broadcastWebMessage(Long liveId, String message) {
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+
|
|
|
+ if (room.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 普通用户房间:并行发送(使用快照遍历,避免并发修改)
|
|
|
+ // ConcurrentHashMap 的 entrySet() 是弱一致性的,但为了更安全,我们显式创建快照
|
|
|
+ for (Map.Entry<Long, Session> entry : room.entrySet()) {
|
|
|
+ Session session = entry.getValue();
|
|
|
+ if (session != null && session.isOpen()) {
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 广播消息
|
|
|
* @param liveId 直播间ID
|
|
|
* @param message 消息内容
|
|
|
+ * 优化:使用快照遍历,避免在遍历过程中修改集合
|
|
|
*/
|
|
|
public void broadcastMessage(Long liveId, String message) {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
|
|
|
- room.forEach((k, v) -> {
|
|
|
- if (v.isOpen()) {
|
|
|
- sendWithRetry(v,message,7);
|
|
|
+ // 普通用户房间:并行发送(使用快照遍历,避免并发修改)
|
|
|
+ if (!room.isEmpty()) {
|
|
|
+ for (Map.Entry<Long, Session> entry : room.entrySet()) {
|
|
|
+ Session session = entry.getValue();
|
|
|
+ if (session != null && session.isOpen()) {
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
- adminRoom.forEach(v -> {
|
|
|
- if (v.isOpen()) {
|
|
|
- sendWithRetry(v,message,7);
|
|
|
+ }
|
|
|
+
|
|
|
+ // admin房间:串行发送,使用单线程执行器
|
|
|
+ if (!adminRoom.isEmpty()) {
|
|
|
+ ExecutorService executor = adminExecutors.get(liveId);
|
|
|
+ if (executor != null && !executor.isShutdown()) {
|
|
|
+ executor.submit(() -> {
|
|
|
+ for (Session session : adminRoom) {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ // 如果执行器不存在或已关闭,直接发送
|
|
|
+ adminRoom.forEach(v -> {
|
|
|
+ if (v.isOpen()) {
|
|
|
+ sendWithRetry(v, message, 1);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void removeLikeCountCache(Long liveId) {
|
|
|
@@ -504,7 +812,6 @@ public class WebSocketServer {
|
|
|
String valueStr = cacheObject.toString().trim();
|
|
|
current = Integer.parseInt(valueStr);
|
|
|
} catch (NumberFormatException e) {
|
|
|
- log.error("点赞数格式错误,liveId: {}, value: {}", liveId, cacheObject, e);
|
|
|
continue;
|
|
|
}
|
|
|
Integer last = lastLikeCountCache.getOrDefault(liveId, 0);
|
|
|
@@ -520,27 +827,154 @@ public class WebSocketServer {
|
|
|
lastLikeCountCache.keySet().removeIf(liveId -> !activeLiveIds.contains(liveId));
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ @Scheduled(fixedRate = 2000)// 每2秒执行一次
|
|
|
+ public void broadcastUserNumMessage() {
|
|
|
+ // 遍历每个直播间
|
|
|
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> entry : rooms.entrySet()) {
|
|
|
+ Long liveId = entry.getKey();
|
|
|
+ ConcurrentHashMap<Long, Session> room = entry.getValue();
|
|
|
+
|
|
|
+ // 统计当前直播间的在线人数
|
|
|
+ int onlineCount = room.size();
|
|
|
+
|
|
|
+ // 构造消息
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setCmd("userCount");
|
|
|
+ sendMsgVo.setData(String.valueOf(onlineCount));
|
|
|
+
|
|
|
+ // 广播当前直播间的在线人数
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定时清理无效会话(每分钟执行一次)
|
|
|
+ * 检查心跳超时的会话并关闭
|
|
|
+ * 优化:使用快照遍历,避免在遍历过程中修改集合
|
|
|
+ */
|
|
|
+ @Scheduled(fixedRate = 60000) // 每分钟执行一次
|
|
|
+ public void cleanInactiveSessions() {
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
+ int cleanedCount = 0;
|
|
|
+
|
|
|
+ // 遍历所有直播间(使用快照,避免在遍历过程中被修改影响)
|
|
|
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
|
|
|
+ Long liveId = roomEntry.getKey();
|
|
|
+ ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
|
|
|
+
|
|
|
+ // 如果房间为空,跳过
|
|
|
+ if (room.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查普通用户会话(使用快照遍历,避免并发修改异常)
|
|
|
+ List<Long> toRemove = new ArrayList<>();
|
|
|
+ // 创建快照,避免在遍历过程中修改原集合
|
|
|
+ for (Map.Entry<Long, Session> userEntry : room.entrySet()) {
|
|
|
+ Long userId = userEntry.getKey();
|
|
|
+ Session session = userEntry.getValue();
|
|
|
+
|
|
|
+ if (session == null) {
|
|
|
+ toRemove.add(userId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
+ if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
+ toRemove.add(userId);
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
+ }
|
|
|
+ liveWatchUserService.close(null, liveId, userId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}",
|
|
|
+ session.getId(), liveId, userId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 移除超时的会话
|
|
|
+ if (!toRemove.isEmpty()) {
|
|
|
+ String hashKey = String.format(LiveKeysConstant.LIVE_WATCH_USERS, liveId);
|
|
|
+ for (Long userId : toRemove) {
|
|
|
+ room.remove(userId);
|
|
|
+ // 从 Redis hash 中删除无效用户
|
|
|
+ redisCache.hashDelete(hashKey, String.valueOf(userId));
|
|
|
+ }
|
|
|
+ cleanedCount += toRemove.size();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查admin房间
|
|
|
+ for (Map.Entry<Long, CopyOnWriteArrayList<Session>> adminEntry : adminRooms.entrySet()) {
|
|
|
+ Long liveId = adminEntry.getKey();
|
|
|
+ CopyOnWriteArrayList<Session> adminRoom = adminEntry.getValue();
|
|
|
+
|
|
|
+ List<Session> toRemoveAdmin = new ArrayList<>();
|
|
|
+ for (Session session : adminRoom) {
|
|
|
+ Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
+ if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
+ toRemoveAdmin.add(session);
|
|
|
+ try {
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("关闭admin超时会话失败: sessionId={}, liveId={}",
|
|
|
+ session.getId(), liveId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 移除超时的admin会话
|
|
|
+ toRemoveAdmin.forEach(adminRoom::remove);
|
|
|
+ cleanedCount += toRemoveAdmin.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cleanedCount > 0) {
|
|
|
+ if (random.nextInt(10) == 1) {
|
|
|
+ log.info("已清理 {} 个无效会话", cleanedCount);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* 广播点赞消息
|
|
|
* @param liveId 直播间ID
|
|
|
* @param message 消息内容
|
|
|
+ * 优化:使用快照遍历,避免在遍历过程中修改集合
|
|
|
*/
|
|
|
public void broadcastLikeMessage(Long liveId, String message) {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
- room.forEach((k, v) -> {
|
|
|
- if (v.isOpen()) {
|
|
|
- sendWithRetry(v,message,7);
|
|
|
+
|
|
|
+ if (room.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 使用快照遍历,避免并发修改
|
|
|
+ for (Map.Entry<Long, Session> entry : room.entrySet()) {
|
|
|
+ Session session = entry.getValue();
|
|
|
+ if (session != null && session.isOpen()) {
|
|
|
+ sendWithRetry(session, message, 1);
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void sendWithRetry(Session session, String message, int maxRetries) {
|
|
|
+ if (session == null || !session.isOpen()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
int attempts = 0;
|
|
|
while (attempts < maxRetries) {
|
|
|
try {
|
|
|
- if(session.isOpen()) {
|
|
|
- session.getAsyncRemote().sendText(message);
|
|
|
- }
|
|
|
+ // 使用带锁的sendMessage方法,避免并发发送
|
|
|
+ sendMessage(session, message);
|
|
|
return; // 发送成功,退出
|
|
|
} catch (Exception e) {
|
|
|
if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
|
|
|
@@ -552,11 +986,15 @@ public class WebSocketServer {
|
|
|
break;
|
|
|
}
|
|
|
} else {
|
|
|
- throw e;
|
|
|
+ log.error("发送消息失败: sessionId={}, error={}", session.getId(), e.getMessage(), e);
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- log.info("超过重试次数, 消息 {}",message);
|
|
|
+
|
|
|
+ if (attempts >= maxRetries) {
|
|
|
+ log.warn("超过重试次数({}),放弃发送消息: sessionId={}", maxRetries, session.getId());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -618,6 +1056,9 @@ public class WebSocketServer {
|
|
|
}
|
|
|
LiveCouponIssue liveCouponIssue = liveCouponIssueService.selectLiveCouponIssueByCouponId(liveCoupon.getCouponId());
|
|
|
LiveCouponIssueRelation relation = liveCouponMapper.selectCouponRelation(task.getLiveId(), liveCouponIssue.getId());
|
|
|
+ if (liveCoupon != null) {
|
|
|
+ redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , liveCouponIssue.getId()), liveCouponIssue.getRemainCount().intValue(), 30, TimeUnit.MINUTES);
|
|
|
+ }
|
|
|
HashMap<String, Object> data = new HashMap<>();
|
|
|
data.put("liveId", task.getLiveId());
|
|
|
data.put("couponIssueId", liveCouponIssue.getId());
|
|
|
@@ -629,6 +1070,27 @@ public class WebSocketServer {
|
|
|
data.put("couponTime", liveCoupon.getCouponTime());
|
|
|
msg.setData(JSON.toJSONString(data));
|
|
|
liveCouponMapper.updateChangeShow(task.getLiveId(), liveCouponIssue.getId());
|
|
|
+ } else if (task.getTaskType() == 6L) {
|
|
|
+ // 上架/下架商品
|
|
|
+ msg.setCmd("goods");
|
|
|
+ JSONObject jsonObject = JSON.parseObject(task.getContent());
|
|
|
+ Long goodsId = jsonObject.getLong("goodsId");
|
|
|
+ Integer status = jsonObject.getInteger("status");
|
|
|
+ if (goodsId == null || status == null) {
|
|
|
+ log.error("商品ID或状态为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 更新商品上下架状态
|
|
|
+ liveGoodsService.updateLiveGoodsStatus(goodsId, status);
|
|
|
+ return ;
|
|
|
+ // 更新直播间配置缓存
|
|
|
+// liveService.asyncToCacheLiveConfig(task.getLiveId());
|
|
|
+ // 查询商品信息并广播
|
|
|
+// LiveGoodsVo liveGoodsVo = liveGoodsService.selectLiveGoodsVoByGoodsId(goodsId);
|
|
|
+// if (liveGoodsVo != null) {
|
|
|
+// msg.setData(JSON.toJSONString(liveGoodsVo));
|
|
|
+// msg.setStatus(status);
|
|
|
+// }
|
|
|
}
|
|
|
msg.setStatus(1);
|
|
|
broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
@@ -641,4 +1103,290 @@ public class WebSocketServer {
|
|
|
String key = "live:auto_task:";
|
|
|
redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 计算并更新用户在线时长
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param userId 用户ID
|
|
|
+ * @param companyId 公司ID
|
|
|
+ * @param companyUserId 销售ID
|
|
|
+ */
|
|
|
+ private void updateUserOnlineDuration(Long liveId, Long userId, Long companyId, Long companyUserId) {
|
|
|
+ try {
|
|
|
+ // 从 Redis 获取用户进入时间
|
|
|
+ String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
+ Long entryTime = redisCache.getCacheObject(entryTimeKey);
|
|
|
+
|
|
|
+ if (entryTime == null) {
|
|
|
+ // 如果没有进入时间记录,可能是旧数据,跳过
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long currentTimeMillis = System.currentTimeMillis();
|
|
|
+ Date now = new Date();
|
|
|
+
|
|
|
+ // 计算在线时长(秒)
|
|
|
+ long durationSeconds = (currentTimeMillis - entryTime) / 1000;
|
|
|
+
|
|
|
+ if (durationSeconds <= 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取当前直播/回放状态
|
|
|
+ Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
+ Integer currentLiveFlag = flagMap.get("liveFlag");
|
|
|
+ Integer currentReplayFlag = flagMap.get("replayFlag");
|
|
|
+
|
|
|
+ // 查询用户记录
|
|
|
+ LiveWatchUserEntry liveWatchUser = liveWatchUserService.selectLiveWatchAndCompanyUserByFlag(
|
|
|
+ liveId, userId, currentLiveFlag, currentReplayFlag);
|
|
|
+
|
|
|
+ if (liveWatchUser != null) {
|
|
|
+ // 累加在线时长
|
|
|
+ Long onlineSeconds = liveWatchUser.getOnlineSeconds();
|
|
|
+ if (onlineSeconds == null) {
|
|
|
+ onlineSeconds = 0L;
|
|
|
+ }
|
|
|
+ liveWatchUser.setOnlineSeconds(onlineSeconds + durationSeconds);
|
|
|
+ liveWatchUser.setUpdateTime(now);
|
|
|
+
|
|
|
+ // 更新数据库
|
|
|
+ liveWatchUserService.updateLiveWatchUserEntry(liveWatchUser);
|
|
|
+ // 如果 LiveWatchUserEntry 存在,并且当前是直播状态(liveFlag = 1),更新 LiveWatchLog
|
|
|
+// if (currentLiveFlag != null && currentLiveFlag == 1
|
|
|
+// && liveWatchUser.getCompanyId() != null && liveWatchUser.getCompanyId() > 0
|
|
|
+// && liveWatchUser.getCompanyUserId() != null && liveWatchUser.getCompanyUserId() > 0) {
|
|
|
+// updateLiveWatchLogTypeByDuration(liveId, userId,
|
|
|
+// liveWatchUser.getCompanyId(), liveWatchUser.getCompanyUserId(),
|
|
|
+// liveWatchUser.getOnlineSeconds());
|
|
|
+// }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 删除 Redis 中的进入时间记录
|
|
|
+ redisCache.deleteObject(entryTimeKey);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("更新用户在线时长异常:liveId={}, userId={}, error={}",
|
|
|
+ liveId, userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 在连接时更新 LiveWatchLog 的 logType
|
|
|
+ * 如果 logType 类型不是 2,修改 logType 类型为 1(看课中)
|
|
|
+ */
|
|
|
+ private void updateLiveWatchLogTypeOnConnect(Long liveId, Long userId, Long qwUserId, Long externalContactId) {
|
|
|
+ try {
|
|
|
+ LiveWatchLog queryLog = new LiveWatchLog();
|
|
|
+ queryLog.setLiveId(liveId);
|
|
|
+ queryLog.setUserId(userId);
|
|
|
+ queryLog.setQwUserId(String.valueOf(qwUserId));
|
|
|
+ queryLog.setExternalContactId(externalContactId);
|
|
|
+
|
|
|
+ List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
|
|
|
+ if (logs != null && !logs.isEmpty()) {
|
|
|
+ for (LiveWatchLog log : logs) {
|
|
|
+ // 如果 logType 不是 2(完课),则更新为 1(看课中)
|
|
|
+ if (log.getLogType() == null || log.getLogType() != 2) {
|
|
|
+ log.setLogType(1);
|
|
|
+ liveWatchLogService.updateLiveWatchLog(log);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("更新 LiveWatchLog logType 异常(连接时):liveId={}, userId={}, error={}",
|
|
|
+ liveId, userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 实时更新用户看课状态(在心跳时调用)
|
|
|
+ * 在直播期间实时更新用户的看课状态,而不是等到关闭 WebSocket 或清理无效会话时才更新
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param userId 用户ID
|
|
|
+ * @param watchDuration 观看时长(秒)
|
|
|
+ */
|
|
|
+ public void updateWatchLogTypeInRealTime(Long liveId, Long userId, Long watchDuration) {
|
|
|
+ try {
|
|
|
+ // 获取当前直播/回放状态
|
|
|
+ Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
+ Integer currentLiveFlag = flagMap.get("liveFlag");
|
|
|
+
|
|
|
+ // 只在直播状态(liveFlag = 1)时更新
|
|
|
+ if (currentLiveFlag == null || currentLiveFlag != 1) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取用户的 companyId 和 companyUserId(使用带缓存的查询方法)
|
|
|
+ LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserIdWithCache(liveId, userId);
|
|
|
+ if (liveUserFirstEntry == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long companyId = liveUserFirstEntry.getCompanyId();
|
|
|
+ Long companyUserId = liveUserFirstEntry.getCompanyUserId();
|
|
|
+
|
|
|
+ // 如果 companyId 和 companyUserId 有效,则更新看课状态
|
|
|
+ if (companyId != null && companyId > 0 && companyUserId != null && companyUserId > 0) {
|
|
|
+ // 检查是否达到关键观看时长节点,在这些节点实时更新
|
|
|
+ // 关键节点:3分钟(180秒)、20分钟(1200秒)、30分钟(1800秒)
|
|
|
+ boolean isKeyDuration = (watchDuration == 180 || watchDuration == 1200 || watchDuration == 1800) ||
|
|
|
+ (watchDuration > 180 && watchDuration % 60 == 0); // 每分钟更新一次
|
|
|
+
|
|
|
+ // 使用 Redis 缓存控制更新频率,避免频繁更新数据库
|
|
|
+ // 策略:在关键节点立即更新,其他时候每60秒更新一次
|
|
|
+ String updateLockKey = "live:watch:log:update:lock:" + liveId + ":" + userId;
|
|
|
+ String lastUpdateKey = "live:watch:log:last:duration:" + liveId + ":" + userId;
|
|
|
+
|
|
|
+ // 获取上次更新的时长
|
|
|
+ Long lastUpdateDuration = redisCache.getCacheObject(lastUpdateKey);
|
|
|
+
|
|
|
+ // 如果达到关键节点,或者距离上次更新已超过60秒,则更新
|
|
|
+ boolean shouldUpdate = false;
|
|
|
+ if (isKeyDuration) {
|
|
|
+ // 关键节点立即更新
|
|
|
+ shouldUpdate = true;
|
|
|
+ } else if (lastUpdateDuration == null || (watchDuration - lastUpdateDuration) >= 60) {
|
|
|
+ // 每60秒更新一次
|
|
|
+ shouldUpdate = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (shouldUpdate) {
|
|
|
+ // 使用分布式锁,避免并发更新(锁超时时间10秒)
|
|
|
+ Boolean canUpdate = redisCache.setIfAbsent(updateLockKey, "1", 10, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ if (Boolean.TRUE.equals(canUpdate)) {
|
|
|
+ // 异步更新,避免阻塞心跳处理
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ updateLiveWatchLogTypeByDuration(liveId, userId, companyId, companyUserId, watchDuration);
|
|
|
+ // 更新上次更新的时长
|
|
|
+ redisCache.setCacheObject(lastUpdateKey, watchDuration, 2, TimeUnit.HOURS);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("实时更新看课状态异常:liveId={}, userId={}, error={}",
|
|
|
+ liveId, userId, e.getMessage(), e);
|
|
|
+ } finally {
|
|
|
+ // 释放锁
|
|
|
+ redisCache.deleteObject(updateLockKey);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("实时更新看课状态异常:liveId={}, userId={}, error={}",
|
|
|
+ liveId, userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据在线时长更新 LiveWatchLog 的 logType
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param userId 用户ID
|
|
|
+ * @param companyId 公司ID
|
|
|
+ * @param companyUserId 销售ID
|
|
|
+ * @param onlineSeconds 在线时长(秒)
|
|
|
+ */
|
|
|
+ private void updateLiveWatchLogTypeByDuration(Long liveId, Long userId, Long companyId,
|
|
|
+ Long companyUserId, Long onlineSeconds) {
|
|
|
+ try {
|
|
|
+ // 获取直播视频总时长(videoType = 1 的视频,使用带缓存的查询方法)
|
|
|
+ List<LiveVideo> videos = liveVideoService.listByLiveIdWithCache(liveId, 1);
|
|
|
+ long totalVideoDuration = 0L;
|
|
|
+ if (videos != null && !videos.isEmpty()) {
|
|
|
+ totalVideoDuration = videos.stream()
|
|
|
+ .filter(v -> v.getDuration() != null)
|
|
|
+ .mapToLong(LiveVideo::getDuration)
|
|
|
+ .sum();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询 LiveWatchLog
|
|
|
+ LiveWatchLog queryLog = new LiveWatchLog();
|
|
|
+ queryLog.setLiveId(liveId);
|
|
|
+ queryLog.setUserId(userId);
|
|
|
+ queryLog.setCompanyId(companyId);
|
|
|
+ queryLog.setCompanyUserId(companyUserId);
|
|
|
+
|
|
|
+ List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
|
|
|
+ if (logs == null || logs.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Date now = DateUtil.getDate();
|
|
|
+ for (LiveWatchLog log : logs) {
|
|
|
+ boolean needUpdate = false;
|
|
|
+ Integer newLogType = log.getLogType();
|
|
|
+
|
|
|
+ // ① 如果在线时长 <= 3分钟,修改 logType 为 4(看课中断)
|
|
|
+ if (onlineSeconds <= 180) { // 3分钟 = 180秒
|
|
|
+ newLogType = 4;
|
|
|
+ needUpdate = true;
|
|
|
+ }
|
|
|
+ // ③ 如果直播视频 >= 40分钟,在线时长 >= 30分钟,logType 设置为 2(完课)
|
|
|
+ else if (totalVideoDuration >= 2400 && onlineSeconds >= 1800) { // 40分钟 = 2400秒,30分钟 = 1800秒
|
|
|
+ newLogType = 2;
|
|
|
+ log.setFinishTime(now);
|
|
|
+ needUpdate = true;
|
|
|
+ }
|
|
|
+ // 如果直播视频 >= 20分钟且 < 40分钟,在线时长 >= 20分钟,logType 设置为 2(完课)
|
|
|
+ else if (totalVideoDuration >= 1200 && totalVideoDuration < 2400 && onlineSeconds >= 1200) { // 20分钟 = 1200秒
|
|
|
+ newLogType = 2;
|
|
|
+ log.setFinishTime(now);
|
|
|
+ needUpdate = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果 logType 已经是 2(完课),不再更新
|
|
|
+ if (needUpdate && (log.getLogType() == null || log.getLogType() != 2)) {
|
|
|
+ log.setLogType(newLogType);
|
|
|
+ liveWatchLogService.updateLiveWatchLog(log);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("根据在线时长更新 LiveWatchLog logType 异常:liveId={}, userId={}, error={}",
|
|
|
+ liveId, userId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 实时检查并推送完课积分
|
|
|
+ * 在用户观看时长更新时立即检查是否达到完课条件,达到则立即推送
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param userId 用户ID
|
|
|
+ * @param duration 当前观看时长(秒)
|
|
|
+ */
|
|
|
+ private void checkAndSendCompletionPointsInRealTime(long liveId, long userId, Long duration) {
|
|
|
+ try {
|
|
|
+ log.debug("[实时完课检查] liveId={}, userId={}, duration={}秒", liveId, userId, duration);
|
|
|
+
|
|
|
+ // 1. 调用完课记录服务检查并创建完课记录
|
|
|
+ completionPointsRecordService.checkAndCreateCompletionRecord(liveId, userId, duration);
|
|
|
+
|
|
|
+ // 2. 查询是否有新的未领取完课记录
|
|
|
+ List<LiveCompletionPointsRecord> unreceivedRecords =
|
|
|
+ completionPointsRecordService.getUserUnreceivedRecords(liveId, userId);
|
|
|
+
|
|
|
+ if (unreceivedRecords == null || unreceivedRecords.isEmpty()) {
|
|
|
+ // 没有待领取的完课记录
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3. 构建推送消息
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setCmd("completionPoints");
|
|
|
+ sendMsgVo.setMsg("完成任务!");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(unreceivedRecords.get(0)));
|
|
|
+
|
|
|
+ // 4. 实时推送完课积分弹窗
|
|
|
+ sendCompletionPointsMessage(liveId, userId, sendMsgVo);
|
|
|
+
|
|
|
+ log.info("[实时完课推送] 发送完课积分弹窗通知, liveId={}, userId={}, points={}, duration={}秒",
|
|
|
+ liveId, userId, unreceivedRecords.get(0).getPointsAwarded(), duration);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[实时完课推送] 实时检查完课积分失败, liveId={}, userId={}, duration={}",
|
|
|
+ liveId, userId, duration, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
+
|