瀏覽代碼

发送修复线程

吴树波 11 小時之前
父節點
當前提交
0d13ca1b1b

+ 11 - 12
fs-ipad-task/src/main/java/com/fs/app/service/CustomThreadPoolConfig.java

@@ -1,33 +1,32 @@
 package com.fs.app.service;
 
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.util.concurrent.ThreadPoolExecutor;
 
-/**
- * @author MixLiu
- * @date 2025/7/11 上午11:04)
- */
 @Configuration
+@Slf4j
 public class CustomThreadPoolConfig {
     @Bean(name = "customThreadPool", destroyMethod = "shutdown")
     public ThreadPoolTaskExecutor customThreadPool() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        // 核心线程数
         executor.setCorePoolSize(300);
-        // 最大线程数
         executor.setMaxPoolSize(300);
-        // 线程名前缀
+        executor.setQueueCapacity(500);
         executor.setThreadNamePrefix("custom-pool-");
-        // 拒绝策略:直接丢弃新任务
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
-        // 非核心线程空闲存活时间(秒)
+        executor.setRejectedExecutionHandler((r, executor1) -> {
+            log.error("线程池拒绝执行任务!活跃线程数: {}, 队列大小: {}, 队列剩余容量: {}",
+                    executor1.getActiveCount(), executor1.getQueue().size(), executor1.getQueue().remainingCapacity());
+            if (!executor1.isShutdown()) {
+                r.run();
+            }
+        });
         executor.setKeepAliveSeconds(60);
-        // 等待所有任务完成后关闭线程池
         executor.setWaitForTasksToCompleteOnShutdown(true);
-        // 初始化
+        executor.setAwaitTerminationSeconds(60);
         executor.initialize();
         return executor;
     }

+ 8 - 10
fs-ipad-task/src/main/java/com/fs/app/task/SendMsg.java

@@ -41,10 +41,7 @@ import org.springframework.util.StringUtils;
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 @Component
@@ -139,23 +136,24 @@ public class SendMsg {
         Map<String, FsCoursePlaySourceConfig> miniMap = getMiniMap();
         // 获取 pad 发送的企微
         getQwUserList().forEach(e -> {
-            // 如果没有值就执行后面的方法 并且入值
             qwMap.computeIfAbsent(e.getId(), k -> {
-                // 线程启动
-                CompletableFuture.runAsync(() -> {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                     try {
                         log.info("开始任务:{}", e.getQwUserName());
-                        // 开始任务
                         processUser(e, delayStart, delayEnd, miniMap);
                     } catch (Exception exception) {
                         log.error("发送错误:", exception);
                     } finally {
                         log.info("删除任务:{}", e.getQwUserName());
                         qwMap.remove(e.getId());
-//                        removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
                     }
                 }, customThreadPool);
-                return System.currentTimeMillis(); // 占位值
+                future.exceptionally(ex -> {
+                    log.error("任务提交异常,userId={}, qwUserName={}", e.getId(), e.getQwUserName(), ex);
+                    qwMap.remove(e.getId());
+                    return null;
+                });
+                return System.currentTimeMillis();
             });
         });
     }