Преглед на файлове

1、调整直播定时任务

yys преди 3 седмици
родител
ревизия
522622ed66
променени са 1 файла, в които са добавени 24 реда и са изтрити 12 реда
  1. 24 12
      fs-live-app/src/main/java/com/fs/live/task/TenantTaskRunner.java

+ 24 - 12
fs-live-app/src/main/java/com/fs/live/task/TenantTaskRunner.java

@@ -37,15 +37,22 @@ public class TenantTaskRunner {
     /** ThreadLocal 防止任务方法递归二次分发 */
     /** ThreadLocal 防止任务方法递归二次分发 */
     private static final ThreadLocal<Boolean> IN_TENANT_EXECUTION = ThreadLocal.withInitial(() -> false);
     private static final ThreadLocal<Boolean> IN_TENANT_EXECUTION = ThreadLocal.withInitial(() -> false);
 
 
-    /** 租户并行执行线程池(守护线程,应用关闭时自动终止) */
-    private final ExecutorService tenantExecutor = Executors.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors(),
-            r -> {
-                Thread t = new Thread(r, "saas-tenant-task");
-                t.setDaemon(true);
-                return t;
-            }
-    );
+    /** 每种任务独立线程池,避免不同类型任务互相阻塞 */
+    private final ConcurrentHashMap<String, ExecutorService> taskExecutors = new ConcurrentHashMap<>();
+
+    /** 获取指定任务的线程池(每个任务类型独立隔离) */
+    private ExecutorService getExecutor(String taskName) {
+        return taskExecutors.computeIfAbsent(taskName, name ->
+                Executors.newFixedThreadPool(
+                        Runtime.getRuntime().availableProcessors(),
+                        r -> {
+                            Thread t = new Thread(r, "saas-tenant-" + name);
+                            t.setDaemon(true);
+                            return t;
+                        }
+                )
+        );
+    }
 
 
     @Resource
     @Resource
     private TenantDataSourceManager tenantDataSourceManager;
     private TenantDataSourceManager tenantDataSourceManager;
@@ -70,19 +77,24 @@ public class TenantTaskRunner {
             return;
             return;
         }
         }
         log.info("[SaaS Live Task] 开始执行任务={}, 有效租户数={}", taskName, tenants.size());
         log.info("[SaaS Live Task] 开始执行任务={}, 有效租户数={}", taskName, tenants.size());
+        ExecutorService executor = getExecutor(taskName);
         // 并行提交所有租户任务,等待全部完成
         // 并行提交所有租户任务,等待全部完成
         List<Future<?>> futures = new java.util.ArrayList<>(tenants.size());
         List<Future<?>> futures = new java.util.ArrayList<>(tenants.size());
         for (TenantInfo tenant : tenants) {
         for (TenantInfo tenant : tenants) {
-            futures.add(tenantExecutor.submit(() -> runForOneTenant(tenant, taskName, action)));
+            futures.add(executor.submit(() -> runForOneTenant(tenant, taskName, action)));
         }
         }
         // 等待全部完成,收集异常
         // 等待全部完成,收集异常
         for (int i = 0; i < futures.size(); i++) {
         for (int i = 0; i < futures.size(); i++) {
             try {
             try {
                 futures.get(i).get(5, TimeUnit.MINUTES);
                 futures.get(i).get(5, TimeUnit.MINUTES);
             } catch (TimeoutException e) {
             } catch (TimeoutException e) {
-                log.error("[SaaS Live Task] 租户 {} 执行任务 {} 超时", tenants.get(i).getTenantCode(), taskName);
+                // 取消超时任务,防止线程池被僵尸任务耗尽
+                futures.get(i).cancel(true);
+                log.error("[SaaS Live Task] 租户 {} 执行任务 {} 超时(5min), 已取消",
+                        tenants.get(i).getTenantCode(), taskName);
             } catch (ExecutionException e) {
             } catch (ExecutionException e) {
-                log.error("[SaaS Live Task] 租户 {} 执行任务 {} 异常", tenants.get(i).getTenantCode(), taskName, e.getCause());
+                log.error("[SaaS Live Task] 租户 {} 执行任务 {} 异常",
+                        tenants.get(i).getTenantCode(), taskName, e.getCause());
             } catch (InterruptedException e) {
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 Thread.currentThread().interrupt();
                 log.warn("[SaaS Live Task] 任务 {} 被中断", taskName);
                 log.warn("[SaaS Live Task] 任务 {} 被中断", taskName);