yys 1 неделя назад
Родитель
Сommit
17c1420f72

+ 8 - 19
fs-qw-task/src/main/java/com/fs/app/task/CourseWatchLogScheduler.java

@@ -15,7 +15,6 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.Calendar;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -80,7 +79,7 @@ public class CourseWatchLogScheduler {
 //    }
 
 
-    /** 检查看课状态:每分钟执行;整5分钟时顺带创建完课消息。SaaS 开启时按租户执行。 */
+    /** 检查看课状态:每分钟执行。SaaS 开启时按租户执行。 */
     @Scheduled(fixedRate = 60000) // 每分钟执行一次
     public void checkWatchStatus() {
         if (!isRunning1.compareAndSet(false, true)) {
@@ -104,17 +103,6 @@ public class CourseWatchLogScheduler {
             courseWatchLogService.scheduleBatchUpdateToDatabase();
             courseWatchLogService.checkWatchStatus();
             log.info("检查看课中任务执行完成>>>>>>>>>>>>");
-            Calendar calendar = Calendar.getInstance();
-            int minute = calendar.get(Calendar.MINUTE);
-            if (minute % 5 == 0) {
-                try {
-                    log.info("创建完课消息 - 定时任务开始 {}", System.currentTimeMillis());
-                    sopLogsTaskService.createCourseFinishMsg();
-                    log.info("创建完课消息 - 定时任务成功完成");
-                } catch (Exception e) {
-                    log.error("创建完课消息 - 定时任务执行失败", ExceptionUtils.getStackTrace(e));
-                }
-            }
         } catch (Exception e) {
             log.error("检查看课中任务执行完成 - 定时任务执行失败", ExceptionUtils.getStackTrace(e));
         }
@@ -129,24 +117,25 @@ public class CourseWatchLogScheduler {
             log.warn("创建完课消息 - 上一个任务尚未完成,跳过此次执行");
             return;
         }
+        long scheduleStart = System.currentTimeMillis();
+        log.info("创建完课消息 - 调度触发 saasTaskEnabled={}", saasTaskEnabled);
         try {
             if (saasTaskEnabled) {
                 tenantTaskRunner.runForEachTenant("createCourseFinishMsg", () -> {
                     try {
-                        log.info("创建完课消息 - 定时任务开始");
                         sopLogsTaskService.createCourseFinishMsg();
-                        log.info("创建完课消息 - 定时任务成功完成");
                     } catch (Exception e) {
-                        log.error("创建完课消息 - 定时任务执行失败", e);
+                        log.error("创建完课消息 - 租户任务执行失败", e);
                     }
                 });
             } else {
-                log.info("创建完课消息 - 定时任务开始");
                 sopLogsTaskService.createCourseFinishMsg();
-                log.info("创建完课消息 - 定时任务成功完成");
             }
+            log.info("创建完课消息 - 调度完成 saasTaskEnabled={}, 耗时={}ms",
+                    saasTaskEnabled, System.currentTimeMillis() - scheduleStart);
         } catch (Exception e) {
-            log.error("创建完课消息 - 定时任务执行失败", e);
+            log.error("创建完课消息 - 调度异常 saasTaskEnabled={}, 耗时={}ms",
+                    saasTaskEnabled, System.currentTimeMillis() - scheduleStart, e);
         } finally {
             isRunning3.set(false);
         }

+ 110 - 81
fs-qw-task/src/main/java/com/fs/app/taskService/impl/SopLogsTaskServiceImpl.java

@@ -2238,98 +2238,109 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     @Override
     public void createCourseFinishMsg() {
         long startTime = System.currentTimeMillis();
-        log.info("创建完课消息 - 定时任务开始 {}", startTime);
+        final Long tenantId = captureTenantId();
+        if (saasTaskEnabled && tenantId == null) {
+            log.warn("创建完课消息 - SaaS 模式缺少 tenantId,跳过执行");
+            return;
+        }
+        log.info("创建完课消息 - 任务开始 tenantId={}, saasTaskEnabled={}", tenantId, saasTaskEnabled);
 
-        // 线程池配置
         int threadPoolSize = 4;
-        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
-
-        // 用于收集所有处理结果的队列
+        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize + 1);
         BlockingQueue<List<FsCourseWatchLog>> batchQueue = new LinkedBlockingQueue<>();
+        AtomicInteger totalBatchCount = new AtomicInteger(0);
+        AtomicInteger totalRecordCount = new AtomicInteger(0);
 
         try {
-            // 查询当天日期范围
             LocalDate today = LocalDate.now();
             Date startDate = Date.from(today.atStartOfDay(ZoneId.systemDefault()).toInstant());
             Date endDate = Date.from(today.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant());
+            log.info("创建完课消息 - 查询待处理完课记录 tenantId={}, 日期范围=[{}, {})", tenantId, startDate, endDate);
 
-            // 启动生产者线程 - 流式分批查询数据
-            executorService.submit(() -> {
+            executorService.submit(() -> runInTenantContext(tenantId, "createCourseFinishMsg-producer", () -> {
                 try {
                     int batchSize = 1000;
-                    long  maxId = 0;
+                    long maxId = 0;
                     boolean hasMore = true;
 
                     while (hasMore) {
-                        // 查询当前批次数据
                         List<FsCourseWatchLog> batch = fsCourseWatchLogMapper.selectFsCourseWatchLogFinishBatchByDate(
                                 startDate, endDate, maxId, batchSize);
 
                         if (!batch.isEmpty()) {
-                            // 将批次放入队列
                             batchQueue.put(batch);
-                            // 更新maxId为当前批次的最后一个ID
                             maxId = batch.get(batch.size() - 1).getLogId();
-                            log.debug("已生产批次数据,最后logId: {}, 数量: {}", maxId, batch.size());
+                            int batchNo = totalBatchCount.incrementAndGet();
+                            int recordCount = totalRecordCount.addAndGet(batch.size());
+                            log.info("创建完课消息 - 生产批次 tenantId={}, batchNo={}, size={}, maxLogId={}, 累计记录数={}",
+                                    tenantId, batchNo, batch.size(), maxId, recordCount);
                         }
 
                         if (batch.size() < batchSize) {
                             hasMore = false;
-                            batchQueue.put(Collections.emptyList());// 结束标志
-                            log.info("数据生产完成,最后logId: {}", maxId);
+                            for (int i = 0; i < threadPoolSize; i++) {
+                                batchQueue.put(Collections.emptyList());
+                            }
+                            if (totalRecordCount.get() == 0) {
+                                log.info("创建完课消息 - 无待处理完课记录 tenantId={}, 查询范围=[{}, {})",
+                                        tenantId, startDate, endDate);
+                            } else {
+                                log.info("创建完课消息 - 数据生产完成 tenantId={}, 总批次数={}, 总记录数={}, 最后logId={}",
+                                        tenantId, totalBatchCount.get(), totalRecordCount.get(), maxId);
+                            }
                         }
                     }
                 } catch (Exception e) {
-                    log.error("生产数据时出错", e);
-                    try {
-                        batchQueue.put(Collections.emptyList()); // 确保消费者能退出
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                    }
+                    log.error("创建完课消息 - 生产数据失败 tenantId={}", tenantId, e);
+                    signalConsumersToStop(batchQueue, threadPoolSize);
                 }
-            });
+            }));
 
-            // 消费者线程处理数据
             List<Future<?>> futures = new ArrayList<>();
             for (int i = 0; i < threadPoolSize; i++) {
-                futures.add(executorService.submit(() -> {
+                final int consumerIndex = i + 1;
+                futures.add(executorService.submit(() -> runInTenantContext(tenantId, "createCourseFinishMsg-consumer", () -> {
                     try {
+                        log.info("创建完课消息 - 消费者启动 tenantId={}, consumer={}/{}", tenantId, consumerIndex, threadPoolSize);
                         while (true) {
                             List<FsCourseWatchLog> batch = batchQueue.take();
-
-                            // 空列表表示处理结束
                             if (batch.isEmpty()) {
-                                batchQueue.put(Collections.emptyList()); // 传递给其他消费者
+                                log.info("创建完课消息 - 消费者结束 tenantId={}, consumer={}/{}", tenantId, consumerIndex, threadPoolSize);
                                 break;
                             }
-                            log.info("开始处理批次数据");
-                            processBatch(batch); // 处理批次数据
+                            log.info("创建完课消息 - 消费者处理批次 tenantId={}, consumer={}, size={}",
+                                    tenantId, consumerIndex, batch.size());
+                            processBatch(batch, tenantId);
                         }
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
-                        log.error("处理数据时被中断", e);
+                        log.error("创建完课消息 - 消费者被中断 tenantId={}, consumer={}", tenantId, consumerIndex, e);
                     } catch (Exception e) {
-                        log.error("处理数据时出错", e);
+                        log.error("创建完课消息 - 消费者处理失败 tenantId={}, consumer={}", tenantId, consumerIndex, e);
                     }
-                }));
+                })));
             }
 
-            // 等待所有任务完成
             for (Future<?> future : futures) {
                 try {
-                    future.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    log.error("等待任务完成时出错", e);
+                    future.get(30, TimeUnit.MINUTES);
+                } catch (InterruptedException e) {
+                    log.error("创建完课消息 - 等待消费者完成时被中断 tenantId={}", tenantId, e);
                     Thread.currentThread().interrupt();
+                } catch (ExecutionException e) {
+                    log.error("创建完课消息 - 消费者执行异常 tenantId={}", tenantId, e.getCause());
+                } catch (TimeoutException e) {
+                    log.error("创建完课消息 - 处理超时(30分钟) tenantId={}", tenantId, e);
                 }
             }
 
-            log.info("所有批次处理完成,总耗时: {}ms", System.currentTimeMillis() - startTime);
-
+            log.info("创建完课消息 - 任务完成 tenantId={}, 总记录数={}, 耗时={}ms",
+                    tenantId, totalRecordCount.get(), System.currentTimeMillis() - startTime);
         } finally {
             executorService.shutdown();
             try {
                 if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    log.warn("创建完课消息 - 线程池未在60秒内结束,强制关闭 tenantId={}", tenantId);
                     executorService.shutdownNow();
                 }
             } catch (InterruptedException e) {
@@ -2339,83 +2350,101 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         }
     }
 
+    private void signalConsumersToStop(BlockingQueue<List<FsCourseWatchLog>> batchQueue, int consumerCount) {
+        log.warn("创建完课消息 - 发送停止信号,consumerCount={}", consumerCount);
+        for (int i = 0; i < consumerCount; i++) {
+            try {
+                batchQueue.put(Collections.emptyList());
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+    }
+
     // 处理单个批次的方法
-    private void processBatch(List<FsCourseWatchLog> batch) {
-        List<FsCourseWatchLog> finishLogsToUpdate = new ArrayList<>();
+    private void processBatch(List<FsCourseWatchLog> batch, Long tenantId) {
+        List<FsCourseWatchLog> finishLogsToMarkSent = new ArrayList<>();
         List<QwSopLogs> sopLogsToInsert = new ArrayList<>();
-        log.info("开始执行处理批次方法-数量:{}",batch.size());
+        int skipExternalNull = 0;
+        int skipTempNull = 0;
+        int skipSopNull = 0;
+        int skipInvalidContact = 0;
+        int skipException = 0;
+
+        log.info("创建完课消息 - 开始处理批次 tenantId={}, size={}", tenantId, batch.size());
         for (FsCourseWatchLog finishLog : batch) {
             try {
-
                 try {
-
                     asyncCourseWatchFinishService.executeCourseWatchFinish(finishLog);
-
-                }catch (Exception e){
-                    log.error("添加完课打备注失败",e);
+                } catch (Exception e) {
+                    log.error("创建完课消息 - 完课打备注失败 tenantId={}, logId={}, externalId={}",
+                            tenantId, finishLog.getLogId(), finishLog.getQwExternalContactId(), e);
                 }
 
-                // 查询外部联系人信息
                 QwExternalContact externalContact = qwExternalContactMapper.selectQwExternalContactById(finishLog.getQwExternalContactId());
                 if (externalContact == null) {
-                    log.error("外部联系人不存在: {}", finishLog.getQwExternalContactId());
+                    skipExternalNull++;
+                    log.warn("创建完课消息 - 外部联系人不存在 tenantId={}, logId={}, externalId={}",
+                            tenantId, finishLog.getLogId(), finishLog.getQwExternalContactId());
                     continue;
                 }
 
-                // 查询完课模板信息
-                FsCourseFinishTemp finishTemp = fsCourseFinishTempMapper.selectFsCourseFinishTempByCompanyId(finishLog.getCompanyUserId(),finishLog.getCompanyId(), finishLog.getVideoId());
-
-                // 设置 finishLog 为已发送状态,并加入批量更新列表
-                finishLog.setSendFinishMsg(1);
-                finishLogsToUpdate.add(finishLog);
-
+                FsCourseFinishTemp finishTemp = fsCourseFinishTempMapper.selectFsCourseFinishTempByCompanyId(
+                        finishLog.getCompanyUserId(), finishLog.getCompanyId(), finishLog.getVideoId());
                 if (finishTemp == null) {
-//                    log.error("完课模板不存在: " + finishLog.getQwUserId() + ", " + finishLog.getVideoId());
+                    skipTempNull++;
+                    log.warn("创建完课消息 - 完课模板不存在 tenantId={}, logId={}, companyUserId={}, companyId={}, videoId={}",
+                            tenantId, finishLog.getLogId(), finishLog.getCompanyUserId(), finishLog.getCompanyId(), finishLog.getVideoId());
                     continue;
                 }
 
-                // 构建 sopLogs 对象
                 QwSopLogs sopLogs = buildSopLogs(finishLog, externalContact, finishTemp);
                 if (sopLogs == null) {
-                    log.error("生成完课发送记录为空-:{}", finishLog.getQwExternalContactId());
+                    skipSopNull++;
+                    log.warn("创建完课消息 - 生成发送记录为空 tenantId={}, logId={}, externalId={}, videoId={}",
+                            tenantId, finishLog.getLogId(), finishLog.getQwExternalContactId(), finishLog.getVideoId());
                     continue;
                 }
 
-                // 如果客户状态有效,则加入批量插入列表
-                if (isValidExternalContact(externalContact)) {
-                    sopLogsToInsert.add(sopLogs);
-                } else {
-                    log.info("完课消息-客户信息有误,不生成完课消息: {}", finishLog.getQwExternalContactId());
+                if (!isValidExternalContact(externalContact)) {
+                    skipInvalidContact++;
+                    log.info("创建完课消息 - 客户状态无效,跳过 tenantId={}, logId={}, externalId={}, status={}",
+                            tenantId, finishLog.getLogId(), finishLog.getQwExternalContactId(), externalContact.getStatus());
+                    continue;
                 }
-//                try {
-//                    fsUserCompanyBindService.finish(externalContact.getFsUserId(), externalContact.getQwUserId(), externalContact.getCompanyUserId(), finishLog);
-//                }catch (Exception e){
-//                    log.error("更新重粉看课状态失败",e);
-//                }
-            } catch (Exception e) {
-                log.error("处理完课记录失败: {}", finishLog.getLogId(), e);
-            }
-        }
 
-        // 批量更新和插入
-        if (!finishLogsToUpdate.isEmpty()) {
-            try {
-                fsCourseWatchLogMapper.batchUpdateWatchLogSendMsg(finishLogsToUpdate);
-                log.info("批量更新 finishLog 成功,数量: {}", finishLogsToUpdate.size());
+                sopLogsToInsert.add(sopLogs);
+                finishLogsToMarkSent.add(finishLog);
+                log.debug("创建完课消息 - 待发送 tenantId={}, logId={}, externalId={}, externalName={}",
+                        tenantId, finishLog.getLogId(), finishLog.getQwExternalContactId(), externalContact.getName());
             } catch (Exception e) {
-                log.error("批量更新 finishLog 失败", e);
+                skipException++;
+                log.error("创建完课消息 - 处理完课记录异常 tenantId={}, logId={}", tenantId, finishLog.getLogId(), e);
             }
         }
 
         if (!sopLogsToInsert.isEmpty()) {
             try {
                 qwSopLogsService.batchInsertQwSopLogs(sopLogsToInsert);
-                log.info("批量插入 sopLogs 成功,数量: {}", sopLogsToInsert.size());
+                for (FsCourseWatchLog finishLog : finishLogsToMarkSent) {
+                    finishLog.setSendFinishMsg(1);
+                }
+                fsCourseWatchLogMapper.batchUpdateWatchLogSendMsg(finishLogsToMarkSent);
+                log.info("创建完课消息 - 批量写入成功 tenantId={}, 成功数={}, logIds={}",
+                        tenantId, sopLogsToInsert.size(),
+                        finishLogsToMarkSent.stream().map(FsCourseWatchLog::getLogId).collect(Collectors.toList()));
             } catch (Exception e) {
-                log.error("批量插入 sopLogs 失败", e);
+                log.error("创建完课消息 - 批量写入失败,send_finish_msg 保持不变以便重试 tenantId={}, 待写入数={}",
+                        tenantId, sopLogsToInsert.size(), e);
             }
+        } else {
+            log.info("创建完课消息 - 本批次无有效消息 tenantId={}, 输入数={}", tenantId, batch.size());
         }
-        log.info("结束处理批次方法-数量:{}",batch.size());
+
+        log.info("创建完课消息 - 批次处理汇总 tenantId={}, 输入={}, 成功={}, 跳过[联系人不存在={}, 模板缺失={}, 消息构建失败={}, 客户状态无效={}, 异常={}]",
+                tenantId, batch.size(), sopLogsToInsert.size(),
+                skipExternalNull, skipTempNull, skipSopNull, skipInvalidContact, skipException);
     }
 
     /**

+ 4 - 4
fs-service/src/main/resources/application-dev.yml

@@ -7,15 +7,15 @@ spring:
     # redis 配置
     redis:
         # 地址
-        host: localhost
-#        host: 192.168.0.245
+#        host: localhost
+        host: 192.168.0.245
         # 端口,默认为6379
         port: 6379
         # 数据库索引
         database: 0
         # 密码
-        password:
-#        password: Ylrztek250218!3@.
+#        password:
+        password: Ylrztek250218!3@.
         # 连接超时时间
         timeout: 20s
         lettuce: