|
@@ -2,16 +2,21 @@ package com.fs.store.service.impl;
|
|
|
|
|
|
|
|
import com.fs.common.utils.DateUtils;
|
|
import com.fs.common.utils.DateUtils;
|
|
|
import com.fs.course.mapper.FsCourseWatchLogMapper;
|
|
import com.fs.course.mapper.FsCourseWatchLogMapper;
|
|
|
|
|
+import com.fs.his.mapper.FsUserMapper;
|
|
|
|
|
+import com.fs.his.service.impl.FsUserServiceImpl;
|
|
|
import com.fs.store.domain.FsUserCourseCount;
|
|
import com.fs.store.domain.FsUserCourseCount;
|
|
|
import com.fs.store.mapper.FsUserCourseCountMapper;
|
|
import com.fs.store.mapper.FsUserCourseCountMapper;
|
|
|
import com.fs.store.service.IFsUserCourseCountService;
|
|
import com.fs.store.service.IFsUserCourseCountService;
|
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Lists;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.ibatis.session.ExecutorType;
|
|
import org.apache.ibatis.session.ExecutorType;
|
|
|
import org.apache.ibatis.session.SqlSession;
|
|
import org.apache.ibatis.session.SqlSession;
|
|
|
import org.apache.ibatis.session.SqlSessionFactory;
|
|
import org.apache.ibatis.session.SqlSessionFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
|
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
@@ -23,6 +28,7 @@ import java.util.stream.Collectors;
|
|
|
* @date 2025-04-02
|
|
* @date 2025-04-02
|
|
|
*/
|
|
*/
|
|
|
@Service
|
|
@Service
|
|
|
|
|
+@Slf4j
|
|
|
public class FsUserCourseCountServiceImpl implements IFsUserCourseCountService
|
|
public class FsUserCourseCountServiceImpl implements IFsUserCourseCountService
|
|
|
{
|
|
{
|
|
|
@Autowired
|
|
@Autowired
|
|
@@ -115,29 +121,111 @@ public class FsUserCourseCountServiceImpl implements IFsUserCourseCountService
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public void insertFsUserCourseCountTask() {
|
|
public void insertFsUserCourseCountTask() {
|
|
|
- // 1、获取统计结果
|
|
|
|
|
- List<FsUserCourseCount> countResult = fsUserCourseCountMapper.getCountResult();
|
|
|
|
|
|
|
+ // 总处理量-执行中
|
|
|
|
|
+ int totalProcessed = 0;
|
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
|
+
|
|
|
|
|
+ log.info("开始处理~~~~~~~~~~~~~~~~~");
|
|
|
|
|
+ // 1、分页分批次查询并处理数据
|
|
|
|
|
+ int page = 1;
|
|
|
|
|
+ int pageSize = 1000;
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ List<Long> userIds = fsUserCourseCountMapper.getUsersByPage((page - 1) * pageSize, pageSize);
|
|
|
|
|
+ if (userIds.isEmpty()) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("处理第 {} 页,用户数: {}", page, userIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ // 2、查询当前页用户的统计结果
|
|
|
|
|
+ List<FsUserCourseCount> countResult = Collections.emptyList();
|
|
|
|
|
+ if (!userIds.isEmpty()) {
|
|
|
|
|
+ countResult = fsUserCourseCountMapper.getCountResult(userIds);
|
|
|
|
|
+
|
|
|
|
|
+ // 3、分批插入数据
|
|
|
|
|
+ this.batchInsertOrUpdateNew(countResult);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ totalProcessed += countResult.size();
|
|
|
|
|
+ // 每处理10页记录一次进度
|
|
|
|
|
+ if (page % 10 == 0) {
|
|
|
|
|
+ log.info("处理进度: 第{}页,已处理 {} 条记录", page, totalProcessed);
|
|
|
|
|
+ }
|
|
|
|
|
+ page++;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
|
|
+ log.info("处理完成!总共处理 {} 条记录,总耗时: {} 毫秒", totalProcessed, endTime - startTime);
|
|
|
|
|
+
|
|
|
|
|
+ // 获取统计结果
|
|
|
|
|
+// List<FsUserCourseCount> countResult = fsUserCourseCountMapper.getCountResult();
|
|
|
|
|
|
|
|
// 查询用户-每天的最新的看课状态,和最后的心跳时间
|
|
// 查询用户-每天的最新的看课状态,和最后的心跳时间
|
|
|
- List<FsUserCourseCount> userStatusAndLastWatchDate = fsUserCourseCountMapper.getUserStatusAndLastWatchDate();
|
|
|
|
|
- Map<String, FsUserCourseCount> map = userStatusAndLastWatchDate.stream()
|
|
|
|
|
- .collect(Collectors.toMap(k -> String.format("%s-%s-%s", k.getUserId(),k.getProjectId(), k.getLastDate()), v -> v));
|
|
|
|
|
-
|
|
|
|
|
- for (FsUserCourseCount data : countResult) {
|
|
|
|
|
- String key = String.format("%s-%s-%s", data.getUserId(),data.getProjectId(), data.getLastDate());
|
|
|
|
|
- FsUserCourseCount fsUserCourseCount = map.get(key);
|
|
|
|
|
- if(fsUserCourseCount != null){
|
|
|
|
|
- data.setLastWatchDate(fsUserCourseCount.getLastWatchDate());
|
|
|
|
|
- data.setStatus(fsUserCourseCount.getStatus());
|
|
|
|
|
|
|
+// List<FsUserCourseCount> userStatusAndLastWatchDate = fsUserCourseCountMapper.getUserStatusAndLastWatchDate();
|
|
|
|
|
+// Map<String, FsUserCourseCount> map = userStatusAndLastWatchDate.stream()
|
|
|
|
|
+// .collect(Collectors.toMap(k -> String.format("%s-%s-%s", k.getUserId(),k.getProjectId(), k.getLastDate()), v -> v));
|
|
|
|
|
+
|
|
|
|
|
+// for (FsUserCourseCount data : countResult) {
|
|
|
|
|
+// String key = String.format("%s-%s-%s", data.getUserId(),data.getProjectId(), data.getLastDate());
|
|
|
|
|
+// FsUserCourseCount fsUserCourseCount = map.get(key);
|
|
|
|
|
+// if(fsUserCourseCount != null){
|
|
|
|
|
+// data.setLastWatchDate(fsUserCourseCount.getLastWatchDate());
|
|
|
|
|
+// data.setStatus(fsUserCourseCount.getStatus());
|
|
|
// data.setStopWatchDays(fsUserCourseCount.getStopWatchDays());
|
|
// data.setStopWatchDays(fsUserCourseCount.getStopWatchDays());
|
|
|
- }
|
|
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 分批次插入数据
|
|
|
|
|
+ * @author Caolq
|
|
|
|
|
+ * @param list 数据列表
|
|
|
|
|
+ */
|
|
|
|
|
+ private void batchInsertOrUpdateNew(List<FsUserCourseCount> list){
|
|
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 2、分批插入数据
|
|
|
|
|
- this.batchInsert(countResult);
|
|
|
|
|
|
|
+ // 分批次处理,一次提交500条
|
|
|
|
|
+ int insertBatchSize = 500;
|
|
|
|
|
+ try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
|
|
|
|
|
+ FsUserCourseCountMapper mapper = sqlSession.getMapper(FsUserCourseCountMapper.class);
|
|
|
|
|
+
|
|
|
|
|
+ long totalStartTime = System.currentTimeMillis();
|
|
|
|
|
+ int totalInserted = 0;
|
|
|
|
|
+
|
|
|
|
|
+ // 将数据分割
|
|
|
|
|
+ List<List<FsUserCourseCount>> batches = Lists.partition(list, insertBatchSize);
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < batches.size(); i++) {
|
|
|
|
|
+ List<FsUserCourseCount> batch = batches.get(i);
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 批量插入
|
|
|
|
|
+ mapper.batchInsertOrUpdate(batch);
|
|
|
|
|
+
|
|
|
|
|
+ // 定期提交事务,避免事务过大
|
|
|
|
|
+ if ((i + 1) % 10 == 0) {
|
|
|
|
|
+ sqlSession.commit();
|
|
|
|
|
+ log.debug("已提交第 {} 到 {} 批次", i - 9, i + 1);
|
|
|
|
|
+ }
|
|
|
|
|
+ totalInserted += batch.size();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批次 {} 插入/更新失败,大小:{}", i + 1, batch.size(), e);
|
|
|
|
|
+ sqlSession.rollback();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 提交剩余未提交的数据,避免提交漏
|
|
|
|
|
+ sqlSession.commit();
|
|
|
|
|
|
|
|
|
|
+ long totalEndTime = System.currentTimeMillis();
|
|
|
|
|
+ log.info("当前页批量插入/更新完成,总记录数:{},总耗时:{} 毫秒", totalInserted, totalEndTime - totalStartTime);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批量插入/更新过程中发生错误", e);
|
|
|
|
|
+ throw new RuntimeException("批量插入/更新失败", e);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
private void batchInsert(List<FsUserCourseCount> list) {
|
|
private void batchInsert(List<FsUserCourseCount> list) {
|
|
|
// 分批次处理,一次提交500条
|
|
// 分批次处理,一次提交500条
|
|
|
List<List<FsUserCourseCount>> batches = Lists.partition(list, 500);
|
|
List<List<FsUserCourseCount>> batches = Lists.partition(list, 500);
|