Pārlūkot izejas kodu

迁移手动发课营期

吴树波 1 mēnesi atpakaļ
vecāks
revīzija
0abbd77e9c

+ 26 - 22
fs-qw-task/src/main/java/com/fs/app/task/CourseWatchLogScheduler.java

@@ -23,6 +23,9 @@ public class CourseWatchLogScheduler {
     private final AtomicBoolean isRunning2 = new AtomicBoolean(false);
 
     private final AtomicBoolean isRunning3 = new AtomicBoolean(false);
+
+    private final AtomicBoolean isRunning4 = new AtomicBoolean(false);
+
     @Autowired
     private FsCourseWatchLogMapper courseWatchLogMapper;
 
@@ -44,9 +47,6 @@ public class CourseWatchLogScheduler {
     @Autowired
     private SopLogsTaskService sopLogsTaskService;
 
-//    @Autowired
-//    private IFsCourseRedPacketRetryService redPacketRetryService;
-
 //    // 定时任务批量更新到数据库
 //    @Scheduled(fixedRate = 60000) // 每分钟执行一次
 //    public void scheduleBatchUpdateToDatabase() {
@@ -56,9 +56,9 @@ public class CourseWatchLogScheduler {
 
 
     /**
-     * 检查看课状态(先导课只有30秒)
+     * 检查看课状态
      */
-    @Scheduled(fixedRate = 20000) // 每15秒执行一次
+    @Scheduled(fixedRate = 60000) // 每分钟执行一次
     public void checkWatchStatus() {
         // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning1.compareAndSet(false, true)) {
@@ -84,7 +84,7 @@ public class CourseWatchLogScheduler {
     /**
      * 创建完课消息
      */
-    @Scheduled(cron = "0 */5 * * * ?") // 每五分钟执行一次
+    @Scheduled(fixedRate = 300000) // 每五分钟执行一次
     public void createCourseFinishMsg() {
         // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning3.compareAndSet(false, true)) {
@@ -101,7 +101,6 @@ public class CourseWatchLogScheduler {
         } finally {
             // 重置标志为 false,表示任务已完成
             isRunning3.set(false);
-
         }
 
     }
@@ -115,7 +114,7 @@ public class CourseWatchLogScheduler {
     /**
      * 每天删除过期短链
      */
-    @Scheduled(cron = "0 5 0 * * ?")  // 0点5分0秒执行
+    @Scheduled(cron = "0 0 0 * * ?")  // 0点0分0秒执行
     public void delCourseExpireLink() {
         try {
             log.info("删除过期短链 - 定时任务开始");
@@ -127,21 +126,26 @@ public class CourseWatchLogScheduler {
 
     }
 
-    /**
-     * 红包补发定时任务
-     */
-//    @Scheduled(cron = "0 */5 * * * ?")  //每5分钟执行一次
-//    public void redPacketRetry() {
-//        try {
-//            log.info("红包补发任务开始");
-//            redPacketRetryService.redPacketRetry();
-//            log.info("红包补发任务开始");
-//        } catch (Exception e) {
-//            log.error("红包补发 - 定时任务执行失败", e);
-//        }
-
-//    }
+    @Scheduled(fixedRate = 60000) // 每分钟执行一次
+    public void checkFsUserWatchStatus() {
+        // 尝试设置标志为 true,表示任务开始执行
+        if (!isRunning4.compareAndSet(false, true)) {
+            log.warn("WXH5-检查会员看课中任务执行 - 上一个任务尚未完成,跳过此次执行");
+            return;
+        }
+        try {
+            log.info("WXH5-检查会员看课中任务执行>>>>>>>>>>>>");
+            courseWatchLogService.scheduleUpdateDurationToDatabase();
+            courseWatchLogService.checkFsUserWatchStatus();
+            log.info("WXH5-检查会员看课中任务执行完成>>>>>>>>>>>>");
+        }catch (Exception e) {
+            log.error("WXH5-检查会员看课中任务执行完成 - 定时任务执行失败", e);
+        } finally {
+            // 重置标志为 false,表示任务已完成
+            isRunning4.set(false);
+        }
 
+    }
 
 
 

+ 37 - 0
fs-qw-task/src/main/java/com/fs/app/task/UserCourseWatchCountTask.java

@@ -0,0 +1,37 @@
+package com.fs.app.task;
+
+import com.fs.store.service.IFsUserCourseCountService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class UserCourseWatchCountTask {
+    @Autowired
+    private IFsUserCourseCountService userCourseCountService;
+
+
+    /**
+     * 每天两点进行会员看课统计
+     */
+    @Scheduled(cron = "0 0 2 * * ?")  // 2点0分0秒执行
+    public void userCourseCountTask() {
+        try {
+            log.info("==============会员看课统计任务执行===============开始");
+            long startTime = System.currentTimeMillis();
+
+            userCourseCountService.insertFsUserCourseCountTask();
+
+            log.info("会员看课统计任务执行==============结束");
+            long endTime = System.currentTimeMillis();
+            log.info("会员看课统计任务执行----------执行时长:{}", (endTime - startTime));
+        } catch (Exception e) {
+            log.error("会员看课统计任务执行----------定时任务执行失败", e);
+        }
+
+    }
+
+
+}

+ 2 - 0
fs-service/src/main/java/com/fs/course/mapper/FsCourseWatchLogMapper.java

@@ -391,4 +391,6 @@ public interface FsCourseWatchLogMapper extends BaseMapper<FsCourseWatchLog> {
 
     @Select("select count(log_id) AS watchCount, count(case when log_type = 2 then log_id end) AS finishCount, sum(duration) AS watchTime from fs_course_watch_log where user_id = #{userId} and video_id = #{videoId}")
     Map<String, Object> selectSumByUserIdAndVideoId(@Param("userId") Long userId, @Param("videoId") Long videoId);
+
+    void batchUpdateFsUserWatchLog(@Param("list") List<FsCourseWatchLog> list);
 }

+ 3 - 0
fs-service/src/main/java/com/fs/course/service/IFsCourseWatchLogService.java

@@ -118,4 +118,7 @@ public interface IFsCourseWatchLogService extends IService<FsCourseWatchLog> {
      */
     int countByMap(Map<String, Object> params);
 
+    void scheduleUpdateDurationToDatabase();
+
+    void checkFsUserWatchStatus();
 }

+ 128 - 0
fs-service/src/main/java/com/fs/course/service/impl/FsCourseWatchLogServiceImpl.java

@@ -348,6 +348,134 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
         return 0;
     }
 
+    @Override
+    public void scheduleUpdateDurationToDatabase() {
+        log.info("WXH5-开始更新会员看课时长,检查完课>>>>>>");
+        //读取所有的key
+        Collection<String> keys = redisCache.keys("h5wxuser:watch:duration:*");
+
+        //读取看课配置
+        String json = configService.selectConfigByKey("course.config");
+        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+
+        List<FsCourseWatchLog> logs = new ArrayList<>();
+        for (String key : keys) {
+            //取key中数据
+            String[] parts = key.split(":");
+            Long userId = Long.parseLong(parts[3]);
+            Long videoId = Long.parseLong(parts[4]);
+            Long companyUserId = Long.parseLong(parts[5]);
+            String durationStr = redisCache.getCacheObject(key);
+            if(durationStr==null){
+                log.error("key中数据为null:{}",key);
+                continue;  // 如果 Redis 中没有记录,跳过
+            }
+            Long duration = Long.valueOf(durationStr);
+
+            FsCourseWatchLog watchLog = new FsCourseWatchLog();
+            watchLog.setVideoId(videoId);
+            watchLog.setUserId(userId);
+            watchLog.setCompanyUserId(companyUserId);
+            watchLog.setDuration(duration);
+
+            //取对应视频的时长
+            Long videoDuration = getFsUserVideoDuration(videoId);
+            if (videoDuration != null && videoDuration != 0) {
+                //判断是否完课
+                long percentage = (duration * 100 / videoDuration);
+                if (percentage >= config.getAnswerRate()) {
+                    watchLog.setLogType(2); // 设置状态为“已完成”checkFsUserWatchStatus
+                    watchLog.setFinishTime(new Date());
+                    String heartbeatKey ="h5wxuser:watch:heartbeat:" + userId+ ":" + videoId + ":" + companyUserId;
+                    // 完课删除心跳记录
+                    redisCache.deleteObject(heartbeatKey);
+                    // 完课删除看课时长记录
+                    redisCache.deleteObject(key);
+                }
+            }
+            //集合中增加
+            logs.add(watchLog);
+        }
+        batchUpdateFsUserCourseWatchLog(logs,100);
+    }
+    public Long getFsUserVideoDuration(Long videoId){
+        //将视频时长也存到redis
+        String videoRedisKey = "h5wxuser:video:duration:" + videoId;
+        Long videoDuration = redisCache.getCacheObject(videoRedisKey);
+        if (videoDuration==null){
+            FsUserCourseVideo video = courseVideoMapper.selectFsUserCourseVideoByVideoId(videoId);
+            videoDuration=video.getDuration();
+            redisCache.setCacheObject(videoRedisKey,video.getDuration().toString());
+        }
+        return videoDuration;
+    }
+    @Override
+    public void checkFsUserWatchStatus() {
+        log.info("WXH5-开始更新会员看课中断记录>>>>>");
+        // 从 Redis 中获取所有正在看课的用户记录
+        Collection<String> keys = redisCache.keys("h5wxuser:watch:heartbeat:*");
+        LocalDateTime now = LocalDateTime.now();
+        List<FsCourseWatchLog> logs = new ArrayList<>();
+        for (String key : keys) {
+            FsCourseWatchLog watchLog = new FsCourseWatchLog();
+            String[] parts = key.split(":");
+            Long userId = Long.parseLong(parts[3]);
+            Long videoId = Long.parseLong(parts[4]);
+            Long companyUserId = Long.parseLong(parts[5]);
+            // 获取最后心跳时间
+            String lastHeartbeatStr = redisCache.getCacheObject(key);
+            if (lastHeartbeatStr == null) {
+                continue; // 如果 Redis 中没有记录,跳过
+            }
+            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
+            Duration duration = Duration.between(lastHeartbeatTime, now);
+
+            watchLog.setVideoId(videoId);
+            watchLog.setUserId(userId);
+            watchLog.setCompanyUserId(companyUserId);
+            // 如果超过一分钟没有心跳,标记为“观看中断”
+            if (duration.getSeconds() >= 60) {
+                watchLog.setLogType(4);
+                // 从 Redis 中删除该记录
+                redisCache.deleteObject(key);
+            }else {
+                watchLog.setLogType(1);
+            }
+            logs.add(watchLog);
+        }
+        batchUpdateFsUserCourseWatchLog(logs,100);
+
+    }
+
+
+    public void batchUpdateFsUserCourseWatchLog(List<FsCourseWatchLog> logs, int batchSize) {
+        if (logs == null || logs.isEmpty()) {
+            log.info("待更新的日志列表为空,无需处理");
+            return;
+        }
+
+        // 记录总日志数量
+        log.info("开始批量更新日志,总日志数量: {}", logs.size());
+
+        // 分批处理
+        for (int i = 0; i < logs.size(); i += batchSize) {
+            int end = Math.min(i + batchSize, logs.size());
+            List<FsCourseWatchLog> batchList = logs.subList(i, end);
+
+            // 记录当前批次的数量
+            log.info("正在更新第 {} 批日志,数量: {}", (i / batchSize) + 1, batchList.size());
+
+            // 执行批量更新
+            fsCourseWatchLogMapper.batchUpdateFsUserWatchLog(batchList);
+
+            // 记录当前批次更新完成
+            log.info("第 {} 批日志更新完成,数量: {}", (i / batchSize) + 1, batchList.size());
+        }
+
+        // 记录全部更新完成
+        log.info("所有日志更新完成,总日志数量: {}", logs.size());
+    }
+
     @Override
     public FsCourseWatchLog getWatchCourseLogVideoBySop(Long videoId,String qwUserId,Long externalId )
     {

+ 48 - 0
fs-service/src/main/resources/mapper/course/FsCourseWatchLogMapper.xml

@@ -552,4 +552,52 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             #{item.logId}
         </foreach>
     </update>
+
+    <update id="batchUpdateFsUserWatchLog" parameterType="java.util.List">
+        UPDATE fs_course_watch_log
+        SET
+        duration = CASE
+        <foreach collection="list" item="item" index="index">
+            WHEN video_id = #{item.videoId} AND user_id = #{item.userId} AND company_user_id = #{item.companyUserId} THEN
+            CASE
+            <!-- 仅当传入的duration > 当前值时才更新 -->
+            WHEN #{item.duration} IS NOT NULL AND #{item.duration} > duration THEN #{item.duration}
+            ELSE duration <!-- 如果 duration 为 null,保持原值 -->
+            END
+        </foreach>
+        END,
+        last_heartbeat_time = CASE
+        <foreach collection="list" item="item" index="index">
+            WHEN video_id = #{item.videoId} AND user_id = #{item.userId} AND company_user_id = #{item.companyUserId} THEN
+            CASE
+            WHEN #{item.lastHeartbeatTime} IS NOT NULL THEN #{item.lastHeartbeatTime}
+            ELSE last_heartbeat_time <!-- 如果 last_heartbeat_time 为 null,保持原值 -->
+            END
+        </foreach>
+        END,
+        finish_time = CASE
+        <foreach collection="list" item="item" index="index">
+            WHEN video_id = #{item.videoId} AND user_id = #{item.userId} AND company_user_id = #{item.companyUserId} THEN
+            CASE
+            WHEN finish_time IS NULL THEN #{item.finishTime} <!-- 如果表中 finish_time 为 null,更新为传入的值 -->
+            ELSE finish_time <!-- 如果表中 finish_time 不为 null,保持原值 -->
+            END
+        </foreach>
+        END,
+        log_type = CASE
+        <foreach collection="list" item="item" index="index">
+            WHEN video_id = #{item.videoId} AND user_id = #{item.userId} AND company_user_id = #{item.companyUserId} THEN
+            CASE
+            WHEN log_type = 2 THEN log_type <!-- 如果 log_type 已经是 2,保持原值 -->
+            WHEN #{item.logType} IS NOT NULL AND log_type != 2 THEN #{item.logType} <!-- 如果 log_type 不是 2,更新为传入的值 -->
+            ELSE log_type <!-- 其他情况保持原值 -->
+            END
+        </foreach>
+        END
+        WHERE
+        (video_id, user_id, company_user_id) IN
+        <foreach collection="list" item="item" index="index" open="(" separator="," close=")">
+            (#{item.videoId}, #{item.userId}, #{item.companyUserId})
+        </foreach>
+    </update>
 </mapper>