|
|
@@ -33,6 +33,7 @@ import javax.annotation.PostConstruct;
|
|
|
import java.math.BigDecimal;
|
|
|
import java.time.Instant;
|
|
|
import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
@@ -812,6 +813,8 @@ public class Task {
|
|
|
@DistributeLock(key = "scanLiveWatchUserStatus", scene = "task")
|
|
|
public void scanLiveWatchUserStatus() {
|
|
|
try {
|
|
|
+
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
// 查询所有正在直播的直播间
|
|
|
List<Live> activeLives = liveService.selectNoEndLiveList();
|
|
|
if (activeLives == null || activeLives.isEmpty()) {
|
|
|
@@ -864,7 +867,7 @@ public class Task {
|
|
|
if (onlineSeconds == null || onlineSeconds <= 0) {
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 获取用户的 companyId 和 companyUserId
|
|
|
LiveUserFirstEntry liveUserFirstEntry =
|
|
|
liveUserFirstEntryService.selectEntityByLiveIdUserIdWithCache(liveId, userId);
|
|
|
@@ -878,7 +881,10 @@ public class Task {
|
|
|
if (qwUserId == null || qwUserId <= 0 || externalContactId == null || externalContactId <= 0) {
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
+ //更新最新用户活跃时间
|
|
|
+ String liveUserWatchLogKey = String.format(LIVE_USER_WATCH_LOG_CACHE, liveId, userId,externalContactId,qwUserId);
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ redisCache.setCacheObject(liveUserWatchLogKey,formatter.format(now),5,TimeUnit.MINUTES);
|
|
|
// 使用 updateLiveWatchLogTypeByDuration 的逻辑更新观看记录状态
|
|
|
updateLiveWatchLogTypeByDuration(liveId, userId, qwUserId, externalContactId,
|
|
|
onlineSeconds, totalVideoDuration, updateLog);
|
|
|
@@ -972,6 +978,68 @@ public class Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 每分钟扫描一次用户在线状态用于更新用户观看记录值
|
|
|
+ */
|
|
|
+ @Scheduled(cron = "0 0/1 * * * ?")
|
|
|
+ @DistributeLock(key = "updateLiveWatchUserStatus", scene = "task")
|
|
|
+ public void updateLiveWatchUserStatus() {
|
|
|
+ try {
|
|
|
+ Set<String> keys = redisCache.redisTemplate.keys("live:user:watch:log");
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+ List<LiveWatchLog> updateLog = new ArrayList<>();
|
|
|
+ if (keys != null && !keys.isEmpty()) {
|
|
|
+ for (String key : keys) {
|
|
|
+ String[] split = key.split(":");
|
|
|
+ String cacheTime = redisCache.getCacheObject(key);
|
|
|
+ //判断缓存的值是否已经距离现在超过一分钟
|
|
|
+ if (StringUtils.isNotBlank(cacheTime)) {
|
|
|
+ try {
|
|
|
+ LocalDateTime cachedDateTime = LocalDateTime.parse(cacheTime, formatter);
|
|
|
+ // 比较时间,判断是否超过1分钟(60秒)
|
|
|
+ long secondsBetween = java.time.Duration.between(cachedDateTime, now).getSeconds();
|
|
|
+ if (secondsBetween > 60) {
|
|
|
+ // 距离上次记录已超过1分钟,更新状态为看课中断
|
|
|
+ // 查询 LiveWatchLog
|
|
|
+ LiveWatchLog queryLog = new LiveWatchLog();
|
|
|
+ queryLog.setLiveId(Long.valueOf(split[0]));
|
|
|
+ queryLog.setQwUserId(String.valueOf(split[3]));
|
|
|
+ queryLog.setExternalContactId(Long.valueOf(split[2]));
|
|
|
+ List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogByLogIdWithCache(queryLog);
|
|
|
+ if (logs != null && !logs.isEmpty()) {
|
|
|
+ for (LiveWatchLog log : logs) {
|
|
|
+ if (log.getLogType() != null && log.getLogType() == 2) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ log.setLogType(4);
|
|
|
+ updateLog.add(log);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析缓存时间失败: cacheTime={}, error={}", cacheTime, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 批量插入回放用户数据
|
|
|
+ if (!updateLog.isEmpty()) {
|
|
|
+ int batchSize = 500;
|
|
|
+ for (int i = 0; i < updateLog.size(); i += batchSize) {
|
|
|
+ int end = Math.min(i + batchSize, updateLog.size());
|
|
|
+ List<LiveWatchLog> batch = updateLog.subList(i, end);
|
|
|
+ liveWatchLogService.batchUpdateLiveWatchLog(batch);
|
|
|
+ }
|
|
|
+ for (LiveWatchLog liveWatchLog : updateLog) {
|
|
|
+ redisCache.setCacheObject("live:watch:log:cache:" + liveWatchLog.getLogId(), liveWatchLog, 1, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("每分钟扫描一次用户在线状态用于更新用户观看记录值: error={}", ex.getMessage(), ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 批量同步Redis中的观看时长到数据库
|
|
|
* 每2分钟执行一次,减少数据库压力
|