zyp пре 1 недеља
родитељ
комит
14d49faec8

+ 50 - 8
fs-qw-task/src/main/java/com/fs/app/task/CourseWatchLogScheduler.java

@@ -9,10 +9,12 @@ import com.fs.course.service.IFsCourseWatchLogService;
 import com.fs.sop.mapper.QwSopLogsMapper;
 import com.fs.system.service.ISysConfigService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.util.Calendar;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 @Component
@@ -55,29 +57,69 @@ public class CourseWatchLogScheduler {
 
 
 
-    /**
-     * 检查看课状态
-     */
+//    /**
+//     * 检查看课状态
+//     */
+//    @Scheduled(fixedRate = 60000) // 每分钟执行一次
+//    public void checkWatchStatus() {
+//        // 尝试设置标志为 true,表示任务开始执行
+//        if (!isRunning1.compareAndSet(false, true)) {
+//            log.warn("检查看课中任务执行 - 上一个任务尚未完成,跳过此次执行");
+//            return;
+//        }
+//        try {
+//            log.info("检查看课中任务执行>>>>>>>>>>>>");
+//            courseWatchLogService.scheduleBatchUpdateToDatabase();
+////            courseWatchLogService.scheduleBatchUpdateToDatabaseIsOpen();
+//            courseWatchLogService.checkWatchStatus();
+//            log.info("检查看课中任务执行完成>>>>>>>>>>>>");
+//        }catch (Exception e) {
+//            log.error("检查看课中任务执行完成 - 定时任务执行失败", e);
+//        } finally {
+//            // 重置标志为 false,表示任务已完成
+//            isRunning1.set(false);
+//        }
+//
+//    }
+
+
     @Scheduled(fixedRate = 60000) // 每分钟执行一次
     public void checkWatchStatus() {
         // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning1.compareAndSet(false, true)) {
-            log.warn("检查看课中任务执行 - 上一个任务尚未完成,跳过此次执行");
+            log.info("检查看课中任务执行 - 上一个任务尚未完成,跳过此次执行");
             return;
         }
+
         try {
             log.info("检查看课中任务执行>>>>>>>>>>>>");
             courseWatchLogService.scheduleBatchUpdateToDatabase();
-            courseWatchLogService.scheduleBatchUpdateToDatabaseIsOpen();
             courseWatchLogService.checkWatchStatus();
             log.info("检查看课中任务执行完成>>>>>>>>>>>>");
-        }catch (Exception e) {
-            log.error("检查看课中任务执行完成 - 定时任务执行失败", e);
+
+            // 检查当前时间是否为整五分钟(0, 5, 10, 15, ... 55分钟)
+            Calendar calendar = Calendar.getInstance();
+            int minute = calendar.get(Calendar.MINUTE);
+
+            // 只有当分钟数是5的倍数时才执行创建完课消息
+            if (minute % 5 == 0) {
+                try {
+                    long startTime = System.currentTimeMillis();
+                    log.info("创建完课消息 - 定时任务开始"+System.currentTimeMillis());
+                    sopLogsTaskService.createCourseFinishMsg();
+                    long endTime = System.currentTimeMillis();
+                    long duration = endTime - startTime;
+                    log.info("创建完课消息 - 定时任务成功完成"+duration);
+                } catch (Exception e) {
+                    log.error("创建完课消息 - 定时任务执行失败", ExceptionUtils.getStackTrace(e));
+                }
+            }
+        } catch (Exception e) {
+            log.error("检查看课中任务执行完成 - 定时任务执行失败", ExceptionUtils.getStackTrace(e));
         } finally {
             // 重置标志为 false,表示任务已完成
             isRunning1.set(false);
         }
-
     }
 
 

+ 0 - 1
fs-service/src/main/java/com/fs/course/config/RedisKeyScanner.java

