Browse Source

定时获取 通过调用 企业微信接口 发送的 SOP 客户群发消息 的反馈结果(新版)

三七 4 days ago
parent
commit
ec22dfc84f

+ 1 - 1
fs-qw-task/src/main/java/com/fs/app/task/qwTask.java

@@ -182,7 +182,7 @@ public class qwTask {
     }
 
     /**
-     * 定时任务:获取企业微信SOP群发消息反馈结果(新版-按营期发送)
+     *
      * 执行时间:每天上午 8:00:00
      * 功能:获取通过企业微信接口发送的SOP客户群发消息的反馈结果
      */

+ 13 - 0
fs-qw-task/src/main/java/com/fs/framework/config/ThreadPoolConfig.java

@@ -4,8 +4,10 @@ import com.fs.common.utils.Threads;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +35,17 @@ public class ThreadPoolConfig
     // 线程池维护线程所允许的空闲时间
     private int keepAliveSeconds = 300;
 
+
+    @Bean
+    public TaskScheduler taskScheduler(){
+        ThreadPoolTaskScheduler scheduler=new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(18);
+        scheduler.setThreadNamePrefix("scheduled-task-");
+        scheduler.setAwaitTerminationSeconds(60);
+        scheduler.setWaitForTasksToCompleteOnShutdown(true);
+        return scheduler;
+    }
+
     @Bean(name = "threadPoolTaskExecutor")
     public ThreadPoolTaskExecutor threadPoolTaskExecutor()
     {

+ 318 - 77
fs-service/src/main/java/com/fs/sop/service/impl/QwSopLogsServiceImpl.java

@@ -59,6 +59,7 @@ import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -571,135 +572,375 @@ public class QwSopLogsServiceImpl extends ServiceImpl<QwSopLogsMapper, QwSopLogs
     @Override
     public void qwSopLogsResultNew() {
 
+
         logger.info("开始执行企业微信群发消息结果查询任务");
         long startTime = System.currentTimeMillis();
 
         List<QwSopLogs> qwSopLogsList = qwSopLogsMapper.selectSopLogsByCreateCorpMassSendResult();
         if (qwSopLogsList.isEmpty()) {
-            logger.info("没有需要查询结果的群发消息记录");
-            return;
+            return ;
         }
 
-
+        // 分组处理
         Map<String, List<QwSopLogs>> grouped = qwSopLogsList.stream().collect(
                 Collectors.groupingBy(log -> log.getQwUserid() + "|" + log.getCorpId() + "|" + log.getMsgId())
         );
-        for (Map.Entry<String, List<QwSopLogs>> entry : grouped.entrySet()) {
-            String key = entry.getKey();
-            List<QwSopLogs> corpLogs = entry.getValue();
 
-            String[] keys = key.split("\\|");
-            String qwUserid = keys[0];
-            String corpId = keys[1];
-            String msgID = keys[2];
+        // 线程安全的数据结构
+        List<QwSopLogs> allUpdates = Collections.synchronizedList(new ArrayList<>());
+        Queue<Map.Entry<String, List<QwSopLogs>>> taskQueue = new ConcurrentLinkedQueue<>(grouped.entrySet());
+        Queue<Map.Entry<String, List<QwSopLogs>>> apiFailedTasks = new ConcurrentLinkedQueue<>();
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+
+        AtomicInteger totalGroupsProcessed = new AtomicInteger(0); // 处理的分组数
+        AtomicInteger totalRecordsUpdated = new AtomicInteger(0);  // 更新的记录数
+
+        int batchSize = 300; // 每300个组批量更新一次
+        int maxRetries = 3;   // 最大重试次数
+
+        // 处理所有任务(包括重试)
+        for (int retryCount = 0; retryCount <= maxRetries; retryCount++) {
+            int currentBatchCount = 0;
+
+            while (!taskQueue.isEmpty()) {
+                int currentBatchSize = Math.min(batchSize - currentBatchCount, taskQueue.size());
+                CountDownLatch batchLatch = new CountDownLatch(currentBatchSize);
+
+                // 处理当前批次任务
+                for (int i = 0; i < currentBatchSize; i++) {
+                    Map.Entry<String, List<QwSopLogs>> entry = taskQueue.poll();
+                    executor.submit(() -> {
+                        try {
+                            // 处理单个分组(传入apiFailedTasks)
+                            List<QwSopLogs> updates = processSingleGroup(entry, apiFailedTasks);
+                            if (updates != null) {
+                                synchronized (allUpdates) {
+                                    allUpdates.addAll(updates);
+                                }
+                                // 统计更新记录数
+                                totalRecordsUpdated.addAndGet(updates.size());
+                            }
+
+                            // 统计处理的分组数
+                            totalGroupsProcessed.incrementAndGet();
+                        } finally {
+                            batchLatch.countDown();
+                        }
+                    });
+                }
+
+                // 等待当前批次完成
+                try {
+                    batchLatch.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+
+                currentBatchCount += currentBatchSize;
 
-            QwGetGroupmsgSendParam param = new QwGetGroupmsgSendParam();
-            param.setMsgid(msgID);
-            param.setUserid(qwUserid);
-            param.setLimit(1000);
+                // 每处理1000个组或任务队列空时更新数据库
+                if (currentBatchCount >= batchSize || taskQueue.isEmpty()) {
+                    synchronized (allUpdates) {
+                        if (!allUpdates.isEmpty()) {
+                            batchUpdateDatabase(new ArrayList<>(allUpdates));
+                            logger.info("每处理300个组或任务队列空时更新数据库:"+new ArrayList<>(allUpdates).size());
+                            allUpdates.clear();
+                        }
 
-            fetchAndProcessAllPages(param, corpId, corpLogs, msgID);
+                    }
+                    currentBatchCount = 0;
+                }
+            }
 
+            // 准备下一轮重试
+            if (retryCount < maxRetries) {
+                taskQueue.addAll(apiFailedTasks);
+                apiFailedTasks.clear();
+            }
         }
 
-        long endTime = System.currentTimeMillis();
-        logger.info("企业微信群发消息结果查询任务完成,处理记录总数: {},总耗时: {}ms",
-                qwSopLogsList.size(), (endTime - startTime));
+        // 最终更新剩余数据
+        synchronized (allUpdates) {
+            if (!allUpdates.isEmpty()) {
+                logger.info("最终更新剩余数据:"+new ArrayList<>(allUpdates).size());
+                batchUpdateDatabase(new ArrayList<>(allUpdates));
+            }
+        }
+
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
 
+        long endTime = System.currentTimeMillis();
+        logger.info("成功处理分组数: " + totalGroupsProcessed.get());
+        logger.info("企业微信群发消息结果查询任务完成-更新记录总数:{},总耗时:{} " , totalRecordsUpdated.get(),(endTime - startTime));
     }
 
-    private void fetchAndProcessAllPages(QwGetGroupmsgSendParam param, String corpId, List<QwSopLogs> logs, String msgId) {
+    private List<QwSopLogs> processSingleGroup(Map.Entry<String, List<QwSopLogs>> entry, Queue<Map.Entry<String, List<QwSopLogs>>> apiFailedTasks) {
+
+        String key = entry.getKey();
+        List<QwSopLogs> corpLogs = entry.getValue();
+        String[] keys = key.split("\\|");
+        String qwUserid = keys[0];
+        String corpId = keys[1];
+        String msgID = keys[2];
+
+        QwGetGroupmsgSendParam param = new QwGetGroupmsgSendParam();
+        param.setMsgid(msgID);
+        param.setUserid(qwUserid);
+        param.setLimit(1000);
+
+        List<QwSopLogs> groupUpdates = new ArrayList<>();
         String nextCursor = null;
+        boolean apiSuccess = true;
 
         do {
             param.setCursor(nextCursor);
+            QwGroupmsgSendResult result = fetchWithRetry(param, corpId); // 重试3次
+
+            // API调用失败处理
+            if (result == null || result.getErrCode() != 0) {
+                apiSuccess = false;
+                break;
+            }
+
+            // 处理当前页结果
+            List<QwSopLogs> pageUpdates = processPageResult(result, corpLogs, corpId, msgID);
+            groupUpdates.addAll(pageUpdates);
+            nextCursor = result.getNextCursor();
+
+        } while (nextCursor != null && !nextCursor.isEmpty());
+
+        // API调用失败时返回null,将任务加入失败队列
+        if (!apiSuccess) {
+            apiFailedTasks.offer(entry);
+            return null;
+        }
+
+        return groupUpdates;
+    }
+
+    private QwGroupmsgSendResult fetchWithRetry(QwGetGroupmsgSendParam param, String corpId) {
+        int retryCount = 0;
+        while (retryCount <= 3) {
             QwGroupmsgSendResult result = qwApiService.getGroupmsgSendResult(param, corpId);
 
+            // 请求失败情况
             if (result == null) {
-                logger.error("接口调用失败: {}", param);
-                return;
+                retryCount++;
+                sleepWithJitter(2000);
+                continue;
             }
 
-            if (result.getErrCode() == 45033) {
-                try {
-                    Thread.sleep(2000 + new Random().nextInt(1000));
-                    result = qwApiService.getGroupmsgSendResult(param, corpId);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.error("线程中断", e);
-                    return;
-                }
+            // 成功情况
+            if (result.getErrCode() == 0) {
+                return result;
             }
 
-            if (result.getErrCode() != 0) {
-                logger.error("查询失败: {}, errCode: {}, errMsg: {}", param, result.getErrCode(), result.getErrMsg());
-                return;
+            // 需要重试的错误码
+            if (result.getErrCode() == 45033 || result.getErrCode() == -1) {
+                retryCount++;
+                sleepWithJitter(2000 + new Random().nextInt(1000));
+                continue;
             }
 
-            processPageResult(result, logs, corpId, msgId);
-            nextCursor = result.getNextCursor();
-
-        } while (nextCursor != null && !nextCursor.isEmpty());
+            // 其他错误码不重试
+            return result;
+        }
+        return null; // 超过重试次数
     }
 
+    private void sleepWithJitter(int baseMillis) {
+        try {
+            int sleepTime = baseMillis + (new Random().nextInt(1000));
+            Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
 
-    private void processPageResult(QwGroupmsgSendResult result, List<QwSopLogs> logs, String corpId, String msgId) {
+    private List<QwSopLogs> processPageResult(QwGroupmsgSendResult result, List<QwSopLogs> logs, String corpId, String msgId) {
         Map<String, SendItemResult> sendMap = result.getSendList().stream()
                 .collect(Collectors.toMap(
                         r -> r.getUserId() + "_" + r.getExternalUserId() + "_" + corpId + "_" + msgId,
                         Function.identity(),
-                        (a, b) -> a  // 如果重复,保留第一个
+                        (a, b) -> a
                 ));
 
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
         String now = LocalDateTime.now().format(formatter);
-
-        // 只处理匹配得上的记录
         List<QwSopLogs> matchedLogs = new ArrayList<>();
 
         for (QwSopLogs log : logs) {
             String logKey = log.getQwUserid() + "_" + log.getExternalUserId() + "_" + log.getCorpId() + "_" + msgId;
             SendItemResult matched = sendMap.get(logKey);
-
-            if (matched != null) {
-
-                switch (matched.getStatus()) {
-                    case 0:
-                        log.setSendStatus(5L);
-                        log.setRemark("员工未在规定时间发送");
-                        break;
-                    case 1:
-                        log.setSendStatus(1L);
-                        log.setReceivingStatus(1L);
-                        break;
-                    case 2:
-                        log.setSendType(2);
-                        log.setSendStatus(3L);
-                        log.setRemark("因客户不是好友导致发送失败,补发");
-                        log.setReceivingStatus(0L);
-                        log.setSendTime(now);
-                        log.setSort(30000001);
-                    case 3:
-                        log.setSendType(2);
-                        log.setSendStatus(3L);
-                        log.setRemark("客户已经收到其他群发消息,补发");
-                        log.setReceivingStatus(0L);
-                        log.setSendTime(now);
-                        log.setSort(30000001);
-                        break;
-                    default:
-                        break;
-                }
-
-                matchedLogs.add(log);
+            if (matched == null) continue;
+
+            switch (matched.getStatus()) {
+                case 0:
+                    log.setSendStatus(5L);
+                    log.setRemark("员工未在规定时间发送");
+                    break;
+                case 1:
+                    log.setSendStatus(1L);
+                    log.setReceivingStatus(1L);
+                    break;
+                case 2:
+                    log.setSendType(2);
+                    log.setSendStatus(3L);
+                    log.setRemark("因客户不是好友导致发送失败,补发");
+                    log.setReceivingStatus(0L);
+                    log.setSendTime(now);
+                    log.setSort(30000001);
+                    break;
+                case 3:
+                    log.setSendType(2);
+                    log.setSendStatus(3L);
+                    log.setRemark("客户已经收到其他群发消息,补发");
+                    log.setReceivingStatus(0L);
+                    log.setSendTime(now);
+                    log.setSort(30000001);
+                    break;
             }
+            matchedLogs.add(log);
         }
-
-        if (!matchedLogs.isEmpty()) {
-            batchUpdateDatabase(matchedLogs);
-        }
+        return matchedLogs;
     }
 
+//    @Override
+//    public void qwSopLogsResultNew() {
+//
+//        logger.info("开始执行企业微信群发消息结果查询任务");
+//        long startTime = System.currentTimeMillis();
+//
+//        List<QwSopLogs> qwSopLogsList = qwSopLogsMapper.selectSopLogsByCreateCorpMassSendResult();
+//        if (qwSopLogsList.isEmpty()) {
+//            logger.info("没有需要查询结果的群发消息记录");
+//            return;
+//        }
+//
+//
+//        Map<String, List<QwSopLogs>> grouped = qwSopLogsList.stream().collect(
+//                Collectors.groupingBy(log -> log.getQwUserid() + "|" + log.getCorpId() + "|" + log.getMsgId())
+//        );
+//        for (Map.Entry<String, List<QwSopLogs>> entry : grouped.entrySet()) {
+//            String key = entry.getKey();
+//            List<QwSopLogs> corpLogs = entry.getValue();
+//
+//            String[] keys = key.split("\\|");
+//            String qwUserid = keys[0];
+//            String corpId = keys[1];
+//            String msgID = keys[2];
+//
+//            QwGetGroupmsgSendParam param = new QwGetGroupmsgSendParam();
+//            param.setMsgid(msgID);
+//            param.setUserid(qwUserid);
+//            param.setLimit(1000);
+//
+//            fetchAndProcessAllPages(param, corpId, corpLogs, msgID);
+//
+//        }
+//
+//        long endTime = System.currentTimeMillis();
+//        logger.info("企业微信群发消息结果查询任务完成,处理记录总数: {},总耗时: {}ms",
+//                qwSopLogsList.size(), (endTime - startTime));
+//
+//    }
+//    private void fetchAndProcessAllPages(QwGetGroupmsgSendParam param, String corpId, List<QwSopLogs> logs, String msgId) {
+//        String nextCursor = null;
+//
+//        do {
+//            param.setCursor(nextCursor);
+//            QwGroupmsgSendResult result = qwApiService.getGroupmsgSendResult(param, corpId);
+//
+//            if (result == null) {
+//                logger.error("接口调用失败: {}", param);
+//                return;
+//            }
+//
+//            if (result.getErrCode() == 45033) {
+//                try {
+//                    Thread.sleep(2000 + new Random().nextInt(1000));
+//                    result = qwApiService.getGroupmsgSendResult(param, corpId);
+//                } catch (InterruptedException e) {
+//                    Thread.currentThread().interrupt();
+//                    logger.error("线程中断", e);
+//                    return;
+//                }
+//            }
+//
+//            if (result.getErrCode() != 0) {
+//                logger.error("查询失败: {}, errCode: {}, errMsg: {}", param, result.getErrCode(), result.getErrMsg());
+//                return;
+//            }
+//
+//            processPageResult(result, logs, corpId, msgId);
+//            nextCursor = result.getNextCursor();
+//
+//        } while (nextCursor != null && !nextCursor.isEmpty());
+//    }
+//    private void processPageResult(QwGroupmsgSendResult result, List<QwSopLogs> logs, String corpId, String msgId) {
+//        Map<String, SendItemResult> sendMap = result.getSendList().stream()
+//                .collect(Collectors.toMap(
+//                        r -> r.getUserId() + "_" + r.getExternalUserId() + "_" + corpId + "_" + msgId,
+//                        Function.identity(),
+//                        (a, b) -> a  // 如果重复,保留第一个
+//                ));
+//
+//        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+//        String now = LocalDateTime.now().format(formatter);
+//
+//        // 只处理匹配得上的记录
+//        List<QwSopLogs> matchedLogs = new ArrayList<>();
+//
+//        for (QwSopLogs log : logs) {
+//            String logKey = log.getQwUserid() + "_" + log.getExternalUserId() + "_" + log.getCorpId() + "_" + msgId;
+//            SendItemResult matched = sendMap.get(logKey);
+//
+//            if (matched != null) {
+//
+//                switch (matched.getStatus()) {
+//                    case 0:
+//                        log.setSendStatus(5L);
+//                        log.setRemark("员工未在规定时间发送");
+//                        break;
+//                    case 1:
+//                        log.setSendStatus(1L);
+//                        log.setReceivingStatus(1L);
+//                        break;
+//                    case 2:
+//                        log.setSendType(2);
+//                        log.setSendStatus(3L);
+//                        log.setRemark("因客户不是好友导致发送失败,补发");
+//                        log.setReceivingStatus(0L);
+//                        log.setSendTime(now);
+//                        log.setSort(30000001);
+//                    case 3:
+//                        log.setSendType(2);
+//                        log.setSendStatus(3L);
+//                        log.setRemark("客户已经收到其他群发消息,补发");
+//                        log.setReceivingStatus(0L);
+//                        log.setSendTime(now);
+//                        log.setSort(30000001);
+//                        break;
+//                    default:
+//                        break;
+//                }
+//
+//                matchedLogs.add(log);
+//            }
+//        }
+//
+//        if (!matchedLogs.isEmpty()) {
+//            batchUpdateDatabase(matchedLogs);
+//        }
+//    }
+
 
     /**
      * 批量更新数据库

+ 1 - 0
fs-user-app/src/main/java/com/fs/app/controller/CourseController.java

@@ -58,6 +58,7 @@ public class CourseController extends  AppBaseController{
             return R.error("操作异常");
         }
     }
+
     @Cacheable(value = "getProductCateByPid", key = "#pid")
     @ApiOperation("获取子分类")
     @GetMapping("/getProductCateByPid")