Kaynağa Gözat

qw_task用户看课统计代码迁移

yjwang 1 gün önce
ebeveyn
işleme
00e8bcd79d

+ 13 - 1
fs-qw-task/src/main/java/com/fs/app/task/UserCourseWatchCountTask.java

@@ -6,18 +6,27 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 @Component
 @Slf4j
 public class UserCourseWatchCountTask {
     @Autowired
     private IFsUserCourseCountService userCourseCountService;
 
+    private final AtomicBoolean isRunning1 = new AtomicBoolean(false);
+
 
     /**
      * 每15分钟执行一次
      */
-//    @Scheduled(cron = "0 */10 * * * ?")  // 每10分钟执行一次
+    @Scheduled(cron = "0 */20 * * * ?")  // 每10分钟执行一次
     public void userCourseCountTask() {
+        // 尝试设置标志为 true,表示任务开始执行
+        if (!isRunning1.compareAndSet(false, true)) {
+            log.warn("会员看课统计任务执行 - 上一个任务尚未完成,跳过此次执行");
+            return;
+        }
         try {
             log.info("==============会员看课统计任务执行===============开始");
             long startTime = System.currentTimeMillis();
@@ -29,6 +38,9 @@ public class UserCourseWatchCountTask {
             log.info("会员看课统计任务执行----------执行时长:{}", (endTime - startTime));
         } catch (Exception e) {
             log.error("会员看课统计任务执行----------定时任务执行失败", e);
+        } finally {
+            // 重置标志为 false,表示任务已完成
+            isRunning1.set(false);
         }
 
     }

+ 16 - 1
fs-service/src/main/java/com/fs/store/mapper/FsUserCourseCountMapper.java

@@ -68,8 +68,17 @@ public interface FsUserCourseCountMapper
     /**
      * 获取看课统计表结果
      * @return list
+     * @param userIds 用户id列表
      */
-    List<FsUserCourseCount> getCountResult();
+    List<FsUserCourseCount> getCountResult(@Param("userIds") List<Long> userIds);
+
+    /**
+     *
+     * @param offset 从第几条开始查询
+     * @param pageSize 每页数量
+     * @return
+     */
+    List<Long> getUsersByPage(@Param("offset") int offset, @Param("pageSize") int pageSize);
 
     /**
      * 获取最近七天每天最大心跳时间的看课记录数据
@@ -77,6 +86,12 @@ public interface FsUserCourseCountMapper
      */
     List<FsUserCourseCount> getUserStatusAndLastWatchDate();
 
+    /**
+     * 插入/更新看课统计表
+     * @param list
+     */
+    void batchInsertOrUpdate(@Param("list") List<FsUserCourseCount> list);
+
     /**
      * 往看课统计表中插入数据
      */

+ 101 - 16
fs-service/src/main/java/com/fs/store/service/impl/FsUserCourseCountServiceImpl.java

@@ -6,12 +6,15 @@ import com.fs.store.domain.FsUserCourseCount;
 import com.fs.store.mapper.FsUserCourseCountMapper;
 import com.fs.store.service.IFsUserCourseCountService;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.ibatis.session.ExecutorType;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -23,6 +26,7 @@ import java.util.stream.Collectors;
  * @date 2025-04-02
  */
 @Service
+@Slf4j
 public class FsUserCourseCountServiceImpl implements IFsUserCourseCountService
 {
     @Autowired
@@ -115,27 +119,58 @@ public class FsUserCourseCountServiceImpl implements IFsUserCourseCountService
 
     @Override
     public void insertFsUserCourseCountTask() {
-        // 1、获取统计结果
-        List<FsUserCourseCount> countResult = fsUserCourseCountMapper.getCountResult();
+        // 总处理量-执行中
+        int totalProcessed = 0;
+        long startTime = System.currentTimeMillis();
+
+        log.info("开始处理~~~~~~~~~~~~~~~~~");
+        // 1、分页分批次查询并处理数据
+        int page = 1;
+        int pageSize = 1000;
+        while (true) {
+            List<Long> userIds = fsUserCourseCountMapper.getUsersByPage((page - 1) * pageSize, pageSize);
+            if (userIds.isEmpty()) {
+                break;
+            }
+            log.info("处理第 {} 页,用户数: {}", page, userIds.size());
 
-        // 查询用户-每天的最新的看课状态,和最后的心跳时间
-        List<FsUserCourseCount> userStatusAndLastWatchDate = fsUserCourseCountMapper.getUserStatusAndLastWatchDate();
-        Map<String, FsUserCourseCount> map = userStatusAndLastWatchDate.stream()
-                .collect(Collectors.toMap(k -> String.format("%s-%s-%s", k.getUserId(),k.getProjectId(), k.getLastDate()), v -> v));
-
-        for (FsUserCourseCount data : countResult) {
-            String key = String.format("%s-%s-%s", data.getUserId(),data.getProjectId(), data.getLastDate());
-            FsUserCourseCount fsUserCourseCount = map.get(key);
-            if(fsUserCourseCount != null){
-                data.setLastWatchDate(fsUserCourseCount.getLastWatchDate());
-                data.setStatus(fsUserCourseCount.getStatus());
-//                data.setStopWatchDays(fsUserCourseCount.getStopWatchDays());
+            // 2、查询当前页用户的统计结果
+            List<FsUserCourseCount> countResult = Collections.emptyList();
+            if (!userIds.isEmpty()) {
+                countResult = fsUserCourseCountMapper.getCountResult(userIds);
+
+                // 3、分批插入数据
+                this.batchInsertOrUpdateNew(countResult);
+            }
+
+            totalProcessed += countResult.size();
+            // 每处理10页记录一次进度
+            if (page % 10 == 0) {
+                log.info("处理进度: 第{}页,已处理 {} 条记录", page, totalProcessed);
             }
+            page++;
         }
 
-        // 2、分批插入数据
-        this.batchInsert(countResult);
+        long endTime = System.currentTimeMillis();
+        log.info("处理完成!总共处理 {} 条记录,总耗时: {} 毫秒", totalProcessed, endTime - startTime);
+
+        // 获取统计结果
+//        List<FsUserCourseCount> countResult = fsUserCourseCountMapper.getCountResult();
 
+        // 查询用户-每天的最新的看课状态,和最后的心跳时间
+//        List<FsUserCourseCount> userStatusAndLastWatchDate = fsUserCourseCountMapper.getUserStatusAndLastWatchDate();
+//        Map<String, FsUserCourseCount> map = userStatusAndLastWatchDate.stream()
+//                .collect(Collectors.toMap(k -> String.format("%s-%s-%s", k.getUserId(),k.getProjectId(), k.getLastDate()), v -> v));
+
+//        for (FsUserCourseCount data : countResult) {
+//            String key = String.format("%s-%s-%s", data.getUserId(),data.getProjectId(), data.getLastDate());
+//            FsUserCourseCount fsUserCourseCount = map.get(key);
+//            if(fsUserCourseCount != null){
+//                data.setLastWatchDate(fsUserCourseCount.getLastWatchDate());
+//                data.setStatus(fsUserCourseCount.getStatus());
+//                data.setStopWatchDays(fsUserCourseCount.getStopWatchDays());
+//            }
+//        }
     }
 
     private void batchInsert(List<FsUserCourseCount> list) {
@@ -152,4 +187,54 @@ public class FsUserCourseCountServiceImpl implements IFsUserCourseCountService
             }
         });
     }
+
+    /**
+     * 分批次插入数据
+     * @author Caolq
+     * @param list 数据列表
+     */
+    private void batchInsertOrUpdateNew(List<FsUserCourseCount> list){
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+
+        // 分批次处理,一次提交500条
+        int insertBatchSize = 500;
+        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
+            FsUserCourseCountMapper mapper = sqlSession.getMapper(FsUserCourseCountMapper.class);
+
+            long totalStartTime = System.currentTimeMillis();
+            int totalInserted = 0;
+
+            // 将数据分割
+            List<List<FsUserCourseCount>> batches = Lists.partition(list, insertBatchSize);
+
+            for (int i = 0; i < batches.size(); i++) {
+                List<FsUserCourseCount> batch = batches.get(i);
+                try {
+                    // 批量插入
+                    mapper.batchInsertOrUpdate(batch);
+
+                    // 定期提交事务,避免事务过大
+                    if ((i + 1) % 10 == 0) {
+                        sqlSession.commit();
+                        log.debug("已提交第 {} 到 {} 批次", i - 9, i + 1);
+                    }
+                    totalInserted += batch.size();
+                } catch (Exception e) {
+                    log.error("批次 {} 插入/更新失败,大小:{}", i + 1, batch.size(), e);
+                    sqlSession.rollback();
+                }
+            }
+
+            // 提交剩余未提交的数据,避免提交漏
+            sqlSession.commit();
+
+            long totalEndTime = System.currentTimeMillis();
+            log.info("当前页批量插入/更新完成,总记录数:{},总耗时:{} 毫秒", totalInserted, totalEndTime - totalStartTime);
+        } catch (Exception e) {
+            log.error("批量插入/更新过程中发生错误", e);
+            throw new RuntimeException("批量插入/更新失败", e);
+        }
+    }
 }