@@ -105,7 +105,6 @@ public class RedisKeyScanner {
                     .match(pattern)
                     .count(1000) // 每次扫描的批量大小,可根据实际情况调整
                     .build())) {
-
                 while (cursor.hasNext()) {
                     keys.add(new String(cursor.next(), StandardCharsets.UTF_8)); // 显式指定字符集,避免依赖默认值
                 }

+ 323 - 113
fs-service/src/main/java/com/fs/course/service/impl/FsCourseWatchLogServiceImpl.java

@@ -20,6 +20,7 @@ import com.fs.company.domain.Company;
 import com.fs.company.domain.CompanyUser;
 import com.fs.company.mapper.CompanyMapper;
 import com.fs.course.config.CourseConfig;
+import com.fs.course.config.RedisKeyScanner;
 import com.fs.course.domain.*;
 import com.fs.course.mapper.*;
 import com.fs.course.param.*;
@@ -72,6 +73,9 @@ import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -808,143 +812,349 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
     @Autowired
     private ISysConfigService configService;
 
+    @Autowired
+    private RedisKeyScanner redisKeyScanner;
+
     @Override
     public void scheduleBatchUpdateToDatabase() {
-        log.info("开始更新看课时长,检查完课>>>>>>");
-        //读取所有的key
-        Collection<String> keys = redisCache.keys("h5user:watch:duration:*");
-        //读取看课配置
-        String json = configService.selectConfigByKey("course.config");
-        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+        try {
+            log.info("开始更新看课时长,检查完课>>>>>>");
 
-        List<FsCourseWatchLog> logs = new ArrayList<>();
-        List<FsCourseWatchLog> finishedLogs = new ArrayList<>();
-        for (String key : keys) {
-            //取key中数据
-            Long qwUserId = null;
-            Long videoId = null;
-            Long externalId = null;
-            try {
-                String[] parts = key.split(":");
-                qwUserId = Long.parseLong(parts[3]);
-                externalId = Long.parseLong(parts[4]);
-                videoId = Long.parseLong(parts[5]);
-            } catch (Exception e) {
-                log.error("key中id为null:{}", key);
-                continue;
-            }
-            String durationStr = redisCache.getCacheObject(key);
-            if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
-                redisCache.deleteObject(key);
-                log.error("key中数据为null:{}", key);
-                continue;  // 如果 Redis 中没有记录,跳过
+            // 读取所有的key
+            Set<String> keys = redisKeyScanner.scanMatchKey("h5user:watch:duration:*");
+            log.info("共扫描到 {} 个待处理键", keys.size());
+
+            // 如果keys为空,直接返回
+            if (CollectionUtils.isEmpty(keys)) {
+                return;
             }
-            Long duration = Long.valueOf(durationStr);
 
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            watchLog.setVideoId(videoId);
-            watchLog.setQwUserId(qwUserId);
-            watchLog.setQwExternalContactId(externalId);
-            watchLog.setDuration(duration);
+            // 读取看课配置
+            String json = configService.selectConfigByKey("course.config");
+            CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
 
-            //取对应视频的时长
-            Long videoDuration = 0L;
-            try {
-                videoDuration = getVideoDuration(videoId);
-            } catch (Exception e) {
-                log.error("视频时长识别错误:{}", key);
-                continue;
-            }
+            // 创建线程安全的集合
+            List<FsCourseWatchLog> logs = Collections.synchronizedList(new ArrayList<>());
 
-            if (videoDuration != null && videoDuration != 0) {
-                boolean complete = false;
-                // 判断百分比
-                if (config.getCompletionMode() == 1 && config.getAnswerRate() != null) {
-                    long percentage = (duration * 100 / videoDuration);
-                    complete = percentage >= config.getAnswerRate();
-                }
-                // 判断分钟数
-                if (config.getCompletionMode() == 2 && config.getMinutesNum() != null) {
-                    int i = config.getMinutesNum() * 60;
-                    complete = videoDuration > i;
-                }
-                //判断是否完课
-                if (complete) {
-                    watchLog.setLogType(2); // 设置状态为“已完成”
-                    watchLog.setFinishTime(new Date());
-                    String heartbeatKey = "h5user:watch:heartbeat:" + qwUserId + ":" + externalId + ":" + videoId;
-                    // 完课删除心跳记录
-                    redisCache.deleteObject(heartbeatKey);
-                    // 完课删除看课时长记录
-                    redisCache.deleteObject(key);
+            // 创建线程池(根据服务器配置调整线程数)
+            int threadCount = Math.min(Runtime.getRuntime().availableProcessors() * 2, 8);
+            ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
 
-                    finishedLogs.add(watchLog);
-                }
+            // 将keys分成多个批次,每个线程处理一批
+            List<List<String>> keyBatches = partitionKeys(new ArrayList<>(keys), threadCount * 10);
+
+            log.info("开始多线程处理,共 {} 个批次", keyBatches.size());
+
+            int type = 1; //检查完课任务
+
+            // 创建所有任务
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (List<String> batchKeys : keyBatches) {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                    processKeyBatch(batchKeys, config, logs,type);
+                }, executorService);
+                futures.add(future);
             }
-            //集合中增加
-            logs.add(watchLog);
-        }
 
-        batchUpdateFsCourseWatchLog(logs, 100);
+            // 等待所有任务完成
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+            // 关闭线程池
+            executorService.shutdown();
+
+            // 批量更新到数据库
+            batchUpdateFsCourseWatchLog(logs, 100);
 
-        // 完课打标签
-        if (CollectionUtils.isNotEmpty(finishedLogs)) {
-            fsTagUpdateService.onCourseWatchFinishedBatch(finishedLogs);
+        } catch (Exception e) {
+            log.error("定时任务执行失败 scheduleBatchUpdateToDatabase", e);
         }
+//        log.info("开始更新看课时长,检查完课>>>>>>");
+//        //读取所有的key
+//        Collection<String> keys = redisCache.keys("h5user:watch:duration:*");
+//        //读取看课配置
+//        String json = configService.selectConfigByKey("course.config");
+//        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+//
+//        List<FsCourseWatchLog> logs = new ArrayList<>();
+//        List<FsCourseWatchLog> finishedLogs = new ArrayList<>();
+//        for (String key : keys) {
+//            //取key中数据
+//            Long qwUserId = null;
+//            Long videoId = null;
+//            Long externalId = null;
+//            try {
+//                String[] parts = key.split(":");
+//                qwUserId = Long.parseLong(parts[3]);
+//                externalId = Long.parseLong(parts[4]);
+//                videoId = Long.parseLong(parts[5]);
+//            } catch (Exception e) {
+//                log.error("key中id为null:{}", key);
+//                continue;
+//            }
+//            String durationStr = redisCache.getCacheObject(key);
+//            if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
+//                redisCache.deleteObject(key);
+//                log.error("key中数据为null:{}", key);
+//                continue;  // 如果 Redis 中没有记录,跳过
+//            }
+//            Long duration = Long.valueOf(durationStr);
+//
+//            FsCourseWatchLog watchLog = new FsCourseWatchLog();
+//            watchLog.setVideoId(videoId);
+//            watchLog.setQwUserId(qwUserId);
+//            watchLog.setQwExternalContactId(externalId);
+//            watchLog.setDuration(duration);
+//
+//            //取对应视频的时长
+//            Long videoDuration = 0L;
+//            try {
+//                videoDuration = getVideoDuration(videoId);
+//            } catch (Exception e) {
+//                log.error("视频时长识别错误:{}", key);
+//                continue;
+//            }
+//
+//            if (videoDuration != null && videoDuration != 0) {
+//                boolean complete = false;
+//                // 判断百分比
+//                if (config.getCompletionMode() == 1 && config.getAnswerRate() != null) {
+//                    long percentage = (duration * 100 / videoDuration);
+//                    complete = percentage >= config.getAnswerRate();
+//                }
+//                // 判断分钟数
+//                if (config.getCompletionMode() == 2 && config.getMinutesNum() != null) {
+//                    int i = config.getMinutesNum() * 60;
+//                    complete = videoDuration > i;
+//                }
+//                //判断是否完课
+//                if (complete) {
+//                    watchLog.setLogType(2); // 设置状态为“已完成”
+//                    watchLog.setFinishTime(new Date());
+//                    String heartbeatKey = "h5user:watch:heartbeat:" + qwUserId + ":" + externalId + ":" + videoId;
+//                    // 完课删除心跳记录
+//                    redisCache.deleteObject(heartbeatKey);
+//                    // 完课删除看课时长记录
+//                    redisCache.deleteObject(key);
+//
+//                    finishedLogs.add(watchLog);
+//                }
+//            }
+//            //集合中增加
+//            logs.add(watchLog);
+//        }
+//
+//        batchUpdateFsCourseWatchLog(logs, 100);
+//
+//        // 完课打标签
+//        if (CollectionUtils.isNotEmpty(finishedLogs)) {
+//            fsTagUpdateService.onCourseWatchFinishedBatch(finishedLogs);
+//        }
     }
 
     @Override
     public void checkWatchStatus() {
-        log.info("开始更新看课中断记录>>>>>");
-        // 从 Redis 中获取所有正在看课的用户记录
-        Collection<String> keys = redisCache.keys("h5user:watch:heartbeat:*");
-        LocalDateTime now = LocalDateTime.now();
-        List<FsCourseWatchLog> logs = new ArrayList<>();
+        try {
+            log.info("开始更新看课中断记录>>>>>");
 
-        List<FsCourseWatchLog> watchingLogs = new ArrayList<>();
-        for (String key : keys) {
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            //取key中数据
-            Long qwUserId = null;
-            Long videoId = null;
-            Long externalId = null;
-            try {
-                String[] parts = key.split(":");
-                qwUserId = Long.parseLong(parts[3]);
-                externalId = Long.parseLong(parts[4]);
-                videoId = Long.parseLong(parts[5]);
-            } catch (Exception e) {
-                log.error("key中id为null:{}", key);
-                continue;
+            // 读取所有的key
+            Set<String> keys = redisKeyScanner.scanMatchKey("h5user:watch:heartbeat:*");
+            log.info("共扫描到 {} 个待处理键", keys.size());
+
+            // 如果keys为空,直接返回
+            if (CollectionUtils.isEmpty(keys)) {
+                return;
             }
-            // 获取最后心跳时间
-            String lastHeartbeatStr = redisCache.getCacheObject(key);
-            if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
-                redisCache.deleteObject(key);
-                continue; // 如果 Redis 中没有记录,跳过
+
+            // 读取看课配置
+            String json = configService.selectConfigByKey("course.config");
+            CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+
+            // 创建线程安全的集合
+            List<FsCourseWatchLog> logs = Collections.synchronizedList(new ArrayList<>());
+
+            // 创建线程池(根据服务器配置调整线程数)
+            int threadCount = Math.min(Runtime.getRuntime().availableProcessors() * 2, 8);
+            ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+
+            // 将keys分成多个批次,每个线程处理一批
+            List<List<String>> keyBatches = partitionKeys(new ArrayList<>(keys), threadCount * 10);
+
+            log.info("开始多线程处理,共 {} 个批次", keyBatches.size());
+
+            int type = 2; //检查看课中断
+
+            // 创建所有任务
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (List<String> batchKeys : keyBatches) {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                    processKeyBatch(batchKeys, config, logs, type);
+                }, executorService);
+                futures.add(future);
             }
-            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
-            Duration duration = Duration.between(lastHeartbeatTime, now);
 
-            watchLog.setVideoId(videoId);
-            watchLog.setQwUserId(qwUserId);
-            watchLog.setQwExternalContactId(externalId);
-            // 如果超过一分钟没有心跳,标记为“观看中断”
-            if (duration.getSeconds() >= 60) {
-                watchLog.setLogType(4);
-                // 从 Redis 中删除该记录
-                redisCache.deleteObject(key);
-            } else {
-                watchLog.setLogType(1);
-                watchingLogs.add(watchLog);
+            // 等待所有任务完成
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+            // 关闭线程池
+            executorService.shutdown();
+
+            // 批量更新到数据库
+            batchUpdateFsCourseWatchLog(logs, 100);
+
+        } catch (Exception e) {
+            log.error("定时任务执行失败checkWatchStatus", e);
+        }
+//        log.info("开始更新看课中断记录>>>>>");
+//        // 从 Redis 中获取所有正在看课的用户记录
+//        Collection<String> keys = redisCache.keys("h5user:watch:heartbeat:*");
+//        LocalDateTime now = LocalDateTime.now();
+//        List<FsCourseWatchLog> logs = new ArrayList<>();
+//
+//        List<FsCourseWatchLog> watchingLogs = new ArrayList<>();
+//        for (String key : keys) {
+//            FsCourseWatchLog watchLog = new FsCourseWatchLog();
+//            //取key中数据
+//            Long qwUserId = null;
+//            Long videoId = null;
+//            Long externalId = null;
+//            try {
+//                String[] parts = key.split(":");
+//                qwUserId = Long.parseLong(parts[3]);
+//                externalId = Long.parseLong(parts[4]);
+//                videoId = Long.parseLong(parts[5]);
+//            } catch (Exception e) {
+//                log.error("key中id为null:{}", key);
+//                continue;
+//            }
+//            // 获取最后心跳时间
+//            String lastHeartbeatStr = redisCache.getCacheObject(key);
+//            if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
+//                redisCache.deleteObject(key);
+//                continue; // 如果 Redis 中没有记录,跳过
+//            }
+//            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
+//            Duration duration = Duration.between(lastHeartbeatTime, now);
+//
+//            watchLog.setVideoId(videoId);
+//            watchLog.setQwUserId(qwUserId);
+//            watchLog.setQwExternalContactId(externalId);
+//            // 如果超过一分钟没有心跳,标记为“观看中断”
+//            if (duration.getSeconds() >= 60) {
+//                watchLog.setLogType(4);
+//                // 从 Redis 中删除该记录
+//                redisCache.deleteObject(key);
+//            } else {
+//                watchLog.setLogType(1);
+//                watchingLogs.add(watchLog);
+//            }
+//            logs.add(watchLog);
+//        }
+//        batchUpdateFsCourseWatchLog(logs, 100);
+//
+//        if (CollectionUtils.isNotEmpty(watchingLogs)) {
+//            fsTagUpdateService.onCourseWatchingBatch(watchingLogs);
+//        }
+    }
+
+    /**
+     * 处理一个key批次
+     */
+    private void processKeyBatch(List<String> batchKeys, CourseConfig config, List<FsCourseWatchLog> logs,int type) {
+        LocalDateTime now = LocalDateTime.now();
+        for (String key : batchKeys) {
+            try {
+                // 取key中数据
+                Long qwUserId = null;
+                Long videoId = null;
+                Long externalId = null;
+                try {
+                    String[] parts = key.split(":");
+                    qwUserId = Long.parseLong(parts[3]);
+                    externalId = Long.parseLong(parts[4]);
+                    videoId = Long.parseLong(parts[5]);
+                } catch (Exception e) {
+                    log.error("key中id为null:{}", key);
+                    continue;
+                }
+                FsCourseWatchLog watchLog = new FsCourseWatchLog();
+                //检查完课
+                if (type==1){
+                    String durationStr = redisCache.getCacheObject(key);
+                    if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
+                        redisCache.deleteObject(key);
+                        log.error("key中数据为null:{}", key);
+                        continue;  // 如果 Redis 中没有记录,跳过
+                    }
+                    Long duration = Long.valueOf(durationStr);
+
+                    watchLog.setDuration(duration);
+
+                    // 取对应视频的时长
+                    Long videoDuration;
+                    try {
+                        videoDuration = getVideoDuration(videoId);
+                    } catch (Exception e) {
+                        log.error("视频时长识别错误:{}", key);
+                        continue;
+                    }
+
+
+                    if (videoDuration != null && videoDuration != 0) {
+                        // 判断是否完课
+                        long percentage = (duration * 100 / videoDuration);
+                        if (percentage >= config.getAnswerRate()) {
+                            watchLog.setLogType(2); // 设置状态为"已完成"
+                            watchLog.setFinishTime(new Date());
+                            String heartbeatKey = "h5user:watch:heartbeat:" + qwUserId + ":" + externalId + ":" + videoId;
+                            // 完课删除心跳记录
+                            redisCache.deleteObject(heartbeatKey);
+                            // 完课删除看课时长记录
+                            redisCache.deleteObject(key);
+                        }
+                    }
+                }else{
+                    //检查看课中断
+                    // 获取最后心跳时间
+                    String lastHeartbeatStr = redisCache.getCacheObject(key);
+                    if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
+                        redisCache.deleteObject(key);
+                        continue; // 如果 Redis 中没有记录,跳过
+                    }
+                    LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
+                    Duration duration = Duration.between(lastHeartbeatTime, now);
+                    // 如果超过一分钟没有心跳,标记为“观看中断”
+                    if (duration.getSeconds() >= 60) {
+                        watchLog.setLogType(4);
+                        // 从 Redis 中删除该记录
+                        redisCache.deleteObject(key);
+                    } else {
+                        watchLog.setLogType(1);
+                    }
+                }
+                watchLog.setVideoId(videoId);
+                watchLog.setQwUserId(qwUserId);
+                watchLog.setQwExternalContactId(externalId);
+
+
+                // 线程安全地添加到集合
+                logs.add(watchLog);
+
+            } catch (Exception e) {
+                log.error("处理key {} 时发生异常: {}", key, e.getMessage());
             }
-            logs.add(watchLog);
         }
-        batchUpdateFsCourseWatchLog(logs, 100);
+    }
 
-        if (CollectionUtils.isNotEmpty(watchingLogs)) {
-            fsTagUpdateService.onCourseWatchingBatch(watchingLogs);
+    /**
+     * 将keys分成多个批次
+     */
+    private List<List<String>> partitionKeys(List<String> keys, int batchSize) {
+        List<List<String>> batches = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i += batchSize) {
+            int end = Math.min(i + batchSize, keys.size());
+            batches.add(keys.subList(i, end));
         }
+        return batches;
     }
 
     @Override