zyp 2 weeks ago
parent
commit
72c52dd85b

+ 5 - 5
fs-qw-task/src/main/java/com/fs/app/task/CourseWatchLogScheduler.java

@@ -83,7 +83,7 @@ public class CourseWatchLogScheduler {
 //    }
 
 
-    @Scheduled(fixedRate = 60000) // 每分钟执行一次
+//    @Scheduled(fixedRate = 60000) // 每分钟执行一次
     public void checkWatchStatus() {
         // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning1.compareAndSet(false, true)) {
@@ -127,7 +127,7 @@ public class CourseWatchLogScheduler {
     /**
      * 创建完课消息
      */
-    @Scheduled(fixedRate = 300000) // 每五分钟执行一次
+//    @Scheduled(fixedRate = 300000) // 每五分钟执行一次
     public void createCourseFinishMsg() {
         // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning3.compareAndSet(false, true)) {
@@ -157,7 +157,7 @@ public class CourseWatchLogScheduler {
     /**
      * 每天删除过期短链
      */
-    @Scheduled(cron = "0 0 0 * * ?")  // 0点0分0秒执行
+//    @Scheduled(cron = "0 0 0 * * ?")  // 0点0分0秒执行
     public void delCourseExpireLink() {
         try {
             log.info("删除过期短链 - 定时任务开始");
@@ -178,8 +178,8 @@ public class CourseWatchLogScheduler {
         }
         try {
             log.info("WXH5-检查会员看课中任务执行>>>>>>>>>>>>");
-            courseWatchLogService.scheduleUpdateDurationToDatabase();
-            courseWatchLogService.checkFsUserWatchStatus();
+            courseWatchLogService.scheduleBatchUpdateToDatabase();
+            courseWatchLogService.checkWatchStatus();
             log.info("WXH5-检查会员看课中任务执行完成>>>>>>>>>>>>");
         }catch (Exception e) {
             log.error("WXH5-检查会员看课中任务执行完成 - 定时任务执行失败", e);

+ 375 - 157
fs-service/src/main/java/com/fs/course/service/impl/FsCourseWatchLogServiceImpl.java

@@ -399,6 +399,218 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
         return baseMapper.countByMap(params);
     }
 
+
+    @Autowired
+    private RedisKeyScanner redisKeyScanner;
+    @Override
+    public void scheduleBatchUpdateToDatabase() {
+        try {
+            log.info("开始更新看课时长,检查完课>>>>>>");
+
+            // 读取所有的key
+            Set<String> keys = redisKeyScanner.scanMatchKey("h5wxuser:watch:duration:*");
+            log.info("共扫描到 {} 个待处理键", keys.size());
+
+            // 如果keys为空,直接返回
+            if (CollectionUtils.isEmpty(keys)) {
+                return;
+            }
+
+            // 读取看课配置
+            String json = configService.selectConfigByKey("course.config");
+            CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+
+            // 创建线程安全的集合
+            List<FsCourseWatchLog> logs = Collections.synchronizedList(new ArrayList<>());
+
+            // 创建线程池(根据服务器配置调整线程数)
+            int threadCount = Math.min(Runtime.getRuntime().availableProcessors() * 2, 8);
+            ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+
+            // 将keys分成多个批次,每个线程处理一批
+            List<List<String>> keyBatches = partitionKeys(new ArrayList<>(keys), threadCount * 10);
+
+            log.info("开始多线程处理,共 {} 个批次", keyBatches.size());
+
+            int type = 1; //检查完课任务
+
+            // 创建所有任务
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (List<String> batchKeys : keyBatches) {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                    processKeyBatch(batchKeys, config, logs,type);
+                }, executorService);
+                futures.add(future);
+            }
+
+            // 等待所有任务完成
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+            // 关闭线程池
+            executorService.shutdown();
+
+            // 批量更新到数据库
+            batchUpdateFsUserCourseWatchLog(logs, 100);
+
+        } catch (Exception e) {
+            log.error("定时任务执行失败 scheduleBatchUpdateToDatabase", e);
+        }
+    }
+
+    @Override
+    public void checkWatchStatus() {
+        try {
+            log.info("开始更新看课中断记录>>>>>");
+
+            // 读取所有的key
+            Set<String> keys = redisKeyScanner.scanMatchKey("h5wxuser:watch:heartbeat:*");
+            log.info("共扫描到 {} 个待处理键", keys.size());
+
+            // 如果keys为空,直接返回
+            if (CollectionUtils.isEmpty(keys)) {
+                return;
+            }
+
+            // 读取看课配置
+            String json = configService.selectConfigByKey("course.config");
+            CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+
+            // 创建线程安全的集合
+            List<FsCourseWatchLog> logs = Collections.synchronizedList(new ArrayList<>());
+
+            // 创建线程池(根据服务器配置调整线程数)
+            int threadCount = Math.min(Runtime.getRuntime().availableProcessors() * 2, 8);
+            ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+
+            // 将keys分成多个批次,每个线程处理一批
+            List<List<String>> keyBatches = partitionKeys(new ArrayList<>(keys), threadCount * 10);
+
+            log.info("开始多线程处理,共 {} 个批次", keyBatches.size());
+
+            int type = 2; //检查看课中断
+
+            // 创建所有任务
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (List<String> batchKeys : keyBatches) {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                    processKeyBatch(batchKeys, config, logs, type);
+                }, executorService);
+                futures.add(future);
+            }
+
+            // 等待所有任务完成
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+            // 关闭线程池
+            executorService.shutdown();
+
+            // 批量更新到数据库
+            batchUpdateFsUserCourseWatchLog(logs, 100);
+
+        } catch (Exception e) {
+            log.error("定时任务执行失败checkWatchStatus", e);
+        }
+    }
+
+    /**
+     * 处理一个key批次
+     */
+    private void processKeyBatch(List<String> batchKeys, CourseConfig config, List<FsCourseWatchLog> logs,int type) {
+        LocalDateTime now = LocalDateTime.now();
+        for (String key : batchKeys) {
+            try {
+                // 取key中数据
+                Long userId = null;
+                Long videoId = null;
+                Long companyUserId = null;
+                try {
+                    String[] parts = key.split(":");
+                    userId = Long.parseLong(parts[3]);
+                    videoId = Long.parseLong(parts[4]);
+                    companyUserId = Long.parseLong(parts[5]);
+                } catch (Exception e) {
+                    log.error("key中id为null:{}", key);
+                    continue;
+                }
+                FsCourseWatchLog watchLog = new FsCourseWatchLog();
+                //检查完课
+                if (type==1){
+                    String durationStr = redisCache.getCacheObject(key);
+                    if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
+                        redisCache.deleteObject(key);
+                        log.error("key中数据为null:{}", key);
+                        continue;  // 如果 Redis 中没有记录,跳过
+                    }
+                    Long duration = Long.valueOf(durationStr);
+
+                    watchLog.setDuration(duration);
+
+                    // 取对应视频的时长
+                    Long videoDuration;
+                    try {
+                        videoDuration = getFsUserVideoDuration(videoId);
+                    } catch (Exception e) {
+                        log.error("视频时长为空:{}", videoId);
+                        continue;
+                    }
+
+                    if (videoDuration != null && videoDuration != 0) {
+                        // 判断是否完课
+                        long percentage = (duration * 100 / videoDuration);
+                        if (percentage >= config.getAnswerRate()) {
+                            watchLog.setLogType(2); // 设置状态为"已完成"
+                            watchLog.setFinishTime(new Date());
+                            String heartbeatKey = "h5wxuser:watch:heartbeat:" + userId+ ":" + videoId + ":" + companyUserId;
+                            // 完课删除心跳记录
+                            redisCache.deleteObject(heartbeatKey);
+                            // 完课删除看课时长记录
+                            redisCache.deleteObject(key);
+                        }
+                    }
+                }else{
+                    //检查看课中断
+                    // 获取最后心跳时间
+                    String lastHeartbeatStr = redisCache.getCacheObject(key);
+                    if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
+                        redisCache.deleteObject(key);
+                        continue; // 如果 Redis 中没有记录,跳过
+                    }
+                    LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
+                    Duration duration = Duration.between(lastHeartbeatTime, now);
+                    // 如果超过一分钟没有心跳,标记为“观看中断”
+                    if (duration.getSeconds() >= 60) {
+                        watchLog.setLogType(4);
+                        // 从 Redis 中删除该记录
+                        redisCache.deleteObject(key);
+                    } else {
+                        watchLog.setLogType(1);
+                    }
+                }
+                watchLog.setVideoId(videoId);
+                watchLog.setUserId(userId);
+                watchLog.setCompanyUserId(companyUserId);
+
+
+                // 线程安全地添加到集合
+                logs.add(watchLog);
+
+            } catch (Exception e) {
+                log.error("处理key {} 时发生异常: {}", key, e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * 将keys分成多个批次
+     */
+    private List<List<String>> partitionKeys(List<String> keys, int batchSize) {
+        List<List<String>> batches = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i += batchSize) {
+            int end = Math.min(i + batchSize, keys.size());
+            batches.add(keys.subList(i, end));
+        }
+        return batches;
+    }
     @Override
     public void scheduleUpdateDurationToDatabase() {
         log.info("WXH5-开始更新会员看课时长,检查完课>>>>>>");
@@ -625,31 +837,37 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
 
 
     public void batchUpdateFsUserCourseWatchLog(List<FsCourseWatchLog> logs, int batchSize) {
-        if (logs == null || logs.isEmpty()) {
-            log.info("待更新的日志列表为空,无需处理");
-            return;
-        }
+        try {
+            // 记录开始时间
+            long startTime = System.currentTimeMillis();
+            log.info("开始批量更新日志,总日志数量: {}", logs == null ? 0 : logs.size());
 
-        // 记录总日志数量
-        log.info("开始批量更新日志,总日志数量: {}", logs.size());
 
-        // 分批处理
-        for (int i = 0; i < logs.size(); i += batchSize) {
-            int end = Math.min(i + batchSize, logs.size());
-            List<FsCourseWatchLog> batchList = logs.subList(i, end);
+            if (logs == null || logs.isEmpty()) {
+                log.info("待更新的日志列表为空,无需处理");
+                return;
+            }
 
-            // 记录当前批次的数量
-            log.info("正在更新第 {} 批日志,数量: {}", (i / batchSize) + 1, batchList.size());
+            // 分批处理
+            for (int i = 0; i < logs.size(); i += batchSize) {
+                int end = Math.min(i + batchSize, logs.size());
+                List<FsCourseWatchLog> batchList = logs.subList(i, end);
+                // 执行批量更新
+                fsCourseWatchLogMapper.batchUpdateFsUserWatchLog(batchList);
+            }
 
-            // 执行批量更新
-            fsCourseWatchLogMapper.batchUpdateFsUserWatchLog(batchList);
 
-            // 记录当前批次更新完成
-            log.info("第 {} 批日志更新完成,数量: {}", (i / batchSize) + 1, batchList.size());
-        }
+            // 计算总耗时
+            long totalCost = System.currentTimeMillis() - startTime;
+            log.info("所有日志更新完成,总数量: {}, 总耗时: {}ms, 平均速度: {}/s",
+                    logs.size(),
+                    totalCost,
+                    totalCost > 0 ? String.format("%.2f", logs.size() * 1000.0 / totalCost) : "∞");
 
-        // 记录全部更新完成
-        log.info("所有日志更新完成,总日志数量: {}", logs.size());
+        }finally {
+            assert logs != null;
+            logs.clear(); // 提前释放引用 进入GC
+        }
     }
 
     @Override
@@ -835,144 +1053,144 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
     @Autowired
     private ISysConfigService configService;
 
-    @Override
-    public void scheduleBatchUpdateToDatabase() {
-        log.info("开始更新看课时长,检查完课>>>>>>");
-        //读取所有的key
-        Collection<String> keys = redisCache.keys("h5user:watch:duration:*");
-        //读取看课配置
-        String json = configService.selectConfigByKey("course.config");
-        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
-
-        List<FsCourseWatchLog> logs = new ArrayList<>();
-        List<FsCourseWatchLog> finishedLogs = new ArrayList<>();
-        for (String key : keys) {
-            //取key中数据
-            Long qwUserId=null;
-            Long videoId=null;
-            Long externalId=null;
-            try {
-                String[] parts = key.split(":");
-                qwUserId = Long.parseLong(parts[3]);
-                externalId = Long.parseLong(parts[4]);
-                videoId = Long.parseLong(parts[5]);
-            }catch (Exception e){
-                log.error("key中id为null:{}", key);
-                continue;
-            }
-            String durationStr = redisCache.getCacheObject(key);
-            if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
-                redisCache.deleteObject(key);
-                log.error("key中数据为null:{}", key);
-                continue;  // 如果 Redis 中没有记录,跳过
-            }
-            Long duration = Long.valueOf(durationStr);
-
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            watchLog.setVideoId(videoId);
-            watchLog.setQwUserId(qwUserId);
-            watchLog.setQwExternalContactId(externalId);
-            watchLog.setDuration(duration);
-
-            //取对应视频的时长
-            Long videoDuration = 0L;
-            try {
-                videoDuration = getVideoDuration(videoId);
-            }catch (Exception e){
-                log.error("视频时长识别错误:{}", key);
-                continue;
-            }
-
-            if (videoDuration != null && videoDuration != 0) {
-                boolean complete = false;
-                // 判断百分比
-                if(config.getCompletionMode() == 1 && config.getAnswerRate() != null){
-                    long percentage = (duration * 100 / videoDuration);
-                    complete = percentage >= config.getAnswerRate();
-                }
-                // 判断分钟数
-                if(config.getCompletionMode() == 2 && config.getMinutesNum() != null){
-                    int i = config.getMinutesNum() * 60;
-                    complete = videoDuration > i;
-                }
-                //判断是否完课
-                if (complete) {
-                    watchLog.setLogType(2); // 设置状态为“已完成”
-                    watchLog.setFinishTime(new Date());
-                    String heartbeatKey ="h5user:watch:heartbeat:" + qwUserId+ ":" + externalId + ":" + videoId;
-                    // 完课删除心跳记录
-                    redisCache.deleteObject(heartbeatKey);
-                    // 完课删除看课时长记录
-                    redisCache.deleteObject(key);
-
-                    finishedLogs.add(watchLog);
-                }
-            }
-            //集合中增加
-            logs.add(watchLog);
-        }
-
-        batchUpdateFsCourseWatchLog(logs,100);
-
-        // 完课打标签
-        if(CollectionUtils.isNotEmpty(finishedLogs)){
-            fsTagUpdateService.onCourseWatchFinishedBatch(finishedLogs);
-        }
-    }
-
-    @Override
-    public void checkWatchStatus() {
-        log.info("开始更新看课中断记录>>>>>");
-        // 从 Redis 中获取所有正在看课的用户记录
-        Collection<String> keys = redisCache.keys("h5user:watch:heartbeat:*");
-        LocalDateTime now = LocalDateTime.now();
-        List<FsCourseWatchLog> logs = new ArrayList<>();
-
-        List<FsCourseWatchLog> watchingLogs = new ArrayList<>();
-        for (String key : keys) {
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            //取key中数据
-            Long qwUserId=null;
-            Long videoId=null;
-            Long externalId=null;
-            try {
-                String[] parts = key.split(":");
-                qwUserId = Long.parseLong(parts[3]);
-                externalId = Long.parseLong(parts[4]);
-                videoId = Long.parseLong(parts[5]);
-            }catch (Exception e){
-                log.error("key中id为null:{}", key);
-                continue;
-            }
-            // 获取最后心跳时间
-            String lastHeartbeatStr = redisCache.getCacheObject(key);
-            if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
-                redisCache.deleteObject(key);
-                continue; // 如果 Redis 中没有记录,跳过
-            }
-            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
-            Duration duration = Duration.between(lastHeartbeatTime, now);
-
-            watchLog.setVideoId(videoId);
-            watchLog.setQwUserId(qwUserId);
-            watchLog.setQwExternalContactId(externalId);
-            // 如果超过一分钟没有心跳,标记为“观看中断”
-            if (duration.getSeconds() >= 60) {
-                watchLog.setLogType(4);
-                // 从 Redis 中删除该记录
-                redisCache.deleteObject(key);
-            }else {
-                watchLog.setLogType(1);
-                watchingLogs.add(watchLog);
-            }
-            logs.add(watchLog);
-        }
-        batchUpdateFsCourseWatchLog(logs,100);
-
-        if(CollectionUtils.isNotEmpty(watchingLogs)){
-            fsTagUpdateService.onCourseWatchingBatch(watchingLogs);
-        }
-    }
+//    @Override
+//    public void scheduleBatchUpdateToDatabase() {
+//        log.info("开始更新看课时长,检查完课>>>>>>");
+//        //读取所有的key
+//        Collection<String> keys = redisCache.keys("h5user:watch:duration:*");
+//        //读取看课配置
+//        String json = configService.selectConfigByKey("course.config");
+//        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+//
+//        List<FsCourseWatchLog> logs = new ArrayList<>();
+//        List<FsCourseWatchLog> finishedLogs = new ArrayList<>();
+//        for (String key : keys) {
+//            //取key中数据
+//            Long qwUserId=null;
+//            Long videoId=null;
+//            Long externalId=null;
+//            try {
+//                String[] parts = key.split(":");
+//                qwUserId = Long.parseLong(parts[3]);
+//                externalId = Long.parseLong(parts[4]);
+//                videoId = Long.parseLong(parts[5]);
+//            }catch (Exception e){
+//                log.error("key中id为null:{}", key);
+//                continue;
+//            }
+//            String durationStr = redisCache.getCacheObject(key);
+//            if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
+//                redisCache.deleteObject(key);
+//                log.error("key中数据为null:{}", key);
+//                continue;  // 如果 Redis 中没有记录,跳过
+//            }
+//            Long duration = Long.valueOf(durationStr);
+//
+//            FsCourseWatchLog watchLog = new FsCourseWatchLog();
+//            watchLog.setVideoId(videoId);
+//            watchLog.setQwUserId(qwUserId);
+//            watchLog.setQwExternalContactId(externalId);
+//            watchLog.setDuration(duration);
+//
+//            //取对应视频的时长
+//            Long videoDuration = 0L;
+//            try {
+//                videoDuration = getVideoDuration(videoId);
+//            }catch (Exception e){
+//                log.error("视频时长识别错误:{}", key);
+//                continue;
+//            }
+//
+//            if (videoDuration != null && videoDuration != 0) {
+//                boolean complete = false;
+//                // 判断百分比
+//                if(config.getCompletionMode() == 1 && config.getAnswerRate() != null){
+//                    long percentage = (duration * 100 / videoDuration);
+//                    complete = percentage >= config.getAnswerRate();
+//                }
+//                // 判断分钟数
+//                if(config.getCompletionMode() == 2 && config.getMinutesNum() != null){
+//                    int i = config.getMinutesNum() * 60;
+//                    complete = videoDuration > i;
+//                }
+//                //判断是否完课
+//                if (complete) {
+//                    watchLog.setLogType(2); // 设置状态为“已完成”
+//                    watchLog.setFinishTime(new Date());
+//                    String heartbeatKey ="h5user:watch:heartbeat:" + qwUserId+ ":" + externalId + ":" + videoId;
+//                    // 完课删除心跳记录
+//                    redisCache.deleteObject(heartbeatKey);
+//                    // 完课删除看课时长记录
+//                    redisCache.deleteObject(key);
+//
+//                    finishedLogs.add(watchLog);
+//                }
+//            }
+//            //集合中增加
+//            logs.add(watchLog);
+//        }
+//
+//        batchUpdateFsCourseWatchLog(logs,100);
+//
+//        // 完课打标签
+//        if(CollectionUtils.isNotEmpty(finishedLogs)){
+//            fsTagUpdateService.onCourseWatchFinishedBatch(finishedLogs);
+//        }
+//    }
+
+//    @Override
+//    public void checkWatchStatus() {
+//        log.info("开始更新看课中断记录>>>>>");
+//        // 从 Redis 中获取所有正在看课的用户记录
+//        Collection<String> keys = redisCache.keys("h5user:watch:heartbeat:*");
+//        LocalDateTime now = LocalDateTime.now();
+//        List<FsCourseWatchLog> logs = new ArrayList<>();
+//
+//        List<FsCourseWatchLog> watchingLogs = new ArrayList<>();
+//        for (String key : keys) {
+//            FsCourseWatchLog watchLog = new FsCourseWatchLog();
+//            //取key中数据
+//            Long qwUserId=null;
+//            Long videoId=null;
+//            Long externalId=null;
+//            try {
+//                String[] parts = key.split(":");
+//                qwUserId = Long.parseLong(parts[3]);
+//                externalId = Long.parseLong(parts[4]);
+//                videoId = Long.parseLong(parts[5]);
+//            }catch (Exception e){
+//                log.error("key中id为null:{}", key);
+//                continue;
+//            }
+//            // 获取最后心跳时间
+//            String lastHeartbeatStr = redisCache.getCacheObject(key);
+//            if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
+//                redisCache.deleteObject(key);
+//                continue; // 如果 Redis 中没有记录,跳过
+//            }
+//            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
+//            Duration duration = Duration.between(lastHeartbeatTime, now);
+//
+//            watchLog.setVideoId(videoId);
+//            watchLog.setQwUserId(qwUserId);
+//            watchLog.setQwExternalContactId(externalId);
+//            // 如果超过一分钟没有心跳,标记为“观看中断”
+//            if (duration.getSeconds() >= 60) {
+//                watchLog.setLogType(4);
+//                // 从 Redis 中删除该记录
+//                redisCache.deleteObject(key);
+//            }else {
+//                watchLog.setLogType(1);
+//                watchingLogs.add(watchLog);
+//            }
+//            logs.add(watchLog);
+//        }
+//        batchUpdateFsCourseWatchLog(logs,100);
+//
+//        if(CollectionUtils.isNotEmpty(watchingLogs)){
+//            fsTagUpdateService.onCourseWatchingBatch(watchingLogs);
+//        }
+//    }
 
     @Override
     public List<FsCourseUserStatisticsListVO> selectFsCourseUserStatisticsListVO(FsCourseUserStatisticsListParam param) {