zyp 18 timmar sedan
förälder
incheckning
f6e67bfb36

+ 34 - 18
fs-qw-task/src/main/java/com/fs/app/task/qwTask.java

@@ -28,7 +28,7 @@ import java.util.List;
 /**
  * 企业微信SOP定时任务管理类
  * 负责处理各种定时任务,包括SOP规则检查、消息发送、数据清理等
- * 
+ *
  * @author 系统
  * @version 1.0
  */
@@ -50,10 +50,10 @@ public class qwTask {
 
     @Autowired
     private ISopUserLogsService sopUserLogsService;
-    
+
     @Autowired
     private SopLogsTaskService sopLogsTaskService;
-    
+
     @Autowired
     private SopWxLogsService sopWxLogsService;
 
@@ -71,7 +71,7 @@ public class qwTask {
 
     @Autowired
     private QwSopLogsMapper qwSopLogsMapper;
-    
+
     @Autowired
     private IQwSopTagService qwSopTagService;
 
@@ -99,7 +99,7 @@ public class qwTask {
      * 定时任务:根据营期生成sopLogs待发记录
      * 执行时间:每小时的第5分钟执行
      * 功能:根据营期时间生成需要发送的SOP日志记录
-     * 
+     *
      * @throws Exception 执行异常
      */
     @Scheduled(cron = "0 5 * * * ?") // 每小时的第5分钟触发
@@ -118,7 +118,7 @@ public class qwTask {
      * 定时任务:微信SOP处理
      * 执行时间:每小时的第5分钟执行
      * 功能:处理微信相关的SOP日志
-     * 
+     *
      * @throws Exception 执行异常
      */
     @Scheduled(cron = "0 5 * * * ?") // 每小时的第5分钟触发
@@ -136,7 +136,7 @@ public class qwTask {
      * 定时任务:处理聊天SOP用户日志
      * 执行时间:已注释,原为每分钟的第5秒执行
      * 功能:将clickHouse的sopUserLogsChat(营期表)按每分钟巡回处理
-     * 
+     *
      * @throws Exception 执行异常
      */
 //    @Scheduled(cron = "5 0/1 * * * ?")
@@ -149,20 +149,36 @@ public class qwTask {
     }
 
     /**
-     * 定时任务:发送企业微信SOP群发消息(新版-按营期发送)
-     * 执行时间:每天凌晨 0:20:00
-     * 功能:通过调用企业微信接口发送SOP群发消息
+     * 定时 发送 通过调用 企业微信接口 发送的 SOP 群发消息(按单链发)
      */
-    @Scheduled(cron = "0 20 0 * * ?")
-    public void SendQwApiSopLogTimerNew() {
-        log.info("zyp \n【企微官方接口群发开始】");
-        
-        // 获取当前日期
+    @Scheduled(cron = "0 20 1 * * ?")
+    public void SendQwApiSopLogTimer(){
+        log.info("zyp \n【企微官方接口群发开始-单链】");
+//        qwSopLogsService.checkQwSopLogs();
         LocalDate localDate = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0).toLocalDate();
         String date = localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
 
-        // 根据用户日志创建企业群发
-        qwSopLogsService.createCorpMassSendingByUserLogs(date);
+        qwSopLogsService.createCorpMassSending(date);
+    }
+
+    /**
+     * 定时 发送 通过调用 企业微信接口 发送的 SOP 群发消息(新版-安装营期发)
+     */
+    @Scheduled(cron = "0 10 0,1 * * ?")
+    public void SendQwApiSopLogTimerNew(){
+
+        log.info("zyp \n【企微官方接口群发开始】");
+//        qwSopLogsService.checkQwSopLogs();
+//        LocalDate localDate = LocalDateTime.now().withMinute(0).withSecond(0).withNano(0).toLocalDate();
+//        String date = localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+
+        int currentHour = LocalDateTime.now().getHour();
+        String taskStartTime = LocalDate.now().atTime(currentHour, 0, 0)
+                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+        String taskEndTime = LocalDate.now().atTime(currentHour, 59, 59)
+                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+
+        qwSopLogsService.createCorpMassSendingByUserLogs(taskStartTime,taskEndTime);
     }
 
     /**
@@ -246,7 +262,7 @@ public class qwTask {
 
     /**
      * 批量处理插入逻辑,支持每500条数据一次的批量插入
-     * 
+     *
      * @param logsByJsApiNotExtId 需要处理的日志列表
      */
     private void processAndInsertQwSopLogs(List<QwSopLogsDoSendListTVO> logsByJsApiNotExtId) {

+ 4 - 1
fs-service/src/main/java/com/fs/sop/mapper/QwSopLogsMapper.java

@@ -197,7 +197,10 @@ public interface QwSopLogsMapper extends BaseMapper<QwSopLogs> {
 
 
     @DataSource(DataSourceType.SOP)
-    public List<QwSopLogs> selectSopLogsByCreateCorpMassSending(@Param("date") String date);
+    public List<QwSopLogs> createCorpMassSending(@Param("date") String date);
+
+    @DataSource(DataSourceType.SOP)
+    public List<QwSopLogs> selectSopLogsByCreateCorpMassSending(@Param("taskStartTime") String taskStartTime,@Param("taskEndTime") String taskEndTime);
 
     /**
     * 为了避免一直轮询无效(发送不了,给不了反馈的)数据,只查与定时的时间过后,之前3天内的的数据反馈

+ 1 - 1
fs-service/src/main/java/com/fs/sop/service/IQwSopLogsService.java

@@ -95,7 +95,7 @@ public interface IQwSopLogsService
     /**
      *  创建企业群发(按照营期发)
      */
-    public void createCorpMassSendingByUserLogs(String date);
+    public void createCorpMassSendingByUserLogs(String taskStartTime,String taskEndTime);
 
     /**
      *  检索执行符合条件的定时任务的结果回调(企业微信)

+ 133 - 128
fs-service/src/main/java/com/fs/sop/service/impl/QwSopLogsServiceImpl.java

@@ -1326,173 +1326,178 @@ public class QwSopLogsServiceImpl implements IQwSopLogsService
      * 该方法负责处理企业微信的群发消息创建和发送
      */
     @Override
-    public void createCorpMassSending(String date) {
+    public void createCorpMassSending(String date)  {
+
         long startTime = System.currentTimeMillis();
         logger.info("开始执行企业微信群发消息创建任务");
 
-        String json = configService.selectConfigByKey("course.config");
-        CourseConfig config = JSON.parseObject(json, CourseConfig.class);
-
-        if (config == null) {
-            logger.error("课程默认配置为空,不执行");
-            return;
-        }
-        // 获取需要发送的SOP日志记录
-        List<QwSopLogs> qwSopLogs = qwSopLogsMapper.selectSopLogsByCreateCorpMassSending(date);
-//        List<QwSopLogs> qwSopLogs = qwSopLogsMapper.checkQwSopLogs();
+        List<QwSopLogs> qwSopLogs = qwSopLogsMapper.createCorpMassSending(date);
         if (qwSopLogs.isEmpty()) {
             logger.error("zyp \n【企微官方群发记录为空】");
             return;
         }
 
-        // 按照企业员工ID、发送时间、SOP ID和企业ID进行分组
         Map<String, List<QwSopLogs>> groupedLogs = new HashMap<>();
         for (QwSopLogs log : qwSopLogs) {
             String key = log.getQwUserid() + "|" + log.getSendTime() + "|" + log.getSopId() + "|" + log.getCorpId();
             groupedLogs.computeIfAbsent(key, k -> new ArrayList<>()).add(log);
         }
 
-        // 创建线程池,使用固定大小的线程池以避免过多线程导致的资源竞争
         int threadCount = Math.min(10, Runtime.getRuntime().availableProcessors() + 1);
         ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
-
-        // 创建用于发送消息的嵌套线程池
         ExecutorService messageExecutorService = Executors.newFixedThreadPool(20);
-
-        // 用于存储需要批量更新的日志记录,使用线程安全的集合
         List<QwSopLogs> updateList = Collections.synchronizedList(new ArrayList<>());
 
-        // 使用CountDownLatch等待所有任务完成
         CountDownLatch latch = new CountDownLatch(groupedLogs.size());
 
-        // 处理每个分组
-        for (Map.Entry<String, List<QwSopLogs>> entry : groupedLogs.entrySet()) {
-            String key = entry.getKey();
-            List<QwSopLogs> logs = entry.getValue();
-            String[] keys = key.split("\\|");
-            String qwUserid = keys[0];
-            String corpId = keys[3];
-
-//            QwUser qwUser = qwUserMapper.selectQwUserByCorpIdAndUserId(corpId, qwUserid);
-            // 查询员工信息的id
-            QwUser qwUser = qwExternalContactService.getQwUserByRedis(corpId.trim(),qwUserid.trim());
-            if (qwUser != null && qwUser.getIsDel() == 0) {
-                // 提交到线程池处理每个分组
-                executorService.submit(() -> {
-                    try {
-                        // 按外部用户ID分组
-                        Map<String, List<QwSopLogs>> userLogsMap = new HashMap<>();
-                        for (QwSopLogs log : logs) {
-                            String externalUserId = log.getExternalUserId();
-                            userLogsMap.computeIfAbsent(externalUserId, k -> new ArrayList<>()).add(log);
-                        }
-
-                        // 使用嵌套的CountDownLatch等待所有消息发送完成
-                        CountDownLatch messageLatch = new CountDownLatch(userLogsMap.size());
-
-                        // 处理每个外部用户
-                        for (Map.Entry<String, List<QwSopLogs>> userEntry : userLogsMap.entrySet()) {
-                            String externalUserId = userEntry.getKey();
-                            List<QwSopLogs> userLogs = userEntry.getValue();
+        try {
+            for (Map.Entry<String, List<QwSopLogs>> entry : groupedLogs.entrySet()) {
+                String key = entry.getKey();
+                List<QwSopLogs> logs = entry.getValue();
+                String[] keys = key.split("\\|");
+                String qwUserid = keys[0];
+                String corpId = keys[3];
+
+                QwUser qwUser = qwExternalContactService.getQwUserByRedis(corpId.trim(), qwUserid.trim());
+                if (qwUser != null && qwUser.getIsDel() == 0) {
+                    executorService.submit(() -> {
+                        try {
+                            Map<String, List<QwSopLogs>> userLogsMap = new HashMap<>();
+                            for (QwSopLogs log : logs) {
+                                userLogsMap.computeIfAbsent(log.getExternalUserId(), k -> new ArrayList<>()).add(log);
+                            }
 
-                            // 提交到消息发送线程池
-                            messageExecutorService.submit(() -> {
-                                try {
-                                    QwMsgTemplateSop templateSop = new QwMsgTemplateSop();
-                                    templateSop.setChatType("single");
-                                    templateSop.setAllowSelect(false);
-                                    templateSop.setSender(qwUserid);
-                                    templateSop.setExternalUseridList(Collections.singletonList(externalUserId));
-
-                                    List<QwMsgTemplateSop.Attachment> attachments = new ArrayList<>();
-                                    boolean hasError = false;
-
-                                    for (QwSopLogs log : userLogs) {
-                                        try {
-                                            QwSopTempSetting.Content content = JSON.parseObject(log.getContentJson(), QwSopTempSetting.Content.class);
-                                            if (content == null || content.getSetting() == null) continue;
-                                            Long courseId = content.getCourseId();
-                                            for (QwSopTempSetting.Content.Setting set : content.getSetting()) {
-                                                processContent(set, corpId, templateSop, attachments, courseId);
+                            CountDownLatch messageLatch = new CountDownLatch(userLogsMap.size());
+                            for (Map.Entry<String, List<QwSopLogs>> userEntry : userLogsMap.entrySet()) {
+                                String externalUserId = userEntry.getKey();
+                                List<QwSopLogs> userLogs = userEntry.getValue();
+
+                                messageExecutorService.submit(() -> {
+                                    try {
+                                        QwMsgTemplateSop templateSop = new QwMsgTemplateSop();
+                                        templateSop.setChatType("single");
+                                        templateSop.setAllowSelect(false);
+                                        templateSop.setSender(qwUserid);
+                                        templateSop.setExternalUseridList(Collections.singletonList(externalUserId));
+
+                                        List<QwMsgTemplateSop.Attachment> attachments = new ArrayList<>();
+                                        boolean hasError = false;
+                                        for (QwSopLogs log : userLogs) {
+                                            try {
+                                                QwSopTempSetting.Content content = JSON.parseObject(log.getContentJson(), QwSopTempSetting.Content.class);
+                                                if (content == null || content.getSetting() == null) continue;
+                                                Long courseId = content.getCourseId();
+                                                for (QwSopTempSetting.Content.Setting set : content.getSetting()) {
+                                                    processContent(set, corpId, templateSop, attachments, courseId);
+                                                }
+                                            } catch (Exception e) {
+                                                logger.error("消息内容解析失败,logId:{},{},{}", log.getId(), e,key);
+                                                hasError = true;
                                             }
-                                        } catch (Exception e) {
-                                            logger.error("消息内容解析失败,logId:{}", log.getId(), e);
-                                            hasError = true;
                                         }
-                                    }
+                                        if (!hasError && (!attachments.isEmpty() || templateSop.getTextContent() != null)) {
+                                            templateSop.setAttachments(attachments);
+                                            try {
+                                                QwAddMsgTemplateResult result = qwApiService.addMsgTemplateBySop(templateSop, corpId);
+                                                if (result.getErrCode() == 0 || result.getErrCode() == 41063){
+                                                    for (QwSopLogs log : userLogs) {
+                                                        log.setSendStatus(1L);
+                                                        log.setMsgId(result.getMsgId());
+                                                        updateList.add(log);
+                                                    }
+                                                }else {
+
+                                                    for (QwSopLogs log : userLogs) {
+                                                        log.setSendType(2);
+                                                        log.setSendStatus(3L);
+                                                        log.setRemark("官方有误,sop补发");
+                                                        log.setReceivingStatus(0L);
+                                                        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+                                                        LocalDateTime currentTime = LocalDateTime.now();
+                                                        String newTimeString = currentTime.format(formatter);
+                                                        log.setSendTime(newTimeString);
+                                                        log.setSort(30000001);
+                                                        updateList.add(log);
+                                                    }
 
-                                    if (!hasError && (!attachments.isEmpty() || templateSop.getTextContent() != null)) {
-                                        templateSop.setAttachments(attachments);
-                                        try {
-                                            QwAddMsgTemplateResult result = qwApiService.addMsgTemplateBySop(templateSop, corpId);
-                                            for (QwSopLogs log : userLogs) {
-                                                log.setSendStatus(result.getErrCode() == 0 || result.getErrCode() == 41063 ? 1L : 0L);
-                                                log.setMsgId(result.getMsgId());
-                                                updateList.add(log);
-                                            }
-                                            if (result.getErrCode() != 0 && result.getErrCode() != 41063){
-                                                logger.error("企业微信接口-消息发送失败,corpId:{},errCode:{},errMsg:{}", corpId, result.getErrCode(), result.getErrMsg());
-                                            }
-                                        } catch (Exception e) {
-                                            logger.error("消息发送失败,user:{}", externalUserId, e);
-                                            for (QwSopLogs log : userLogs) {
-                                                log.setSendStatus(0L);
-                                                updateList.add(log);
+                                                    logger.error("企业微信接口-消息发送失败-进入sop补偿,corpId:{},errCode:{},errMsg:{},key:{}", corpId, result.getErrCode(), result.getErrMsg(),key);
+                                                }
+                                            } catch (Exception e) {
+                                                logger.error("消息发送失败,user:{},{},{}", externalUserId, e,key);
+                                                for (QwSopLogs log : userLogs) {
+                                                    log.setSendStatus(0L);
+                                                    log.setRemark("信息异常");
+                                                    updateList.add(log);
+                                                }
                                             }
                                         }
+
+                                    } finally {
+                                        logger.info("执行结束-messageLatch-countDown:"+updateList.size());
+                                        messageLatch.countDown();
                                     }
-                                } finally {
-                                    messageLatch.countDown();
-                                }
-                            });
-                        }
+                                });
 
-                        // 等待所有消息发送完成
-                        try {
+                            }
+
+                            logger.info("messageExecutorService-updateList总量:"+updateList.size());
+
+                            // 等待所有消息发送完成
                             messageLatch.await();
+
                         } catch (InterruptedException e) {
-                            logger.error("等待消息发送完成时被中断", e);
+                            logger.info("messageExecutorService-Thread.currentThread().interrupt():"+updateList.size());
                             Thread.currentThread().interrupt();
+                        } finally {
+                            logger.info("finally-latch.countDown:"+updateList.size());
+                            latch.countDown();
                         }
-                    } finally {
-                        latch.countDown();
-                    }
-                });
-            }else {
-                logger.error("官方群发 员工信息有误:"+corpId+":"+qwUserid);
+                    });
+                } else {
+                    logger.error("员工信息无效-不存在或被删除,corpId:{}, userId:{}", corpId, qwUserid);
+
+                    latch.countDown(); // 确保每个分组都减少计数
+                }
             }
 
-        }
+            latch.await(); // 等待所有分组提交的任务完成
+            logger.info("关闭线程池并等待任务完成:"+updateList.size());
+            // 关闭线程池并等待任务完成
+            executorService.shutdown();
+            messageExecutorService.shutdown();
+            if (!executorService.awaitTermination(300, TimeUnit.SECONDS)) {
+                logger.error("ExecutorService未完全关闭");
+            }
+            if (!messageExecutorService.awaitTermination(300, TimeUnit.SECONDS)) {
+                logger.error("MessageExecutorService未完全关闭");
+            }
 
-        // 等待所有分组处理完成
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            logger.error("等待分组处理完成时被中断", e);
-            Thread.currentThread().interrupt();
-        }
+            // 5. 同步块生成快照(终极防护),创建快照避免并发修改
+            List<QwSopLogs> batchList;
+            synchronized (updateList) { // 加锁确保无并发修改
+                batchList = new ArrayList<>(updateList);
+            }
 
-        // 批量更新发送状态,每500条一批
-        if (!updateList.isEmpty()) {
-            int batchSize = 500;
-            for (int i = 0; i < updateList.size(); i += batchSize) {
-                int endIndex = Math.min(i + batchSize, updateList.size());
-                List<QwSopLogs> batch = updateList.subList(i, endIndex);
-                try {
-                    qwSopLogsMapper.batchUpdateStatus(batch);
-                    logger.info("批量修改 sopLogs 成功,修改数量: " + batch.size());
-                } catch (Exception e) {
-                    logger.error("批量修改 sopLogs 失败", e);
+
+            logger.info("批量修改总数: {}", batchList.size());
+
+            if (!batchList.isEmpty()){
+                int batchSize = 1000;
+                for (int i = 0; i < batchList.size(); i += batchSize) {
+                    int end = Math.min(i + batchSize, batchList.size());
+                    List<QwSopLogs> subList = batchList.subList(i, end);
+                    qwSopLogsMapper.batchUpdateStatus(subList);
                 }
             }
-        }
 
-        // 关闭线程池
-        executorService.shutdown();
-        messageExecutorService.shutdown();
 
-        long endTime = System.currentTimeMillis();
-        logger.info("企业微信群发消息创建任务执行完成,总耗时: {} 毫秒", (endTime - startTime));
+        } catch (InterruptedException e) {
+            logger.error("线程中断异常", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            long endTime = System.currentTimeMillis();
+            logger.info("任务完成,耗时: {} 毫秒", endTime - startTime);
+        }
     }
 
     // 处理不同类型的内容
@@ -1593,12 +1598,12 @@ public class QwSopLogsServiceImpl implements IQwSopLogsService
 //    }
 
     @Override
-    public void createCorpMassSendingByUserLogs(String date) {
+    public void createCorpMassSendingByUserLogs(String taskStartTime,String taskEndTime) {
 
         long startTime = System.currentTimeMillis();
         logger.info("开始执行企业微信群发消息创建任务");
 
-        List<QwSopLogs> qwSopLogsList = qwSopLogsMapper.selectSopLogsByCreateCorpMassSending(date);
+        List<QwSopLogs> qwSopLogsList = qwSopLogsMapper.selectSopLogsByCreateCorpMassSending(taskStartTime,taskEndTime);
         if (qwSopLogsList.isEmpty()) {
             logger.error("zyp \n【企微官方群发记录为空】");
             return;

+ 18 - 1
fs-service/src/main/resources/mapper/sop/QwSopLogsMapper.xml

@@ -328,9 +328,26 @@
     </select>
 
 
+    <select id="createCorpMassSending" parameterType="String" resultType="QwSopLogs" >
+        <![CDATA[
+        select * from qw_sop_logs
+        where log_type=2
+          and send_status=3
+          and send_type=1
+          and send_time >= #{date}
+          AND take_records = 1
+        ]]>
+    </select>
+
     <select id="selectSopLogsByCreateCorpMassSending" parameterType="String" resultType="QwSopLogs" >
         <![CDATA[
-            select * from qw_sop_logs where log_type=2 and send_status=3 and send_type=1 and send_time >= #{date}
+        select * from qw_sop_logs
+        where log_type=2
+          and send_status=3
+          and send_type=1
+          AND send_time >= #{taskStartTime}
+          AND send_time <= #{taskEndTime}
+          AND take_records = 0
         ]]>
     </select>