+ 78 - 17
fs-service/src/main/resources/mapper/store/FsUserCourseCountMapper.xml

@@ -148,28 +148,52 @@
         </foreach>
     </delete>
 
+    <select id="getUsersByPage" resultType="Long">
+        SELECT DISTINCT
+            fs_user.user_id
+        FROM
+            fs_user
+                left join fs_user_company_user ucu on ucu.user_id = fs_user.user_id
+        where ucu.company_user_id is not null
+            LIMIT #{offset}, #{pageSize}
+    </select>
+
     <select id="getCountResult" resultType="FsUserCourseCount">
         SELECT
-            fwl.user_id,
-            ucu.project_id,
-            count( DISTINCT CASE WHEN fwl.log_type != 3 THEN fwl.video_id END ) AS watchCourseCount,
-            count( DISTINCT CASE WHEN fwl.log_type = 3 THEN fwl.video_id END ) AS missCourseCount,
-            IF
-            ( count( DISTINCT CASE WHEN fwl.log_type = 3 THEN fwl.video_id END ) > 0, 1, 2 ) AS missCourseStatus,
-            GROUP_CONCAT( DISTINCT fwl.period_id ) AS courseIds,
-            count(DISTINCT fwl.period_id ) AS partCourseCount,
-            max( CASE WHEN fwl.log_type = 2 THEN fwl.last_heartbeat_time END ) AS completeWatchDate,
-            count( CASE WHEN fwl.log_type = 2 THEN fwl.log_id END ) AS completeWatchCount,
-            count( CASE WHEN fwl.log_type != 3 THEN fwl.log_id END ) AS watch_times,
-            DATE_FORMAT(fwl.create_time,'%Y-%m-%d 00:00:00') AS createTime,
-            NOW() AS updateTime,
-            DATE_FORMAT(fwl.create_time,'%Y-%m-%d') AS create_date,
-            DATE (fwl.create_time ) AS lastDate
+        fwl.user_id,
+        ucu.project_id,
+        count( DISTINCT CASE WHEN fwl.log_type != 3 THEN fwl.video_id END ) AS watchCourseCount,
+        count( DISTINCT CASE WHEN fwl.log_type = 3 THEN fwl.video_id END ) AS missCourseCount,
+        IF
+        ( count( DISTINCT CASE WHEN fwl.log_type = 3 THEN fwl.video_id END ) > 0, 1, 2 ) AS missCourseStatus,
+        GROUP_CONCAT( DISTINCT fwl.period_id ) AS courseIds,
+        count(DISTINCT fwl.period_id ) AS partCourseCount,
+        max( CASE WHEN fwl.log_type = 2 THEN fwl.last_heartbeat_time END ) AS completeWatchDate,
+        count( CASE WHEN fwl.log_type = 2 THEN fwl.log_id END ) AS completeWatchCount,
+        count( CASE WHEN fwl.log_type != 3 THEN fwl.log_id END ) AS watch_times,
+        DATE_FORMAT(fwl.create_time,'%Y-%m-%d 00:00:00') AS createTime,
+        NOW() AS updateTime,
+        DATE_FORMAT(fwl.create_time,'%Y-%m-%d') AS create_date,
+        DATE (fwl.create_time ) AS lastDate
+        ,Max( fwl.last_heartbeat_time ) AS lastWatchDate,
+        CASE
+        WHEN fwl.log_type = 1
+        OR fwl.log_type = 2 THEN
+        1
+        WHEN fwl.log_type = 4 THEN
+        2
+        WHEN fwl.log_type = 3 THEN
+        3
+        END AS STATUS
         FROM fs_course_watch_log fwl
         left join fs_user_company_user ucu on ucu.user_id = fwl.user_id
-        where fwl.send_type = 1 and fwl.create_time &gt;= DATE_SUB(CURDATE(), INTERVAL 15 DAY) and fwl.project = ucu.project_id
+        where fwl.user_id in
+        <foreach item="userId" collection="userIds" open="(" separator="," close=")">
+            #{userId}
+        </foreach>
+        and fwl.send_type = 1 and fwl.create_time >= DATE_SUB(CURDATE(), INTERVAL 15 DAY) and fwl.project = ucu.project_id
         GROUP BY
-            fwl.user_id, date(fwl.create_time),ucu.project_id
+        fwl.user_id, date(fwl.create_time),ucu.project_id
     </select>
 
     <select id="getUserStatusAndLastWatchDate" resultType="FsUserCourseCount">
@@ -256,6 +280,43 @@
         </trim>
     </insert>
 
+    <insert id="batchInsertOrUpdate">
+        INSERT INTO fs_user_course_count
+        ( user_id,
+        watch_course_count,
+        miss_course_count,
+        miss_course_status,
+        course_ids,
+        part_course_count,
+        last_watch_date,
+        status,
+        create_time,
+        update_time,
+        complete_watch_date,
+        complete_watch_count,
+        watch_times,
+        create_date,
+        project_id )
+        VALUES
+        <foreach collection="list" item="item" separator=",">
+            (#{item.userId}, #{item.watchCourseCount}, #{item.missCourseCount}, #{item.missCourseStatus}, #{item.courseIds},
+            #{item.partCourseCount}, #{item.lastWatchDate}, #{item.status},#{item.createTime},#{item.updateTime},#{item.completeWatchDate}
+            ,#{item.completeWatchCount},#{item.watchTimes},#{item.createDate},#{item.projectId})
+        </foreach>
+        on duplicate key update
+        watch_course_count = VALUES(watch_course_count),
+        miss_course_count = VALUES(miss_course_count),
+        miss_course_status = VALUES(miss_course_status),
+        course_ids = VALUES(course_ids),
+        part_course_count = VALUES(part_course_count),
+        last_watch_date = VALUES(last_watch_date),
+        status = VALUES(status),
+        complete_watch_date = VALUES(complete_watch_date),
+        complete_watch_count = VALUES(complete_watch_count),
+        watch_times = VALUES(watch_times),
+        update_time = NOW()
+    </insert>
+
     <select id="selectUserLastCount" resultType="com.fs.store.vo.FsUserLastCount">
         SELECT
         fs_user_course_count.user_id,