|
|
@@ -83,7 +83,7 @@ public class WebSocketServer {
|
|
|
private final ILiveVideoService liveVideoService = SpringUtils.getBean(ILiveVideoService.class);
|
|
|
private final ILiveCompletionPointsRecordService completionPointsRecordService = SpringUtils.getBean(ILiveCompletionPointsRecordService.class);
|
|
|
private static Random random = new Random();
|
|
|
-
|
|
|
+
|
|
|
// Redis key 前缀:用户进入直播间时间
|
|
|
private static final String USER_ENTRY_TIME_KEY = "live:user:entry:time:%s:%s"; // liveId:userId
|
|
|
|
|
|
@@ -135,7 +135,7 @@ public class WebSocketServer {
|
|
|
|
|
|
LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
|
|
|
room.put(userId, session);
|
|
|
-
|
|
|
+
|
|
|
// 存储用户进入直播间的时间到 Redis(用于计算在线时长)
|
|
|
// 如果已经存在进入时间,说明是重连,不应该覆盖,保持原来的进入时间
|
|
|
String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
@@ -145,7 +145,7 @@ public class WebSocketServer {
|
|
|
redisCache.setCacheObject(entryTimeKey, System.currentTimeMillis(), 24, TimeUnit.HOURS);
|
|
|
}
|
|
|
// 如果是重连,不覆盖进入时间,保持原来的进入时间以便正确计算总时长
|
|
|
-
|
|
|
+
|
|
|
// 直播间浏览量 +1
|
|
|
redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
|
|
|
@@ -353,7 +353,7 @@ public class WebSocketServer {
|
|
|
long watchUserId = (long) userProperties.get("userId");
|
|
|
|
|
|
|
|
|
-
|
|
|
+
|
|
|
if (msg.getData() != null && !msg.getData().isEmpty()) {
|
|
|
try {
|
|
|
Long currentDuration = Long.parseLong(msg.getData());
|
|
|
@@ -371,18 +371,21 @@ public class WebSocketServer {
|
|
|
isLiveStarted = true;
|
|
|
} else if (currentLive.getStartTime() != null) {
|
|
|
// 判断当前时间是否已超过开播时间
|
|
|
- LocalDateTime now = java.time.LocalDateTime.now();
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
isLiveStarted = now.isAfter(currentLive.getStartTime()) || now.isEqual(currentLive.getStartTime());
|
|
|
}
|
|
|
-
|
|
|
- if (!isLiveStarted) {
|
|
|
- break;
|
|
|
- }
|
|
|
|
|
|
-
|
|
|
// 使用Hash结构存储:一个直播间一个Hash,包含所有用户的时长
|
|
|
String hashKey = "live:watch:duration:hash:" + liveId;
|
|
|
String userIdField = String.valueOf(watchUserId);
|
|
|
+
|
|
|
+ if (!isLiveStarted) {
|
|
|
+ redisCache.hashDelete(hashKey, userIdField);
|
|
|
+ log.debug("[心跳-观看时长] 直播未开始,清除预播时长, liveId={}, userId={}", liveId, watchUserId);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 直播已开始,记录观看时长
|
|
|
// 获取现有时长
|
|
|
Object existingDuration = redisCache.hashGet(hashKey, userIdField);
|
|
|
// 只有当新的时长更大时才更新
|
|
|
@@ -396,11 +399,11 @@ public class WebSocketServer {
|
|
|
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("[心跳-观看时长] 更新失败, liveId={}, userId={}, data={}",
|
|
|
+ log.error("[心跳-观看时长] 更新失败, liveId={}, userId={}, data={}",
|
|
|
liveId, watchUserId, msg.getData(), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
break;
|
|
|
case "sendMsg":
|
|
|
@@ -741,7 +744,7 @@ public class WebSocketServer {
|
|
|
*/
|
|
|
public void broadcastWebMessage(Long liveId, String message) {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
-
|
|
|
+
|
|
|
if (room.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
@@ -867,7 +870,7 @@ public class WebSocketServer {
|
|
|
for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
|
|
|
Long liveId = roomEntry.getKey();
|
|
|
ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
|
|
|
-
|
|
|
+
|
|
|
// 如果房间为空,跳过
|
|
|
if (room.isEmpty()) {
|
|
|
continue;
|
|
|
@@ -879,12 +882,12 @@ public class WebSocketServer {
|
|
|
for (Map.Entry<Long, Session> userEntry : room.entrySet()) {
|
|
|
Long userId = userEntry.getKey();
|
|
|
Session session = userEntry.getValue();
|
|
|
-
|
|
|
+
|
|
|
if (session == null) {
|
|
|
toRemove.add(userId);
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
Long lastHeartbeat = heartbeatCache.get(session.getId());
|
|
|
if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
|
|
|
toRemove.add(userId);
|
|
|
@@ -954,11 +957,11 @@ public class WebSocketServer {
|
|
|
*/
|
|
|
public void broadcastLikeMessage(Long liveId, String message) {
|
|
|
ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
-
|
|
|
+
|
|
|
if (room.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 使用快照遍历,避免并发修改
|
|
|
for (Map.Entry<Long, Session> entry : room.entrySet()) {
|
|
|
Session session = entry.getValue();
|
|
|
@@ -1119,31 +1122,31 @@ public class WebSocketServer {
|
|
|
// 从 Redis 获取用户进入时间
|
|
|
String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
Long entryTime = redisCache.getCacheObject(entryTimeKey);
|
|
|
-
|
|
|
+
|
|
|
if (entryTime == null) {
|
|
|
// 如果没有进入时间记录,可能是旧数据,跳过
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
long currentTimeMillis = System.currentTimeMillis();
|
|
|
Date now = new Date();
|
|
|
-
|
|
|
+
|
|
|
// 计算在线时长(秒)
|
|
|
long durationSeconds = (currentTimeMillis - entryTime) / 1000;
|
|
|
-
|
|
|
+
|
|
|
if (durationSeconds <= 0) {
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 获取当前直播/回放状态
|
|
|
Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
Integer currentLiveFlag = flagMap.get("liveFlag");
|
|
|
Integer currentReplayFlag = flagMap.get("replayFlag");
|
|
|
-
|
|
|
+
|
|
|
// 查询用户记录
|
|
|
LiveWatchUserEntry liveWatchUser = liveWatchUserService.selectLiveWatchAndCompanyUserByFlag(
|
|
|
liveId, userId, currentLiveFlag, currentReplayFlag);
|
|
|
-
|
|
|
+
|
|
|
if (liveWatchUser != null) {
|
|
|
// 累加在线时长
|
|
|
Long onlineSeconds = liveWatchUser.getOnlineSeconds();
|
|
|
@@ -1152,7 +1155,7 @@ public class WebSocketServer {
|
|
|
}
|
|
|
liveWatchUser.setOnlineSeconds(onlineSeconds + durationSeconds);
|
|
|
liveWatchUser.setUpdateTime(now);
|
|
|
-
|
|
|
+
|
|
|
// 更新数据库
|
|
|
liveWatchUserService.updateLiveWatchUserEntry(liveWatchUser);
|
|
|
// 如果 LiveWatchUserEntry 存在,并且当前是直播状态(liveFlag = 1),更新 LiveWatchLog
|
|
|
@@ -1164,15 +1167,15 @@ public class WebSocketServer {
|
|
|
// liveWatchUser.getOnlineSeconds());
|
|
|
// }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 删除 Redis 中的进入时间记录
|
|
|
redisCache.deleteObject(entryTimeKey);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("更新用户在线时长异常:liveId={}, userId={}, error={}",
|
|
|
+ log.error("更新用户在线时长异常:liveId={}, userId={}, error={}",
|
|
|
liveId, userId, e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 在连接时更新 LiveWatchLog 的 logType
|
|
|
* 如果 logType 类型不是 2,修改 logType 类型为 1(看课中)
|
|
|
@@ -1183,7 +1186,7 @@ public class WebSocketServer {
|
|
|
queryLog.setLiveId(liveId);
|
|
|
queryLog.setQwUserId(String.valueOf(qwUserId));
|
|
|
queryLog.setExternalContactId(externalContactId);
|
|
|
-
|
|
|
+
|
|
|
List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
|
|
|
if (logs != null && !logs.isEmpty()) {
|
|
|
for (LiveWatchLog log : logs) {
|
|
|
@@ -1195,11 +1198,11 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("更新 LiveWatchLog logType 异常(连接时):liveId={}, userId={}, error={}",
|
|
|
+ log.error("更新 LiveWatchLog logType 异常(连接时):liveId={}, userId={}, error={}",
|
|
|
liveId, userId, e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 实时更新用户看课状态(在心跳时调用)
|
|
|
* 在直播期间实时更新用户的看课状态,而不是等到关闭 WebSocket 或清理无效会话时才更新
|
|
|
@@ -1212,36 +1215,36 @@ public class WebSocketServer {
|
|
|
// 获取当前直播/回放状态
|
|
|
Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
Integer currentLiveFlag = flagMap.get("liveFlag");
|
|
|
-
|
|
|
+
|
|
|
// 只在直播状态(liveFlag = 1)时更新
|
|
|
if (currentLiveFlag == null || currentLiveFlag != 1) {
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 获取用户的 companyId 和 companyUserId(使用带缓存的查询方法)
|
|
|
LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserIdWithCache(liveId, userId);
|
|
|
if (liveUserFirstEntry == null) {
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
Long companyId = liveUserFirstEntry.getCompanyId();
|
|
|
Long companyUserId = liveUserFirstEntry.getCompanyUserId();
|
|
|
-
|
|
|
+
|
|
|
// 如果 companyId 和 companyUserId 有效,则更新看课状态
|
|
|
if (companyId != null && companyId > 0 && companyUserId != null && companyUserId > 0) {
|
|
|
// 检查是否达到关键观看时长节点,在这些节点实时更新
|
|
|
// 关键节点:3分钟(180秒)、20分钟(1200秒)、30分钟(1800秒)
|
|
|
boolean isKeyDuration = (watchDuration == 180 || watchDuration == 1200 || watchDuration == 1800) ||
|
|
|
(watchDuration > 180 && watchDuration % 60 == 0); // 每分钟更新一次
|
|
|
-
|
|
|
+
|
|
|
// 使用 Redis 缓存控制更新频率,避免频繁更新数据库
|
|
|
// 策略:在关键节点立即更新,其他时候每60秒更新一次
|
|
|
String updateLockKey = "live:watch:log:update:lock:" + liveId + ":" + userId;
|
|
|
String lastUpdateKey = "live:watch:log:last:duration:" + liveId + ":" + userId;
|
|
|
-
|
|
|
+
|
|
|
// 获取上次更新的时长
|
|
|
Long lastUpdateDuration = redisCache.getCacheObject(lastUpdateKey);
|
|
|
-
|
|
|
+
|
|
|
// 如果达到关键节点,或者距离上次更新已超过60秒,则更新
|
|
|
boolean shouldUpdate = false;
|
|
|
if (isKeyDuration) {
|
|
|
@@ -1251,11 +1254,11 @@ public class WebSocketServer {
|
|
|
// 每60秒更新一次
|
|
|
shouldUpdate = true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (shouldUpdate) {
|
|
|
// 使用分布式锁,避免并发更新(锁超时时间10秒)
|
|
|
Boolean canUpdate = redisCache.setIfAbsent(updateLockKey, "1", 10, TimeUnit.SECONDS);
|
|
|
-
|
|
|
+
|
|
|
if (Boolean.TRUE.equals(canUpdate)) {
|
|
|
// 异步更新,避免阻塞心跳处理
|
|
|
CompletableFuture.runAsync(() -> {
|
|
|
@@ -1264,7 +1267,7 @@ public class WebSocketServer {
|
|
|
// 更新上次更新的时长
|
|
|
redisCache.setCacheObject(lastUpdateKey, watchDuration, 2, TimeUnit.HOURS);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("实时更新看课状态异常:liveId={}, userId={}, error={}",
|
|
|
+ log.error("实时更新看课状态异常:liveId={}, userId={}, error={}",
|
|
|
liveId, userId, e.getMessage(), e);
|
|
|
} finally {
|
|
|
// 释放锁
|
|
|
@@ -1275,11 +1278,11 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("实时更新看课状态异常:liveId={}, userId={}, error={}",
|
|
|
+ log.error("实时更新看课状态异常:liveId={}, userId={}, error={}",
|
|
|
liveId, userId, e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 根据在线时长更新 LiveWatchLog 的 logType
|
|
|
* @param liveId 直播间ID
|
|
|
@@ -1288,7 +1291,7 @@ public class WebSocketServer {
|
|
|
* @param companyUserId 销售ID
|
|
|
* @param onlineSeconds 在线时长(秒)
|
|
|
*/
|
|
|
- private void updateLiveWatchLogTypeByDuration(Long liveId, Long userId, Long companyId,
|
|
|
+ private void updateLiveWatchLogTypeByDuration(Long liveId, Long userId, Long companyId,
|
|
|
Long companyUserId, Long onlineSeconds) {
|
|
|
try {
|
|
|
// 获取直播视频总时长(videoType = 1 的视频,使用带缓存的查询方法)
|
|
|
@@ -1300,13 +1303,13 @@ public class WebSocketServer {
|
|
|
.mapToLong(LiveVideo::getDuration)
|
|
|
.sum();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 查询 LiveWatchLog
|
|
|
LiveWatchLog queryLog = new LiveWatchLog();
|
|
|
queryLog.setLiveId(liveId);
|
|
|
queryLog.setCompanyId(companyId);
|
|
|
queryLog.setCompanyUserId(companyUserId);
|
|
|
-
|
|
|
+
|
|
|
List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
|
|
|
if (logs == null || logs.isEmpty()) {
|
|
|
return;
|
|
|
@@ -1315,7 +1318,7 @@ public class WebSocketServer {
|
|
|
for (LiveWatchLog log : logs) {
|
|
|
boolean needUpdate = false;
|
|
|
Integer newLogType = log.getLogType();
|
|
|
-
|
|
|
+
|
|
|
// ① 如果在线时长 <= 3分钟,修改 logType 为 4(看课中断)
|
|
|
if (onlineSeconds <= 180) { // 3分钟 = 180秒
|
|
|
newLogType = 4;
|
|
|
@@ -1333,7 +1336,7 @@ public class WebSocketServer {
|
|
|
log.setFinishTime(now);
|
|
|
needUpdate = true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 如果 logType 已经是 2(完课),不再更新
|
|
|
if (needUpdate && (log.getLogType() == null || log.getLogType() != 2)) {
|
|
|
log.setLogType(newLogType);
|
|
|
@@ -1341,7 +1344,7 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("根据在线时长更新 LiveWatchLog logType 异常:liveId={}, userId={}, error={}",
|
|
|
+ log.error("根据在线时长更新 LiveWatchLog logType 异常:liveId={}, userId={}, error={}",
|
|
|
liveId, userId, e.getMessage(), e);
|
|
|
}
|
|
|
}
|