Browse Source

优化顺便记录到数据库,防止并发

xw 3 days ago
parent
commit
3ef68bc128

+ 78 - 27
fs-live-app/src/main/java/com/fs/live/task/LiveCompletionPointsTask.java

@@ -2,8 +2,10 @@ package com.fs.live.task;
 
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.core.redis.RedisCache;
+import com.fs.live.domain.Live;
 import com.fs.live.domain.LiveCompletionPointsRecord;
 import com.fs.live.service.ILiveCompletionPointsRecordService;
+import com.fs.live.service.ILiveService;
 import com.fs.live.websocket.bean.SendMsgVo;
 import com.fs.live.websocket.service.WebSocketServer;
 import lombok.extern.slf4j.Slf4j;
@@ -11,8 +13,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
-import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 直播完课积分定时任务
@@ -30,47 +35,53 @@ public class LiveCompletionPointsTask {
     @Autowired
     private WebSocketServer webSocketServer;
 
+    @Autowired
+    private ILiveService liveService;
+
     /**
      * 定时检查观看时长并创建完课记录
      * 每分钟执行一次
+     * 优化:使用Hash结构 + 防重复推送
      */
     @Scheduled(cron = "0 */1 * * * ?")
     public void checkCompletionStatus() {
         try {
-            // 1. 获取所有观看时长的Redis key
-            Collection<String> keys = redisCache.keys("live:watch:duration:*");
+            List<Live> activeLives = liveService.selectNoEndLiveList();
             
-            if (keys == null || keys.isEmpty()) {
+            if (activeLives == null || activeLives.isEmpty()) {
                 return;
             }
 
-            // 2. 遍历处理每个用户的观看时长
-            for (String key : keys) {
+            for (Live live : activeLives) {
                 try {
-                    String[] parts = key.split(":");
-                    if (parts.length != 5) {
+                    Long liveId = live.getLiveId();
+                    
+                    // 使用Hash结构获取该直播间所有用户的观看时长
+                    String hashKey = "live:watch:duration:hash:" + liveId;
+                    Map<Object, Object> userDurations = redisCache.redisTemplate.opsForHash().entries(hashKey);
+                    
+                    if (userDurations == null || userDurations.isEmpty()) {
                         continue;
                     }
-
-                    Long liveId = Long.parseLong(parts[3]);
-                    Long userId = Long.parseLong(parts[4]);
-
-                    // 3. 获取观看时长(秒)
-                    Object durationObj = redisCache.getCacheObject(key);
-                    if (durationObj == null) {
-                        continue;
+                    
+                    // 3. 逐个用户处理
+                    for (Map.Entry<Object, Object> entry : userDurations.entrySet()) {
+                        try {
+                            Long userId = Long.parseLong(entry.getKey().toString());
+                            
+                            // 4. 检查并创建完课记录(传null,自动累计直播+回放时长)
+                            completionPointsRecordService.checkAndCreateCompletionRecord(liveId, userId, null);
+
+                            // 5. 检查是否有新的完课记录待领取,推送弹窗消息(防重复)
+                            sendCompletionNotificationOnce(liveId, userId);
+
+                        } catch (Exception e) {
+                            log.error("处理用户完课状态失败, liveId={}, userId={}", liveId, entry.getKey(), e);
+                        }
                     }
-
-                    Long watchDuration = Long.parseLong(durationObj.toString());
-
-                    // 4. 检查并创建完课记录
-                    completionPointsRecordService.checkAndCreateCompletionRecord(liveId, userId, watchDuration);
-
-                    // 5. 检查是否有新的完课记录待领取,推送弹窗消息
-                    sendCompletionNotification(liveId, userId);
-
+                    
                 } catch (Exception e) {
-                    log.error("处理观看时长失败, key={}", key, e);
+                    log.error("处理直播间完课状态失败, liveId={}", live.getLiveId(), e);
                 }
             }
 
@@ -80,7 +91,47 @@ public class LiveCompletionPointsTask {
     }
 
     /**
-     * 发送完课通知(通过WebSocket推送弹窗)
+     * 发送完课通知(通过WebSocket推送弹窗) - 防重复版本
+     */
+    private void sendCompletionNotificationOnce(Long liveId, Long userId) {
+        try {
+            // 1. 检查 Redis 是否已推送过(防止每分钟都推送)
+            String notifyKey = "live:completion:notified:" + liveId + ":" + userId;
+            Boolean hasNotified = redisCache.hasKey(notifyKey);
+            
+            if (Boolean.TRUE.equals(hasNotified)) {
+                return;  // 已经推送过,不再重复推送
+            }
+            
+            // 2. 查询未领取的完课记录
+            List<LiveCompletionPointsRecord> unreceivedRecords = 
+                completionPointsRecordService.getUserUnreceivedRecords(liveId, userId);
+            
+            if (unreceivedRecords != null && !unreceivedRecords.isEmpty()) {
+                // 3. 构造弹窗消息
+                SendMsgVo sendMsgVo = new SendMsgVo();
+                sendMsgVo.setLiveId(liveId);
+                sendMsgVo.setUserId(userId);
+                sendMsgVo.setCmd("completionPoints");
+                sendMsgVo.setMsg("完成任务!");
+                sendMsgVo.setData(JSONObject.toJSONString(unreceivedRecords.get(0)));
+
+                // 4. 通过WebSocket发送给特定用户
+                webSocketServer.sendCompletionPointsMessage(liveId, userId, sendMsgVo);
+                
+                // 5. 记录已推送,24小时后过期(第二天可以再次推送)
+                redisCache.setCacheObject(notifyKey, "1", 24, TimeUnit.HOURS);
+                
+                log.info("发送完课积分弹窗通知成功, liveId={}, userId={}, points={}", 
+                        liveId, userId, unreceivedRecords.get(0).getPointsAwarded());
+            }
+        } catch (Exception e) {
+            log.error("发送完课通知失败, liveId={}, userId={}", liveId, userId, e);
+        }
+    }
+
+    /**
+     * 发送完课通知(通过WebSocket推送弹窗) - 旧版本(保留)
      */
     private void sendCompletionNotification(Long liveId, Long userId) {
         try {

+ 71 - 0
fs-live-app/src/main/java/com/fs/live/task/Task.java

@@ -691,4 +691,75 @@ public class Task {
             log.error("扫描直播间打标签任务异常: error={}", e.getMessage(), e);
         }
     }
+
+    /**
+     * 批量同步Redis中的观看时长到数据库
+     * 每2分钟执行一次,减少数据库压力
+     */
+    @Scheduled(cron = "0 0/2 * * * ?")
+    @DistributeLock(key = "batchSyncWatchDuration", scene = "task")
+    public void batchSyncWatchDuration() {
+        try {
+            log.info("开始批量同步观看时长到数据库");
+            
+            // 优化:从所有直播间的Hash中批量获取数据
+            List<Live> activeLives = liveService.selectNoEndLiveList();
+            
+            if (activeLives == null || activeLives.isEmpty()) {
+                log.debug("当前没有活跃的直播间");
+                return;
+            }
+            
+            int totalCount = 0;
+            int successCount = 0;
+            int failCount = 0;
+            
+            // 逐个直播间处理
+            for (Live live : activeLives) {
+                try {
+                    Long liveId = live.getLiveId();
+                    
+                    // 使用Hash结构存储每个直播间的观看时长
+                    String hashKey = "live:watch:duration:hash:" + liveId;
+                    Map<Object, Object> userDurations = redisCache.redisTemplate.opsForHash().entries(hashKey);
+                    
+                    if (userDurations == null || userDurations.isEmpty()) {
+                        continue;
+                    }
+                    
+                    // 获取直播/回放标记(一次查询,所有用户复用)
+                    Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
+                    Integer liveFlag = flagMap.get("liveFlag");
+                    Integer replayFlag = flagMap.get("replayFlag");
+                    
+                    // 批量处理该直播间的所有用户
+                    for (Map.Entry<Object, Object> entry : userDurations.entrySet()) {
+                        try {
+                            Long userId = Long.parseLong(entry.getKey().toString());
+                            Long duration = Long.parseLong(entry.getValue().toString());
+                            
+                            totalCount++;
+                            
+                            // 异步更新数据库
+                            liveWatchUserService.updateWatchDuration(liveId, userId, liveFlag, replayFlag, duration);
+                            successCount++;
+                            
+                        } catch (Exception e) {
+                            failCount++;
+                            log.error("同步用户观看时长失败: liveId={}, userId={}, error={}", 
+                                    liveId, entry.getKey(), e.getMessage());
+                        }
+                    }
+                    
+                } catch (Exception e) {
+                    log.error("处理直播间观看时长失败: liveId={}, error={}", live.getLiveId(), e.getMessage());
+                }
+            }
+            
+            log.info("批量同步观看时长完成: 总数={}, 成功={}, 失败={}", totalCount, successCount, failCount);
+            
+        } catch (Exception e) {
+            log.error("批量同步观看时长任务异常", e);
+        }
+    }
 }

+ 16 - 6
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -58,7 +58,7 @@ public class WebSocketServer {
     // 心跳超时缓存:key=sessionId,value=最后心跳时间戳
     private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
     // 心跳超时时间(毫秒):3分钟无心跳则认为超时
-    private final static long HEARTBEAT_TIMEOUT = 1 * 60 * 1000;
+    private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
     // admin房间消息发送线程池(单线程,保证串行化)
     private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
 
@@ -315,17 +315,27 @@ public class WebSocketServer {
                 case "heartbeat":
                     // 更新心跳时间
                     heartbeatCache.put(session.getId(), System.currentTimeMillis());
-                    
-                    // 心跳时同步更新观看时长
+
+                    // 心跳时同步更新观看时长到Redis Hash
                     long watchUserId = (long) userProperties.get("userId");
-                    String durationKey = "live:watch:duration:" + liveId + ":" + watchUserId;
                     
                     if (msg.getData() != null && !msg.getData().isEmpty()) {
                         try {
                             Long currentDuration = Long.parseLong(msg.getData());
-                            Object existingDuration = redisCache.getCacheObject(durationKey);
+                            
+                            // 使用Hash结构存储:一个直播间一个Hash,包含所有用户的时长
+                            String hashKey = "live:watch:duration:hash:" + liveId;
+                            String userIdField = String.valueOf(watchUserId);
+                            
+                            // 获取现有时长
+                            Object existingDuration = redisCache.redisTemplate.opsForHash().get(hashKey, userIdField);
+                            
+                            // 只有当新的时长更大时才更新(避免时间倒退)
                             if (existingDuration == null || currentDuration > Long.parseLong(existingDuration.toString())) {
-                                redisCache.setCacheObject(durationKey, currentDuration.toString(), 2, TimeUnit.HOURS);
+                                // 更新Hash中的用户时长
+                                redisCache.redisTemplate.opsForHash().put(hashKey, userIdField, currentDuration.toString());
+                                // 设置过期时间(2小时)
+                                redisCache.redisTemplate.expire(hashKey, 2, TimeUnit.HOURS);
                             }
                         } catch (Exception e) {
                             log.error("心跳更新观看时长失败, liveId={}, userId={}", liveId, watchUserId, e);

+ 18 - 0
fs-service/src/main/java/com/fs/live/service/ILiveWatchUserService.java

@@ -139,4 +139,22 @@ public interface ILiveWatchUserService {
      * 根据用户直播看课记录来打标签
      */
     void qwTagMarkByLiveWatchLog(Long liveId);
+
+    /**
+     * 更新用户观看时长(心跳时调用)
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @param liveFlag 直播标记
+     * @param replayFlag 回放标记
+     * @param duration 观看时长(秒)
+     */
+    void updateWatchDuration(Long liveId, Long userId, Integer liveFlag, Integer replayFlag, Long duration);
+
+    /**
+     * 获取用户在某直播间的总观看时长(直播 + 回放)
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @return 总观看时长(秒)
+     */
+    Long getTotalWatchDuration(Long liveId, Long userId);
 }

+ 31 - 10
fs-service/src/main/java/com/fs/live/service/impl/LiveCompletionPointsRecordServiceImpl.java

@@ -12,6 +12,7 @@ import com.fs.live.domain.LiveCompletionPointsRecord;
 import com.fs.live.mapper.LiveCompletionPointsRecordMapper;
 import com.fs.live.service.ILiveCompletionPointsRecordService;
 import com.fs.live.service.ILiveService;
+import com.fs.live.service.ILiveWatchUserService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -44,9 +45,15 @@ public class LiveCompletionPointsRecordServiceImpl implements ILiveCompletionPoi
     @Autowired
     private FsUserIntegralLogsMapper fsUserIntegralLogsMapper;
 
+    @Autowired
+    private ILiveWatchUserService liveWatchUserService;
+
 
     /**
      * 检查并创建完课记录(由定时任务调用)
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @param watchDuration 观看时长(可为null,为null时从数据库自动累计直播+回放时长)
      */
     @Override
     @Transactional(rollbackFor = Exception.class)
@@ -78,26 +85,40 @@ public class LiveCompletionPointsRecordServiceImpl implements ILiveCompletionPoi
                 return;
             }
 
-            // 3. 获取视频总时长(秒)
+            // 3. 获取观看时长(如果为null,则从数据库累计直播+回放时长)
+            Long actualWatchDuration = watchDuration;
+            if (actualWatchDuration == null) {
+                // 自动累加直播和回放的观看时长
+                actualWatchDuration = liveWatchUserService.getTotalWatchDuration(liveId, userId);
+                log.debug("自动累计观看时长: liveId={}, userId={}, totalDuration={}",
+                        liveId, userId, actualWatchDuration);
+            }
+
+            if (actualWatchDuration == null || actualWatchDuration <= 0) {
+                log.debug("观看时长为0, liveId={}, userId={}", liveId, userId);
+                return;
+            }
+
+            // 4. 获取视频总时长(秒)
             Long videoDuration = live.getDuration();
             if (videoDuration == null || videoDuration <= 0) {
                 log.warn("直播间视频时长无效, liveId={}, duration={}", liveId, videoDuration);
                 return;
             }
 
-            // 4. 计算完课比例
-            BigDecimal watchRate = BigDecimal.valueOf(watchDuration)
+            // 5. 计算完课比例
+            BigDecimal watchRate = BigDecimal.valueOf(actualWatchDuration)
                     .multiply(BigDecimal.valueOf(100))
                     .divide(BigDecimal.valueOf(videoDuration), 2, RoundingMode.HALF_UP);
 
-            // 5. 判断是否达到完课标准
+            // 6. 判断是否达到完课标准
             if (watchRate.compareTo(BigDecimal.valueOf(completionRate)) < 0) {
-                log.debug("观看时长未达到完课标准, liveId={}, userId={}, watchRate={}%, required={}%",
-                        liveId, userId, watchRate, completionRate);
+                log.debug("观看时长未达到完课标准, liveId={}, userId={}, watchDuration={}, videoDuration={}, watchRate={}%, required={}%",
+                        liveId, userId, actualWatchDuration, videoDuration, watchRate, completionRate);
                 return;
             }
 
-            // 6. 检查今天是否已有完课记录
+            // 7. 检查今天是否已有完课记录
             LocalDate today = LocalDate.now();
             Date currentDate = Date.from(today.atStartOfDay(ZoneId.systemDefault()).toInstant());
 
@@ -137,7 +158,7 @@ public class LiveCompletionPointsRecordServiceImpl implements ILiveCompletionPoi
             LiveCompletionPointsRecord record = new LiveCompletionPointsRecord();
             record.setLiveId(liveId);
             record.setUserId(userId);
-            record.setWatchDuration(watchDuration);
+            record.setWatchDuration(actualWatchDuration);
             record.setVideoDuration(videoDuration);
             record.setCompletionRate(watchRate);
             record.setContinuousDays(continuousDays);
@@ -151,8 +172,8 @@ public class LiveCompletionPointsRecordServiceImpl implements ILiveCompletionPoi
 
             recordMapper.insertRecord(record);
 
-            log.info("创建完课记录成功, liveId={}, userId={}, continuousDays={}, points={}",
-                    liveId, userId, continuousDays, points);
+            log.info("创建完课记录成功, liveId={}, userId={}, watchDuration={}, videoDuration={}, watchRate={}%, continuousDays={}, points={}",
+                    liveId, userId, actualWatchDuration, videoDuration, watchRate, continuousDays, points);
 
         } catch (Exception e) {
             log.error("检查并创建完课记录失败, liveId={}, userId={}", liveId, userId, e);

+ 69 - 0
fs-service/src/main/java/com/fs/live/service/impl/LiveWatchUserServiceImpl.java

@@ -943,4 +943,73 @@ public class LiveWatchUserServiceImpl implements ILiveWatchUserService {
                 userTagVOS.size(), successCount, failCount);
     }
 
+    /**
+     * 更新用户观看时长(心跳时调用)- 异步执行
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @param liveFlag 直播标记
+     * @param replayFlag 回放标记
+     * @param duration 观看时长(秒)
+     */
+    @Override
+    @Async
+    public void updateWatchDuration(Long liveId, Long userId, Integer liveFlag, Integer replayFlag, Long duration) {
+        try {
+
+            LiveWatchUser liveWatchUser = baseMapper.selectByUniqueIndex(liveId, userId, liveFlag, replayFlag);
+            
+            if (liveWatchUser != null) {
+                if (liveWatchUser.getOnlineSeconds() == null || duration > liveWatchUser.getOnlineSeconds()) {
+                    liveWatchUser.setOnlineSeconds(duration);
+                    liveWatchUser.setUpdateTime(DateUtils.getNowDate());
+                    baseMapper.updateLiveWatchUser(liveWatchUser);
+                    log.debug("更新观看时长成功: liveId={}, userId={}, liveFlag={}, replayFlag={}, duration={}",
+                            liveId, userId, liveFlag, replayFlag, duration);
+                }
+            } else {
+                log.warn("未找到观看记录,无法更新时长: liveId={}, userId={}, liveFlag={}, replayFlag={}",
+                        liveId, userId, liveFlag, replayFlag);
+            }
+        } catch (Exception e) {
+            log.error("更新观看时长失败: liveId={}, userId={}, liveFlag={}, replayFlag={}, duration={}",
+                    liveId, userId, liveFlag, replayFlag, duration, e);
+        }
+    }
+
+    /**
+     * 获取用户在某直播间的总观看时长(直播 + 回放)
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @return 总观看时长(秒)
+     */
+    @Override
+    public Long getTotalWatchDuration(Long liveId, Long userId) {
+        try {
+            long totalDuration = 0L;
+            
+            // 1. 查询直播观看记录(liveFlag=1, replayFlag=0)
+            LiveWatchUser liveRecord = baseMapper.selectByUniqueIndex(liveId, userId, 1, 0);
+            if (liveRecord != null && liveRecord.getOnlineSeconds() != null) {
+                totalDuration += liveRecord.getOnlineSeconds();
+            }
+            
+            // 2. 查询回放观看记录(liveFlag=0, replayFlag=1)
+            LiveWatchUser replayRecord = baseMapper.selectByUniqueIndex(liveId, userId, 0, 1);
+            if (replayRecord != null && replayRecord.getOnlineSeconds() != null) {
+                totalDuration += replayRecord.getOnlineSeconds();
+            }
+            
+            log.debug("查询总观看时长: liveId={}, userId={}, liveDuration={}, replayDuration={}, total={}",
+                    liveId, userId,
+                    liveRecord != null ? liveRecord.getOnlineSeconds() : 0,
+                    replayRecord != null ? replayRecord.getOnlineSeconds() : 0,
+                    totalDuration);
+            
+            return totalDuration;
+        } catch (Exception e) {
+            log.error("查询总观看时长失败: liveId={}, userId={}", liveId, userId, e);
+            return 0L;
+        }
+    }
+
 }