Jelajahi Sumber

update 群发助手生成执行记录

ct 1 Minggu lalu
induk
melakukan
bb350ead0e

+ 10 - 3
fs-qw-task/src/main/java/com/fs/app/controller/CommonController.java

@@ -4,6 +4,7 @@ package com.fs.app.controller;
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
+import com.fs.app.task.TenantTaskRunner;
 import com.fs.app.task.qwTask;
 import com.fs.app.taskService.*;
 import com.fs.common.config.RedisTenantContext;
@@ -170,6 +171,8 @@ public class CommonController {
     private IQwCompanyService qwCompanyService;
     @Autowired
     private TenantDataSourceManager tenantDataSourceManager;
+    @Autowired
+    private TenantTaskRunner tenantTaskRunner;
 
     @RequestMapping("/syncQwUserAsync")
     public void syncQwUserAsync(String corpId) {
@@ -449,7 +452,7 @@ public class CommonController {
     }
 
     @GetMapping("/test")
-    public R test(String time, String sopId) throws Exception {
+    public R test(Long tenantId, String time, String sopId) throws Exception {
         log.info("进入sop任务");
 //        LocalDateTime currentTime = DateUtil.parseLocalDateTime(time);
 //        // 计算下一个整点时间
@@ -459,10 +462,14 @@ public class CommonController {
 //        log.info("任务实际执行时间: {}", currentTime);
 //        log.info("传递给任务的时间参数: {}", nextHourTime);
         List<String> sopidList = new ArrayList<>();
-        if(StringUtils.isNotEmpty(sopId)){
+        if (StringUtils.isNotEmpty(sopId)) {
             sopidList = Arrays.asList(sopId.split(","));
         }
-        sopLogsTaskService.selectSopUserLogsListByTime(DateUtil.parseLocalDateTime(time), sopidList);
+        List<String> finalSopidList = sopidList;
+        tenantTaskRunner.runForTenantById(tenantId, "test-selectSopUserLogsListByTime", () -> {
+            sopLogsTaskService.selectSopUserLogsListByTime(DateUtil.parseLocalDateTime(time), finalSopidList);
+            return null;
+        });
         return R.ok();
     }
     @GetMapping("/testWx")

+ 44 - 10
fs-qw-task/src/main/java/com/fs/app/task/TenantTaskRunner.java

@@ -125,6 +125,37 @@ public class TenantTaskRunner {
         runForEachTenant(taskName, t -> action.run());
     }
 
+    /**
+     * 按指定租户 ID 执行一次任务(切换租户库、Redis、项目配置)。
+     * 供手动测试接口等场景使用;执行异常会向上抛出。
+     */
+    public void runForTenantById(Long tenantId, String taskName, Callable<Void> action) throws Exception {
+        TenantInfo tenant = loadTenantByIdFromMaster(tenantId);
+        if (tenant == null) {
+            throw new IllegalArgumentException("租户不存在: tenantId=" + tenantId);
+        }
+        if (!Integer.valueOf(1).equals(tenant.getStatus())) {
+            throw new IllegalArgumentException("租户已禁用: tenantId=" + tenantId);
+        }
+        Date now = new Date();
+        if (tenant.getExpireTime() != null && tenant.getExpireTime().before(now)) {
+            throw new IllegalArgumentException("租户已过期: tenantId=" + tenantId);
+        }
+        runForOneTenantThrowing(tenant, taskName, () -> action.call());
+    }
+
+    private TenantInfo loadTenantByIdFromMaster(Long tenantId) {
+        if (tenantId == null) {
+            return null;
+        }
+        try {
+            DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
+            return tenantInfoService.getById(tenantId);
+        } finally {
+            DynamicDataSourceContextHolder.clearDataSourceType();
+        }
+    }
+
     private List<TenantInfo> getValidTenants() {
         try {
             DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
@@ -172,11 +203,21 @@ public class TenantTaskRunner {
     }
 
     private void runForOneTenant(TenantInfo tenant, String taskName, Consumer<TenantInfo> action) {
+        try {
+            runForOneTenantThrowing(tenant, taskName, () -> {
+                action.accept(tenant);
+                return null;
+            });
+        } catch (Exception e) {
+            log.error("[SaaS Task] 租户 tenantId={}, tenantCode={} 执行异常, task={}",
+                    tenant.getId(), tenant.getTenantCode(), taskName, e);
+        }
+    }
+
+    private void runForOneTenantThrowing(TenantInfo tenant, String taskName, Callable<Void> action) throws Exception {
         String dataSourceKey = "tenant:" + tenant.getId();
         try {
-            // 切换到租户数据源
             tenantDataSourceManager.switchTenant(tenant);
-            // 切换 Redis 租户上下文(与 user-app 写入 Redis 时 TenantKeyRedisSerializer 行为一致)
             RedisTenantContext.setTenantId(tenant.getId());
             SecurityContextHolder.getContext().setAuthentication(
                     new UsernamePasswordAuthenticationToken(
@@ -186,7 +227,6 @@ public class TenantTaskRunner {
             log.info("[SaaS Task] 定时任务切换数据源和Redis dataSource={}, tenantId={}, tenantCode={}, task={}",
                     dataSourceKey, tenant.getId(), tenant.getTenantCode(), taskName != null ? taskName : "");
 
-            // 加载租户项目配置
             SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
             if (cfg != null && StringUtils.isNotBlank(cfg.getConfigValue())) {
                 TenantConfigContext.set(JSONObject.parseObject(cfg.getConfigValue()));
@@ -195,16 +235,10 @@ public class TenantTaskRunner {
             }
             ProjectConfig.loadTenantConfigsFromContext();
 
-            // 执行租户任务
-            action.accept(tenant);
-
-        } catch (Exception e) {
-            log.error("[SaaS Task] 租户 tenantId={}, tenantCode={} 执行异常, task={}",
-                    tenant.getId(), tenant.getTenantCode(), taskName, e);
+            action.call();
         } finally {
             ProjectConfig.clearTenantConfigs();
             TenantConfigContext.clear();
-            // 清理 Redis / Security 租户上下文
             RedisTenantContext.clear();
             SecurityContextHolder.clearContext();
             DynamicDataSourceContextHolder.clearDataSourceType();

+ 176 - 91
fs-qw-task/src/main/java/com/fs/app/taskService/impl/SopLogsTaskServiceImpl.java

@@ -7,7 +7,9 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.fs.app.task.TenantTaskRunner;
 import com.fs.app.taskService.SopLogsTaskService;
 import com.fs.common.config.FSSysConfig;
+import com.fs.common.config.RedisTenantContext;
 import com.fs.common.utils.PubFun;
+import com.fs.common.utils.SecurityUtils;
 import com.fs.common.utils.StringUtils;
 import com.fs.company.domain.Company;
 import com.fs.company.domain.CompanyMiniapp;
@@ -148,12 +150,25 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     @Autowired
     private CloudHostProper cloudHostProper;
 
+    /** SOP 异步任务 / 队列入队时携带租户 ID,消费线程按租户切库 */
+    private static final ThreadLocal<Long> SOP_TASK_TENANT_ID = new ThreadLocal<>();
+
+    private static final class TenantQueueItem<T> {
+        private final Long tenantId;
+        private final T data;
+
+        TenantQueueItem(Long tenantId, T data) {
+            this.tenantId = tenantId;
+            this.data = data;
+        }
+    }
+
     // Blocking queues with bounded capacity to implement backpressure
-    private final BlockingQueue<QwSopLogs> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
-    private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
-    private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
-    private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
-    private final BlockingQueue<LiveWatchLog> zmLiveWatchQueue = new LinkedBlockingQueue<>(20000);
+    private final BlockingQueue<TenantQueueItem<QwSopLogs>> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
+    private final BlockingQueue<TenantQueueItem<FsCourseWatchLog>> watchLogsQueue = new LinkedBlockingQueue<>(20000);
+    private final BlockingQueue<TenantQueueItem<FsCourseLink>> linkQueue = new LinkedBlockingQueue<>(20000);
+    private final BlockingQueue<TenantQueueItem<FsCourseSopAppLink>> sopAppLinks = new LinkedBlockingQueue<>(20000);
+    private final BlockingQueue<TenantQueueItem<LiveWatchLog>> zmLiveWatchQueue = new LinkedBlockingQueue<>(20000);
 
     // Executors for consumer threads
     private ExecutorService qwSopLogsExecutor;
@@ -189,7 +204,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
 
     @Resource
     private TenantTaskRunner tenantTaskRunner;
-    @Value("${saas.task.enabled:false}")
+    @Value("${saas.task.enabled:true}")
     private boolean saasTaskEnabled;
 
 
@@ -321,10 +336,90 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         }
     }
 
+    private Long captureTenantId() {
+        Long tenantId = RedisTenantContext.getTenantId();
+        if (tenantId == null) {
+            try {
+                tenantId = SecurityUtils.getTenantId();
+            } catch (Exception ignored) {
+                tenantId = null;
+            }
+        }
+        return tenantId;
+    }
+
+    private Long resolveEnqueueTenantId() {
+        Long tenantId = SOP_TASK_TENANT_ID.get();
+        if (tenantId == null) {
+            tenantId = captureTenantId();
+        }
+        return tenantId;
+    }
+
+    /**
+     * 在租户数据源 + Redis 上下文中执行(供 @Async 子线程使用)。
+     */
+    private void runInTenantContext(Long tenantId, String taskName, Runnable action) {
+        if (saasTaskEnabled && tenantId != null) {
+            try {
+                tenantTaskRunner.runForTenantById(tenantId, taskName, () -> {
+                    SOP_TASK_TENANT_ID.set(tenantId);
+                    try {
+                        action.run();
+                        return null;
+                    } finally {
+                        SOP_TASK_TENANT_ID.remove();
+                    }
+                });
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            SOP_TASK_TENANT_ID.set(tenantId);
+            try {
+                action.run();
+            } finally {
+                SOP_TASK_TENANT_ID.remove();
+            }
+        }
+    }
+
+    private <T> void flushBatchByTenant(List<TenantQueueItem<T>> items, String taskName,
+                                        java.util.function.Consumer<List<T>> batchInserter) {
+        if (items.isEmpty()) {
+            return;
+        }
+        Map<Long, List<T>> grouped = new LinkedHashMap<>();
+        for (TenantQueueItem<T> item : items) {
+            if (saasTaskEnabled && item.tenantId == null) {
+                log.warn("SaaS 模式队列数据缺少 tenantId,跳过 task={}", taskName);
+                continue;
+            }
+            grouped.computeIfAbsent(item.tenantId, k -> new ArrayList<>()).add(item.data);
+        }
+        for (Map.Entry<Long, List<T>> entry : grouped.entrySet()) {
+            Long tenantId = entry.getKey();
+            List<T> data = entry.getValue();
+            try {
+                if (saasTaskEnabled && tenantId != null) {
+                    tenantTaskRunner.runForTenantById(tenantId, taskName, () -> {
+                        batchInserter.accept(data);
+                        return null;
+                    });
+                } else {
+                    batchInserter.accept(data);
+                }
+            } catch (Exception e) {
+                log.error("批量处理失败 task={}, tenantId={}, size={}", taskName, tenantId, data.size(), e);
+            }
+        }
+    }
+
     @Override
     public void selectSopUserLogsListByTime(LocalDateTime currentTime, List<String> sopidList) throws Exception {
         long startTimeMillis = System.currentTimeMillis();
-        log.info("====== 开始选择和处理 SOP 用户日志 ======");
+        Long tenantId = captureTenantId();
+        log.info("====== 开始选择和处理 SOP 用户日志 ====== tenantId={}", tenantId);
 
         // 获取缓存的配置
         CourseConfig config;
@@ -375,7 +470,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         for (Map.Entry<String, List<SopUserLogsVo>> entry : sopLogsGroupedById.entrySet()) {
             String sopId = entry.getKey();
             List<SopUserLogsVo> userLogsVos = entry.getValue();
-            processSopGroupAsync(sopId, userLogsVos, sopGroupLatch,currentTime, groupChatMap,config,miniMap,companies);
+            processSopGroupAsync(sopId, userLogsVos, sopGroupLatch, currentTime, groupChatMap, config, miniMap, companies, tenantId);
         }
 
         // 等待所有 SOP 分组处理完成
@@ -395,11 +490,18 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
             maxAttempts = 3,
             backoff = @Backoff(delay = 2000)
     )
-    public void processSopGroupAsync(String sopId, List<SopUserLogsVo> userLogsVos, CountDownLatch latch ,LocalDateTime currentTime,
-                                     Map<String, QwGroupChat> groupChatMap,CourseConfig config,Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
-                                     List<Company> companies) {
+    public void processSopGroupAsync(String sopId, List<SopUserLogsVo> userLogsVos, CountDownLatch latch, LocalDateTime currentTime,
+                                     Map<String, QwGroupChat> groupChatMap, CourseConfig config,
+                                     Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
+                                     List<Company> companies, Long tenantId) {
         try {
-            processSopGroup(sopId, userLogsVos,currentTime, groupChatMap, config,miniMap,companies);
+            runInTenantContext(tenantId, "processSopGroup-" + sopId, () -> {
+                try {
+                    processSopGroup(sopId, userLogsVos, currentTime, groupChatMap, config, miniMap, companies, tenantId);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
         } catch (Exception e) {
             log.error("处理 SOP ID {} 时发生异常: {}", sopId, e.getMessage(), e);
         } finally {
@@ -408,9 +510,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     }
 
 
-    private void processSopGroup(String sopId, List<SopUserLogsVo> userLogsVos,LocalDateTime currentTime, Map<String,
-                                         QwGroupChat> groupChatMap,CourseConfig config,Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
-                                 List<Company> companies) throws Exception {
+    private void processSopGroup(String sopId, List<SopUserLogsVo> userLogsVos, LocalDateTime currentTime, Map<String,
+                                         QwGroupChat> groupChatMap, CourseConfig config, Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
+                                 List<Company> companies, Long tenantId) throws Exception {
         QwSopRuleTimeVO ruleTimeVO = sopMapper.selectQwSopByClickHouseId(sopId);
 
         if (ruleTimeVO == null) {
@@ -452,8 +554,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
 
         CountDownLatch userLogsLatch = new CountDownLatch(userLogsVos.size());
         for (SopUserLogsVo logVo : userLogsVos) {
-            processUserLogAsync(logVo, ruleTimeVO, rulesList, userLogsLatch, currentTime, groupChatMap,qwCompany.getMiniAppId(),
-                    config,miniMap,companies);
+            processUserLogAsync(logVo, ruleTimeVO, rulesList, userLogsLatch, currentTime, groupChatMap, qwCompany.getMiniAppId(),
+                    config, miniMap, companies, tenantId);
         }
 
         // 等待所有用户日志处理完成
@@ -474,10 +576,11 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     )
     public void processUserLogAsync(SopUserLogsVo logVo, QwSopRuleTimeVO ruleTimeVO, List<QwSopTempRules> tempSettings,
                                     CountDownLatch latch, LocalDateTime currentTime, Map<String, QwGroupChat> groupChatMap,
-                                    String miniAppId,CourseConfig config,Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
-                                    List<Company> companies) {
+                                    String miniAppId, CourseConfig config, Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
+                                    List<Company> companies, Long tenantId) {
         try {
-            processUserLog(logVo, ruleTimeVO, tempSettings,currentTime, groupChatMap, miniAppId, config,miniMap,companies);
+            runInTenantContext(tenantId, "processUserLog-" + logVo.getId(), () ->
+                    processUserLog(logVo, ruleTimeVO, tempSettings, currentTime, groupChatMap, miniAppId, config, miniMap, companies));
         } catch (Exception e) {
             log.error("处理用户日志 {} 时发生异常: {}", logVo.getId(), e.getMessage(), e);
         } finally {
@@ -698,7 +801,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     }
 
     //消息处理
-    private void insertSopUserLogs(List<SopUserLogsInfo> sopUserLogsInfos, SopUserLogsVo logVo, Date sendTime,
+    private void
+    insertSopUserLogs(List<SopUserLogsInfo> sopUserLogsInfos, SopUserLogsVo logVo, Date sendTime,
                                    QwSopRuleTimeVO ruleTimeVO, QwSopTempSetting.Content content,
                                    String qwUserId,String companyUserId,String companyId,String welcomeText,String qwUserName,
                                    Map<String, QwGroupChat> groupChatMap,String miniAppId,CourseConfig config,
@@ -1750,11 +1854,11 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     }
 
     private void enqueueZmLiveWatchLog(LiveWatchLog liveWatchLog) {
+        Long tenantId = resolveEnqueueTenantId();
         try {
-            boolean offered = zmLiveWatchQueue.offer(liveWatchLog, 5, TimeUnit.SECONDS);
+            boolean offered = zmLiveWatchQueue.offer(new TenantQueueItem<>(tenantId, liveWatchLog), 5, TimeUnit.SECONDS);
             if (!offered) {
-                log.error("LiveWatchLog 队列已满,无法添加日志: {}", JSON.toJSONString(liveWatchLog));
-                // 处理队列已满的情况,例如记录到失败队列或持久化存储
+                log.error("LiveWatchLog 队列已满 tenantId={}", tenantId);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -1791,11 +1895,11 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
      * 将 QwSopLogs 放入队列
      */
     private void enqueueQwSopLogs(QwSopLogs sopLogs) {
+        Long tenantId = resolveEnqueueTenantId();
         try {
-            boolean offered = qwSopLogsQueue.offer(sopLogs, 5, TimeUnit.SECONDS);
+            boolean offered = qwSopLogsQueue.offer(new TenantQueueItem<>(tenantId, sopLogs), 5, TimeUnit.SECONDS);
             if (!offered) {
-                log.error("QwSopLogs 队列已满,无法添加日志: {}", JSON.toJSONString(sopLogs));
-                // 处理队列已满的情况,例如记录到失败队列或持久化存储
+                log.error("QwSopLogs 队列已满 tenantId={}, sopId={}", tenantId, sopLogs.getSopId());
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -1803,15 +1907,12 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         }
     }
 
-    /**
-     * 将 FsCourseWatchLog 放入队列
-     */
     private void enqueueWatchLog(FsCourseWatchLog watchLog) {
+        Long tenantId = resolveEnqueueTenantId();
         try {
-            boolean offered = watchLogsQueue.offer(watchLog, 5, TimeUnit.SECONDS);
+            boolean offered = watchLogsQueue.offer(new TenantQueueItem<>(tenantId, watchLog), 5, TimeUnit.SECONDS);
             if (!offered) {
-                log.error("FsCourseWatchLog 队列已满,无法添加日志: {}", JSON.toJSONString(watchLog));
-                // 处理队列已满的情况,例如记录到失败队列或持久化存储
+                log.error("FsCourseWatchLog 队列已满 tenantId={}, log={}", tenantId, JSON.toJSONString(watchLog));
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -1819,15 +1920,12 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         }
     }
 
-    /**
-     * 将 FsCourseWatchLog 放入队列
-     */
     private void enqueueCourseLink(FsCourseLink courseLink) {
+        Long tenantId = resolveEnqueueTenantId();
         try {
-            boolean offered = linkQueue.offer(courseLink, 5, TimeUnit.SECONDS);
+            boolean offered = linkQueue.offer(new TenantQueueItem<>(tenantId, courseLink), 5, TimeUnit.SECONDS);
             if (!offered) {
-                log.error("FsCourseLink 队列已满,无法添加日志: {}", JSON.toJSONString(courseLink));
-                // 处理队列已满的情况,例如记录到失败队列或持久化存储
+                log.error("FsCourseLink 队列已满 tenantId={}", tenantId);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -1835,19 +1933,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         }
     }
 
-    /**
-     * 将 FsCourseSopAppLing 放入队列
-     */
     private void enqueueCourseSopAppLink(FsCourseSopAppLink sopAppLink) {
+        Long tenantId = resolveEnqueueTenantId();
         try {
-            boolean offered = sopAppLinks.offer(sopAppLink, 5, TimeUnit.SECONDS);
+            boolean offered = sopAppLinks.offer(new TenantQueueItem<>(tenantId, sopAppLink), 5, TimeUnit.SECONDS);
             if (!offered) {
-                log.error("FsCourseSopAppLink 队列已满,无法添加日志: {}", JSON.toJSONString(sopAppLink));
-                // 处理队列已满的情况,例如记录到失败队列或持久化存储
+                log.error("FsCourseSopAppLink 队列已满 tenantId={}", tenantId);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            log.error("插入 FsCourseLink 队列时被中断: {}", e.getMessage(), e);
+            log.error("插入 FsCourseSopAppLink 队列时被中断: {}", e.getMessage(), e);
         }
     }
 
@@ -1855,16 +1950,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
      * 消费 QwSopLogs 队列并进行批量插入
      */
     private void consumeQwSopLogs() {
-        List<QwSopLogs> batch = new ArrayList<>(BATCH_SIZE);
+        List<TenantQueueItem<QwSopLogs>> batch = new ArrayList<>(BATCH_SIZE);
         while (running || !qwSopLogsQueue.isEmpty()) {
             try {
-                QwSopLogs log = qwSopLogsQueue.poll(1, TimeUnit.SECONDS);
-                if (log != null) {
-                    batch.add(log);
+                TenantQueueItem<QwSopLogs> item = qwSopLogsQueue.poll(1, TimeUnit.SECONDS);
+                if (item != null) {
+                    batch.add(item);
                 }
-                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && log == null)) {
+                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && item == null)) {
                     if (!batch.isEmpty()) {
-                        batchInsertQwSopLogs(new ArrayList<>(batch));
+                        flushBatchByTenant(new ArrayList<>(batch), "batchInsertQwSopLogs", this::batchInsertQwSopLogs);
                         batch.clear();
                     }
                 }
@@ -1873,10 +1968,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
                 log.error("QwSopLogs 消费线程被中断: {}", e.getMessage(), e);
             }
         }
-
-        // 处理剩余的数据
         if (!batch.isEmpty()) {
-            batchInsertQwSopLogs(batch);
+            flushBatchByTenant(batch, "batchInsertQwSopLogs", this::batchInsertQwSopLogs);
         }
     }
 
@@ -1884,16 +1977,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
      * 消费 FsCourseWatchLog 队列并进行批量插入
      */
     private void consumeCourseLink() {
-        List<FsCourseLink> batch = new ArrayList<>(BATCH_SIZE);
+        List<TenantQueueItem<FsCourseLink>> batch = new ArrayList<>(BATCH_SIZE);
         while (running || !linkQueue.isEmpty()) {
             try {
-                FsCourseLink courseLink = linkQueue.poll(1, TimeUnit.SECONDS);
-                if (courseLink != null) {
-                    batch.add(courseLink);
+                TenantQueueItem<FsCourseLink> item = linkQueue.poll(1, TimeUnit.SECONDS);
+                if (item != null) {
+                    batch.add(item);
                 }
-                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && courseLink == null)) {
+                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && item == null)) {
                     if (!batch.isEmpty()) {
-                        batchInsertFsCourseLink(new ArrayList<>(batch));
+                        flushBatchByTenant(new ArrayList<>(batch), "batchInsertFsCourseLink", this::batchInsertFsCourseLink);
                         batch.clear();
                     }
                 }
@@ -1902,10 +1995,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
                 log.error("FsCourseLink 消费线程被中断: {}", e.getMessage(), e);
             }
         }
-
-        // 处理剩余的数据
         if (!batch.isEmpty()) {
-            batchInsertFsCourseLink(batch);
+            flushBatchByTenant(batch, "batchInsertFsCourseLink", this::batchInsertFsCourseLink);
         }
     }
 
@@ -1913,16 +2004,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
      * 消费 FsCourseSopAppLink 队列并进行批量插入
      */
     private void consumeCourseSopAppLink() {
-        List<FsCourseSopAppLink> batch = new ArrayList<>(BATCH_SIZE);
+        List<TenantQueueItem<FsCourseSopAppLink>> batch = new ArrayList<>(BATCH_SIZE);
         while (running || !sopAppLinks.isEmpty()) {
             try {
-                FsCourseSopAppLink courseSopAppLink = sopAppLinks.poll(1, TimeUnit.SECONDS);
-                if (courseSopAppLink != null) {
-                    batch.add(courseSopAppLink);
+                TenantQueueItem<FsCourseSopAppLink> item = sopAppLinks.poll(1, TimeUnit.SECONDS);
+                if (item != null) {
+                    batch.add(item);
                 }
-                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && courseSopAppLink == null)) {
+                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && item == null)) {
                     if (!batch.isEmpty()) {
-                        batchInsertFsCourseSopAppLink(new ArrayList<>(batch));
+                        flushBatchByTenant(new ArrayList<>(batch), "batchInsertFsCourseSopAppLink", this::batchInsertFsCourseSopAppLink);
                         batch.clear();
                     }
                 }
@@ -1931,10 +2022,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
                 log.error("FsCourseSopAppLink 消费线程被中断: {}", e.getMessage(), e);
             }
         }
-
-        // 处理剩余的数据
         if (!batch.isEmpty()) {
-            batchInsertFsCourseSopAppLink(batch);
+            flushBatchByTenant(batch, "batchInsertFsCourseSopAppLink", this::batchInsertFsCourseSopAppLink);
         }
     }
 
@@ -1942,16 +2031,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
      * 消费 FsCourseSopAppLink 队列并进行批量插入
      */
     private void consumeZmLiveWatchQueue() {
-        List<LiveWatchLog> batch = new ArrayList<>(BATCH_SIZE);
+        List<TenantQueueItem<LiveWatchLog>> batch = new ArrayList<>(BATCH_SIZE);
         while (running || !zmLiveWatchQueue.isEmpty()) {
             try {
-                LiveWatchLog livewatchLog = zmLiveWatchQueue.poll(1, TimeUnit.SECONDS);
-                if (livewatchLog != null) {
-                    batch.add(livewatchLog);
+                TenantQueueItem<LiveWatchLog> item = zmLiveWatchQueue.poll(1, TimeUnit.SECONDS);
+                if (item != null) {
+                    batch.add(item);
                 }
-                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && livewatchLog == null)) {
+                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && item == null)) {
                     if (!batch.isEmpty()) {
-                        batchInsertLiveWatchLog(new ArrayList<>(batch));
+                        flushBatchByTenant(new ArrayList<>(batch), "batchInsertLiveWatchLog", this::batchInsertLiveWatchLog);
                         batch.clear();
                     }
                 }
@@ -1960,10 +2049,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
                 log.error("zmLiveWatchQueue 消费线程被中断: {}", e.getMessage(), e);
             }
         }
-
-        // 处理剩余的数据
         if (!batch.isEmpty()) {
-            batchInsertLiveWatchLog(batch);
+            flushBatchByTenant(batch, "batchInsertLiveWatchLog", this::batchInsertLiveWatchLog);
         }
     }
 
@@ -1971,16 +2058,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
      * 消费 FsCourseWatchLog 队列并进行批量插入
      */
     private void consumeWatchLogs() {
-        List<FsCourseWatchLog> batch = new ArrayList<>(BATCH_SIZE);
+        List<TenantQueueItem<FsCourseWatchLog>> batch = new ArrayList<>(BATCH_SIZE);
         while (running || !watchLogsQueue.isEmpty()) {
             try {
-                FsCourseWatchLog watchLog = watchLogsQueue.poll(1, TimeUnit.SECONDS);
-                if (watchLog != null) {
-                    batch.add(watchLog);
+                TenantQueueItem<FsCourseWatchLog> item = watchLogsQueue.poll(1, TimeUnit.SECONDS);
+                if (item != null) {
+                    batch.add(item);
                 }
-                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && watchLog == null)) {
+                if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && item == null)) {
                     if (!batch.isEmpty()) {
-                        batchInsertFsCourseWatchLogs(new ArrayList<>(batch));
+                        flushBatchByTenant(new ArrayList<>(batch), "batchInsertFsCourseWatchLog", this::batchInsertFsCourseWatchLogs);
                         batch.clear();
                     }
                 }
@@ -1989,10 +2076,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
                 log.error("FsCourseWatchLog 消费线程被中断: {}", e.getMessage(), e);
             }
         }
-
-        // 处理剩余的数据
         if (!batch.isEmpty()) {
-            batchInsertFsCourseWatchLogs(batch);
+            flushBatchByTenant(batch, "batchInsertFsCourseWatchLog", this::batchInsertFsCourseWatchLogs);
         }
     }