zyp 2 months ago
parent
commit
ef1aa71fe5

+ 99 - 0
docs/定时任务模块-使用XXL-JOB改造方案.md

@@ -0,0 +1,99 @@
+# 定时任务模块:使用 XXL-JOB 改造方案(仅方案)
+
+在现有「Quartz + @Scheduled + 按租户顺序执行」的 SaaS 定时任务基础上,若调度层改为 **XXL-JOB**,可参考本方案。只给出改造思路与要点,不涉及具体代码实现。
+
+---
+
+## 一、XXL-JOB 与当前架构的对应关系
+
+| 当前 | 使用 XXL-JOB 后 |
+|------|------------------|
+| Quartz 调度器(内嵌在 fs-admin) | XXL-JOB Admin 中心:负责 cron 配置、触发、路由到执行器 |
+| 分发器 Job(TenantJobDispatcherJob)每分钟执行 | XXL-JOB 中注册一个 Job,cron 为每分钟,执行器执行「租户分发 + 执行各租户 sys_job」 |
+| fs-qw-task 的 @Scheduled 方法 | 每个原定时任务在 XXL-JOB 中对应一个 Job(或一个 Job + 不同 jobParam 区分),执行器内仍用 TenantTaskRunner.runForEachTenant(业务逻辑) |
+
+**原则**:调度由 XXL-JOB 负责;**按租户切库、按租户执行业务**的逻辑保留在执行器应用内,不交给 XXL-JOB 做租户维度的调度。
+
+---
+
+## 二、fs-quartz 部分(管理端可配置任务)
+
+### 2.1 角色划分
+
+- **XXL-JOB Admin**:只配置**一条**任务,例如  
+  - JobHandler:`tenantJobDispatcher`  
+  - Cron:`0 * * * * ?`(每分钟)  
+  - 路由策略、阻塞策略、失败重试等按需配置。
+- **执行器**:部署在 **fs-admin** 所在应用(或与 fs-admin 同进程)。  
+  - 引入 `xxl-job-core` / `xxl-job-spring-boot-starter`,配置 `xxl.job.admin.addresses`、`xxl.job.executor.appname`、`xxl.job.executor.port` 等。
+  - 注册一个 Bean 实现 `XxlJob` 的 `tenantJobDispatcher`,其内部逻辑与当前 **TenantJobDispatcherJob** 一致:  
+    1. 主库查启用且未过期租户;  
+    2. for 每个租户:切库 → 加载 projectConfig → 查该租户库 sys_job 中本分钟到点任务 → 执行 `JobInvokeUtil.invokeMethod(job)`;  
+    3. finally 清理上下文。
+
+### 2.2 与 Quartz 的衔接
+
+- **去掉**:fs-admin 中 Quartz 的 `SchedulerFactoryBean`、`SysJobServiceImpl.init()` 里对 Quartz 的注册(或通过配置关闭 Quartz 调度,仅保留执行器逻辑)。
+- **保留**:  
+  - sys_job / sys_job_log 仍在各租户库;  
+  - `JobInvokeUtil`、CronUtils、ScheduleUtils 等仍由执行器在「租户分发」逻辑中调用;  
+  - 管理端「定时任务」界面仍对当前登录租户库的 sys_job 做 CRUD,不依赖 Quartz 内存态,仅需说明「实际触发由 XXL-JOB 统一每分钟执行分发器」即可。
+
+### 2.3 小结
+
+- **谁触发**:XXL-JOB Admin 按 cron 触发「租户分发器」这一条任务。  
+- **谁执行**:执行器(fs-admin 进程)执行「查主库租户列表 → 按租户切库 → 执行各租户到点 sys_job」。  
+- **顺序/并行**:与现有一致,建议仍为**按租户顺序执行**,避免 ThreadLocal 与连接池在多线程下复杂化;若后续需要并行,再在执行器内用线程池对「每个租户」提交独立任务并在子线程内设租户上下文。
+
+---
+
+## 三、fs-qw-task 部分(企微等固定节奏任务)
+
+### 3.1 角色划分
+
+- **XXL-JOB Admin**:为每个原 @Scheduled 任务在 Admin 中配置一条 Job。  
+  - 例如:`qwCheckSopRuleTime`(cron:每天 1:10)、`addTag`(每 20 分钟)、`sendQwGroupMsgTask`(每 10 分钟)等,与现有 cron 保持一致。  
+  - JobHandler 命名可与方法名一致,便于维护。
+- **执行器**:部署在 **fs-qw-task** 应用。  
+  - 引入 XXL-JOB 执行器依赖,配置 appname、admin 地址、端口等。  
+  - 每个 XXL-JOB Job 对应一个 XxlJob 实现类(或一个统一 Dispatcher 根据 jobParam 路由到不同业务方法),在方法内部调用 **TenantTaskRunner.runForEachTenant(原有业务逻辑)**,保证仍是「按租户顺序执行、每租户切库+加载 projectConfig」。
+
+### 3.2 与 @Scheduled 的衔接
+
+- **去掉**:fs-qw-task 中所有业务类上的 `@Scheduled`,以及 `@EnableScheduling`(若不再需要)。  
+- **保留**:  
+  - TenantTaskRunner、TenantDataSourceManager、按租户切库与 ProjectConfig 加载逻辑;  
+  - 原有业务方法(如 `qwSopService.checkSopRuleTime()`)不变,仅由「XXL-JOB 的 JobHandler」调用 `tenantTaskRunner.runForEachTenant(() -> qwSopService.checkSopRuleTime())` 这类形式。  
+- **SaaS 开关**:可保留 `saas.task.enabled`。当为 true 时,XXL-JOB 触发的 Job 内部用 `runForEachTenant`;为 false 时可直接执行业务方法(单租户),便于兼容。
+
+### 3.3 小结
+
+- **谁触发**:XXL-JOB Admin 按各任务配置的 cron 触发对应 Job。  
+- **谁执行**:fs-qw-task 作为执行器,收到调度后执行「TenantTaskRunner.runForEachTenant(具体业务)」。  
+- **顺序/并行**:与现有一致,默认**按租户顺序**;若需并行,再在 TenantTaskRunner 或执行器层增加「按租户并行」的可选实现。
+
+---
+
+## 四、统一说明(顺序 / 并行)
+
+- **当前方案(Quartz + @Scheduled)**:按租户**顺序**执行,无多线程并行。  
+- **改为 XXL-JOB 后**:仅把「谁在何时触发」从 Quartz/Spring 换成 XXL-JOB;**执行器内部**仍建议先保持「按租户顺序」执行,与现有一致。  
+- 若日后需要「多租户并行」:  
+  - 可在执行器内用线程池,每个租户一个任务,在子线程中先设置该租户的 DynamicDataSourceContextHolder + TenantConfigContext,再执行业务,finally 清理。  
+  - 需注意连接池大小、超时与失败重试,以及 XXL-JOB 的「阻塞策略」「路由策略」是否与多线程兼容(例如选「单机串行」避免同一 Job 并发)。
+
+---
+
+## 五、实施顺序建议
+
+1. **执行器接入**:在 fs-admin、fs-qw-task 中分别引入 XXL-JOB 执行器依赖并配置,先注册一个简单测试 Job,确认 Admin 能正常调度。  
+2. **fs-quartz 替换**:关闭 Quartz 的调度注册,保留「租户分发器」逻辑,改为由 XXL-JOB 的 `tenantJobDispatcher` 触发;管理端界面与 sys_job 表不变。  
+3. **fs-qw-task 替换**:逐个去掉 @Scheduled,在 XXL-JOB Admin 中配置对应 Job 与 cron,执行器内用 TenantTaskRunner.runForEachTenant 包装原业务。  
+4. **回归与监控**:按租户验证任务执行、日志与数据;观察 XXL-JOB 调度日志与执行器负载。
+
+---
+
+## 六、与现有文档的关系
+
+- 租户数据源、TenantTaskRunner、按租户切库与 projectConfig 加载等,与 [定时任务模块SaaS化方案.md](./定时任务模块SaaS化方案.md) 一致,无需因改用 XXL-JOB 而改变。  
+- 本方案仅描述「**调度层由 Quartz/@Scheduled 换为 XXL-JOB**」的改造要点,不改变「按租户执行」的语义与实现方式。

+ 167 - 1
docs/定时任务模块SaaS化方案.md

@@ -102,7 +102,56 @@
 4. **Redis**
    - 若任务里有用到 Redis,需保证 RedisTemplate 使用 **TenantKeyRedisSerializer**,且执行时 ThreadLocal 或上下文里能拿到当前 tenantId(在 runForEachTenant 内 set 一次即可)。
 
-**小结**:fs-qw-task 不改任务业务逻辑,只加一层「按租户执行」的壳;数据源与租户配置与 fs-admin 对齐即可。
+### 3.4 按租户并行执行(可选,提升多租户效率)
+
+当租户数量多、定时任务多时,若仍按租户**顺序**执行,总耗时会随租户数线性增加,可能影响执行效率。
+
+**建议与实现**:
+
+- **配置项**(`application.yml`):
+  - `saas.task.parallel`:为 `true` 时,`TenantTaskRunner` 使用线程池**按租户并行**执行同一任务(每个租户一个子任务,子任务内仍先切库、加载配置再执行业务)。
+  - `saas.task.parallel.threads`:并行时线程池大小(默认 4),建议不大于租户数,并考虑数据源连接池与机器负载。
+- **实现要点**:
+  - 在 `TenantTaskRunner` 中根据 `saas.task.parallel` 选择顺序执行或并行执行;并行时用固定大小线程池提交「单租户」任务,子任务内设置该租户数据源与 `TenantConfigContext`,执行完毕清理上下文;整体用 `CountDownLatch` 或 `invokeAll` 等待所有租户完成,超时时间建议 30 分钟。
+  - 单租户执行异常只记录日志,不中断其他租户。
+- **注意**:开启并行时需保证数据源连接池最大连接数 ≥ 并行线程数,避免阻塞;若任务内有全局单例缓存(如某 Service 的 `cachedCourseConfig`),需改为按租户缓存(如 `Map<Long, Config>`),否则多租户下会错乱。
+
+**小结**:默认顺序执行,与原有行为一致;数据量大、租户多时可开启 `saas.task.parallel` 提升整体执行效率。
+
+### 3.5 fs-qw-task 定时任务清单(SaaS 下均已按租户执行)
+
+| 所在类 | 方法名 | Cron/频率 | 说明 |
+|--------|--------|-----------|------|
+| **qwTask** | qwCheckSopRuleTime | 每天 1:10 | 检查 SOP 规则时间 |
+| **qwTask** | addTag | 每 20 分钟 | 添加标签 |
+| **qwTask** | selectSopUserLogsListByTime | 每小时第 5 分钟 | 按营期生成 sopLogs 待发记录 |
+| **qwTask** | wxSop | 每小时第 5 分钟 | 微信 SOP 处理 |
+| **qwTask** | SendQwApiSopLogTimer | 每天 1:20 | 企微官方接口群发(单链) |
+| **qwTask** | SendQwApiSopLogTimerNew | 每天 0:10、1:10 | 企微官方接口群发(新版) |
+| **qwTask** | GetQwApiSopLogResultTimerNew | 每天 8:00 | 获取企微群发反馈结果 |
+| **qwTask** | sendQwGroupMsgTask | 每 10 分钟 | 群发消息任务 |
+| **qwTask** | sendQwBySop | 每天 8:00 | 按 SOP 发送转换消息 |
+| **qwTask** | qwExternalErrRetryTimer | 每 3 分钟 | 企微打标签/备注补偿 |
+| **qwTask** | updateQwSopLogsByCancel | 每小时整点 | 补发过期完课消息 |
+| **qwTask** | batchProcessingExpiredMessages | 每 8 分钟 | 批量处理过期 SOP 待发记录 |
+| **qwTask** | deleteQwSopLogsByDate | 每天 0:10 | 清除 2 天前 SOP 记录 |
+| **qwTask** | processRepairQwSopLogsTimer | 每 3 小时 30 分 | 修复营期异常数据 |
+| **qwTask** | processSopUserLogsInfoByIsDaysNotStudy | 每天 2:35 | E 级客户看课恢复 |
+| **qwTask** | processQwSopExternalContactRatingTimer | 每天 3:45 | 客户分级评级 |
+| **qwTask** | processQwSopExternalContactRatingMoreSevenDaysTimer | 每天 3:30 | 超 7 天未看课 E 级标记 |
+| **qwTask** | updateQwSopLogsDayBefore | 每天 0:03 | 更新前一日待发送 |
+| **qwTask** | updateQwExternalContactUnionid | 每 2 天 0:01 | 同步外部联系人 UnionId |
+| **qwTask** | autoPullGroup | 每天 16:00 | 定时拉人进群 |
+| **UserCourseWatchCountTask** | userCourseCountTask | 每 20 分钟 | 会员看课统计 |
+| **CourseWatchLogScheduler** | checkWatchStatus | 每 1 分钟 | 检查看课状态(含整 5 分钟创建完课消息) |
+| **CourseWatchLogScheduler** | createCourseFinishMsg | 每 5 分钟 | 创建完课消息 |
+| **CourseWatchLogScheduler** | delCourseExpireLink | 每天 0:00 | 删除过期短链 |
+| **CourseWatchLogScheduler** | checkFsUserWatchStatus | 每 30 秒 | WXH5 检查会员看课状态 |
+| **QwExternalContactRatingServiceImpl** | refreshRatingConfig | 每 6 小时 50 分 | 刷新评级配置 qwRating:config |
+| **QwExternalContactRatingMoreSevenDaysServiceImpl** | refreshRatingConfig | 每 6 小时 50 分 | 刷新超 7 天评级配置 |
+| **SopLogsTaskServiceImpl** | refreshCourseConfig | 每 60 秒 | 刷新课程配置 course.config |
+
+上述任务在 `saas.task.enabled=true` 时均通过 `TenantTaskRunner.runForEachTenant`(或并行模式)按租户执行。其中 **SopLogsTaskServiceImpl.refreshCourseConfig** 使用的实例级缓存 `cachedCourseConfig` 若在多个租户任务中共用,建议改为按租户缓存(如 `Map<租户ID, CourseConfig>`),否则仅最后执行到的租户配置生效。
 
 ---
 
@@ -128,3 +177,120 @@
 
 - 租户数据源、租户配置上下文、主库 tenant_info 等能力均依赖 [SaaS改造方案.md](./SaaS改造方案.md) 中的 **fs-framework** 与 **fs-service** 已有实现。
 - 定时任务模块仅在此基础上增加「无 HTTP 请求场景下的租户上下文设置」与「按租户循环执行」的封装,不改变整体多租户架构。
+
+
+
+着手处理:1) 提出并行执行建议并实现可配置的按租户并行;2) 查找 fs-qw-task 中所有 @Scheduled 并补全租户化执行。
+
+
+[3 tools called]
+
+
+正在读取相关任务类与配置,以便实现并行方案并补全未做租户化的定时任务。
+
+
+[4 tools called]
+
+
+查看带 @Scheduled 的 Service 实现类,确定如何接入租户执行:
+
+
+[4 tools called]
+
+
+正在实现:先为 TenantTaskRunner 增加可配置的按租户并行执行,并补全所有未做租户化的定时任务。
+
+
+[10 tools called]
+
+
+正在为三处 Service 的 refresh 定时任务添加租户化支持:
+
+
+[11 tools called]
+
+
+移除 QwExternalContactRatingServiceImpl 中可能重复的 `javax.annotation` 导入,并为 SopLogsTaskServiceImpl 添加租户支持。
+
+
+[7 tools called]
+
+
+检查 SopLogsTaskServiceImpl 中 cachedCourseConfig 的用法,确认是否需按租户缓存:
+
+
+[3 tools called]
+
+
+SopLogsTaskServiceImpl 的 refresh 已按租户调用;cachedCourseConfig 为单例缓存,多租户下需改为按租户缓存,已记录在文档。正在更新 application.yml 和文档。
+
+
+[9 tools called]
+
+
+单独编译 fs-qw-task 时未解析到其他模块的依赖。正在从根目录编译整个项目:
+
+
+[2 tools called]
+
+
+已按你的要求做完「多租户下执行效率」的改进,并补全了 fs-qw-task 里所有定时任务的租户化,修改总结如下。
+
+---
+
+## 一、多租户下执行效率:建议与实现
+
+**问题**:租户多、任务多时,若仍按租户顺序执行,总耗时会随租户数线性增加。
+
+**做法**:在 **TenantTaskRunner** 中增加**可配置的按租户并行**,由配置决定是顺序还是并行。
+
+- **配置**(`fs-qw-task/application.yml`):
+  - `saas.task.parallel`:为 `true` 时按租户并行执行;默认 `false`,保持原顺序行为。
+  - `saas.task.parallel.threads`:并行时线程池大小(默认 4),可按租户数和连接池适当调大。
+- **实现要点**:
+  - 顺序:沿用原来的 for 循环逐租户执行。
+  - 并行:用固定大小线程池,每个租户一个任务,子任务内先切库、加载 projectConfig,再执行业务,最后清理上下文;整体用 `CountDownLatch` 等待(超时 30 分钟),单租户异常只打日志不中断其他租户。
+- **注意**:开并行时需保证**数据源连接池最大连接数 ≥ 并行线程数**,避免阻塞。
+
+---
+
+## 二、定时任务模块改动摘要
+
+### 1. TenantTaskRunner(`fs-qw-task`)
+
+- 新增 `saas.task.parallel`、`saas.task.parallel.threads` 的读取与线程池初始化/销毁。
+- `runForEachTenant` 内部根据配置在「顺序」与「并行」两种实现间切换;对外接口不变,现有调用无需改。
+
+### 2. 补全的定时任务租户化(SaaS 开启时按租户执行)
+
+| 类 | 方法 | 说明 |
+|----|------|------|
+| **UserCourseWatchCountTask** | userCourseCountTask | 会员看课统计(每 20 分钟) |
+| **CourseWatchLogScheduler** | checkWatchStatus | 检查看课状态(每 1 分钟,含整 5 分钟创建完课消息) |
+| **CourseWatchLogScheduler** | createCourseFinishMsg | 创建完课消息(每 5 分钟) |
+| **CourseWatchLogScheduler** | delCourseExpireLink | 删除过期短链(每天 0 点) |
+| **CourseWatchLogScheduler** | checkFsUserWatchStatus | WXH5 检查会员看课(每 30 秒) |
+| **QwExternalContactRatingServiceImpl** | refreshRatingConfig | 刷新评级配置 qwRating:config(每 6 小时 50 分) |
+| **QwExternalContactRatingMoreSevenDaysServiceImpl** | refreshRatingConfig | 刷新超 7 天评级配置(每 6 小时 50 分) |
+| **SopLogsTaskServiceImpl** | refreshCourseConfig | 刷新课程配置 course.config(每 60 秒) |
+
+**qwTask** 里原有 20 个定时任务本身已按租户执行,本次未改逻辑,仅在文档中做了统一清单。
+
+### 3. 配置与文档
+
+- **application.yml**:补充了 `saas.task.parallel`、`saas.task.parallel.threads` 的注释说明。
+- **docs/定时任务模块SaaS化方案.md**:
+  - 新增 **3.4 按租户并行执行(可选)**:说明配置、实现要点和注意事项。
+  - 新增 **3.5 fs-qw-task 定时任务清单**:列出全部 27 个定时任务及所在类、方法、Cron/频率、说明;并注明 **SopLogsTaskServiceImpl.refreshCourseConfig** 使用的 `cachedCourseConfig` 若在多租户下共用,建议改为按租户缓存(如 `Map<租户ID, CourseConfig>`)。
+
+---
+
+## 三、编译
+
+在项目根目录执行:`mvn compile -pl fs-qw-task -am`,**编译已通过**。
+
+---
+
+**使用建议**:  
+- 租户少、任务轻:保持 `saas.task.parallel=false` 即可。  
+- 租户多、希望缩短总耗时:开启 `saas.task.parallel=true`,并适当调大 `saas.task.parallel.threads` 与数据源连接池,同时注意 SopLogsTaskServiceImpl 的 `cachedCourseConfig` 若在多租户下共用,需改为按租户缓存。

+ 77 - 45
fs-qw-task/src/main/java/com/fs/app/task/CourseWatchLogScheduler.java

@@ -11,23 +11,30 @@ import com.fs.system.service.ISysConfigService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.Calendar;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * 看课/完课/短链等定时任务。SaaS 模式下按租户执行。
+ */
 @Component
 @Slf4j
 public class CourseWatchLogScheduler {
     private final AtomicBoolean isRunning1 = new AtomicBoolean(false);
-
     private final AtomicBoolean isRunning2 = new AtomicBoolean(false);
-
     private final AtomicBoolean isRunning3 = new AtomicBoolean(false);
-
     private final AtomicBoolean isRunning4 = new AtomicBoolean(false);
 
+    @Value("${saas.task.enabled:false}")
+    private boolean saasTaskEnabled;
+    @Resource
+    private TenantTaskRunner tenantTaskRunner;
+
     @Autowired
     private FsCourseWatchLogMapper courseWatchLogMapper;
 
@@ -83,111 +90,136 @@ public class CourseWatchLogScheduler {
 //    }
 
 
+    /** 检查看课状态:每分钟执行;整5分钟时顺带创建完课消息。SaaS 开启时按租户执行。 */
     @Scheduled(fixedRate = 60000) // 每分钟执行一次
     public void checkWatchStatus() {
-        // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning1.compareAndSet(false, true)) {
             log.info("检查看课中任务执行 - 上一个任务尚未完成,跳过此次执行");
             return;
         }
+        try {
+            if (saasTaskEnabled) {
+                tenantTaskRunner.runForEachTenant(this::doCheckWatchStatus);
+            } else {
+                doCheckWatchStatus();
+            }
+        } finally {
+            isRunning1.set(false);
+        }
+    }
 
+    private void doCheckWatchStatus() {
         try {
             log.info("检查看课中任务执行>>>>>>>>>>>>");
             courseWatchLogService.scheduleBatchUpdateToDatabase();
             courseWatchLogService.checkWatchStatus();
             log.info("检查看课中任务执行完成>>>>>>>>>>>>");
-
-            // 检查当前时间是否为整五分钟(0, 5, 10, 15, ... 55分钟)
             Calendar calendar = Calendar.getInstance();
             int minute = calendar.get(Calendar.MINUTE);
-
-            // 只有当分钟数是5的倍数时才执行创建完课消息
             if (minute % 5 == 0) {
                 try {
-                    long startTime = System.currentTimeMillis();
-                    log.info("创建完课消息 - 定时任务开始"+System.currentTimeMillis());
+                    log.info("创建完课消息 - 定时任务开始 {}", System.currentTimeMillis());
                     sopLogsTaskService.createCourseFinishMsg();
-                    long endTime = System.currentTimeMillis();
-                    long duration = endTime - startTime;
-                    log.info("创建完课消息 - 定时任务成功完成"+duration);
+                    log.info("创建完课消息 - 定时任务成功完成");
                 } catch (Exception e) {
                     log.error("创建完课消息 - 定时任务执行失败", ExceptionUtils.getStackTrace(e));
                 }
             }
         } catch (Exception e) {
             log.error("检查看课中任务执行完成 - 定时任务执行失败", ExceptionUtils.getStackTrace(e));
-        } finally {
-            // 重置标志为 false,表示任务已完成
-            isRunning1.set(false);
         }
     }
 
 
 
-    /**
-     * 创建完课消息
-     */
+    /** 创建完课消息:每5分钟执行。SaaS 开启时按租户执行。 */
     @Scheduled(fixedRate = 300000) // 每五分钟执行一次
     public void createCourseFinishMsg() {
-        // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning3.compareAndSet(false, true)) {
             log.warn("创建完课消息 - 上一个任务尚未完成,跳过此次执行");
             return;
         }
-
         try {
-            log.info("创建完课消息 - 定时任务开始");
-            sopLogsTaskService.createCourseFinishMsg();
-            log.info("创建完课消息 - 定时任务成功完成");
+            if (saasTaskEnabled) {
+                tenantTaskRunner.runForEachTenant(() -> {
+                    try {
+                        log.info("创建完课消息 - 定时任务开始");
+                        sopLogsTaskService.createCourseFinishMsg();
+                        log.info("创建完课消息 - 定时任务成功完成");
+                    } catch (Exception e) {
+                        log.error("创建完课消息 - 定时任务执行失败", e);
+                    }
+                });
+            } else {
+                log.info("创建完课消息 - 定时任务开始");
+                sopLogsTaskService.createCourseFinishMsg();
+                log.info("创建完课消息 - 定时任务成功完成");
+            }
         } catch (Exception e) {
             log.error("创建完课消息 - 定时任务执行失败", e);
         } finally {
-            // 重置标志为 false,表示任务已完成
             isRunning3.set(false);
         }
-
     }
 
     @Autowired
     private IFsCourseLinkService courseLinkService;
 
 
-    // 定时任务,每天0点执行
-
-    /**
-     * 每天删除过期短链
-     */
-    @Scheduled(cron = "0 0 0 * * ?")  // 0点0分0秒执行
+    /** 每天 0 点删除过期短链。SaaS 开启时按租户执行。 */
+    @Scheduled(cron = "0 0 0 * * ?")
     public void delCourseExpireLink() {
         try {
-            log.info("删除过期短链 - 定时任务开始");
-            courseLinkService.delCourseExpireLink();
-            log.info("删除过期短链 - 定时任务成功完成");
+            if (saasTaskEnabled) {
+                tenantTaskRunner.runForEachTenant(() -> {
+                    try {
+                        log.info("删除过期短链 - 定时任务开始");
+                        courseLinkService.delCourseExpireLink();
+                        log.info("删除过期短链 - 定时任务成功完成");
+                    } catch (Exception e) {
+                        log.error("删除过期短链 - 定时任务执行失败", e);
+                    }
+                });
+            } else {
+                log.info("删除过期短链 - 定时任务开始");
+                courseLinkService.delCourseExpireLink();
+                log.info("删除过期短链 - 定时任务成功完成");
+            }
         } catch (Exception e) {
             log.error("删除过期短链 - 定时任务执行失败", e);
         }
-
     }
 
-    @Scheduled(fixedRate = 30000) // 每分钟执行一次
+    /** WXH5 检查会员看课状态:每30秒执行。SaaS 开启时按租户执行。 */
+    @Scheduled(fixedRate = 30000)
     public void checkFsUserWatchStatus() {
-        // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning4.compareAndSet(false, true)) {
             log.warn("WXH5-检查会员看课中任务执行 - 上一个任务尚未完成,跳过此次执行");
             return;
         }
         try {
-            log.info("WXH5-检查会员看课中任务执行>>>>>>>>>>>>");
-            courseWatchLogService.scheduleUpdateDurationToDatabase();
-            courseWatchLogService.checkFsUserWatchStatus();
-            log.info("WXH5-检查会员看课中任务执行完成>>>>>>>>>>>>");
-        }catch (Exception e) {
+            if (saasTaskEnabled) {
+                tenantTaskRunner.runForEachTenant(() -> {
+                    try {
+                        log.info("WXH5-检查会员看课中任务执行>>>>>>>>>>>>");
+                        courseWatchLogService.scheduleUpdateDurationToDatabase();
+                        courseWatchLogService.checkFsUserWatchStatus();
+                        log.info("WXH5-检查会员看课中任务执行完成>>>>>>>>>>>>");
+                    } catch (Exception e) {
+                        log.error("WXH5-检查会员看课中任务执行完成 - 定时任务执行失败", e);
+                    }
+                });
+            } else {
+                log.info("WXH5-检查会员看课中任务执行>>>>>>>>>>>>");
+                courseWatchLogService.scheduleUpdateDurationToDatabase();
+                courseWatchLogService.checkFsUserWatchStatus();
+                log.info("WXH5-检查会员看课中任务执行完成>>>>>>>>>>>>");
+            }
+        } catch (Exception e) {
             log.error("WXH5-检查会员看课中任务执行完成 - 定时任务执行失败", e);
         } finally {
-            // 重置标志为 false,表示任务已完成
             isRunning4.set(false);
         }
-
     }
 
 

+ 112 - 24
fs-qw-task/src/main/java/com/fs/app/task/TenantTaskRunner.java

@@ -12,16 +12,21 @@ import com.fs.system.mapper.SysConfigMapper;
 import com.fs.tenant.domain.TenantInfo;
 import com.fs.tenant.service.TenantInfoService;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.*;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
  * SaaS 模式下按租户执行定时任务:从主库查启用租户,逐租户切库并设置租户配置后执行传入的逻辑。
+ * 支持顺序执行与可配置的按租户并行执行,租户多、任务多时可通过并行提升整体执行效率。
  */
 @Slf4j
 @Component
@@ -34,10 +39,73 @@ public class TenantTaskRunner {
     @Resource
     private SysConfigMapper sysConfigMapper;
 
+    /** 为 true 时按租户并行执行(线程池);为 false 时顺序执行。默认 false 保持与原有行为一致。 */
+    @Value("${saas.task.parallel:false}")
+    private boolean parallelEnabled;
+    /** 并行时线程池大小,建议不大于租户数量且考虑数据源连接池与机器负载。 */
+    @Value("${saas.task.parallel.threads:4}")
+    private int parallelThreads;
+
+    private ExecutorService tenantExecutor;
+
+    @PostConstruct
+    public void initExecutor() {
+        if (parallelEnabled && parallelThreads > 0) {
+            tenantExecutor = new ThreadPoolExecutor(
+                    Math.min(4, parallelThreads),
+                    parallelThreads,
+                    60L, TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<>(512),
+                    r -> {
+                        Thread t = new Thread(r, "saas-tenant-task-" + System.identityHashCode(r));
+                        t.setDaemon(false);
+                        return t;
+                    },
+                    new ThreadPoolExecutor.CallerRunsPolicy()
+            );
+            log.info("[SaaS Task] 按租户并行已开启,线程池大小: {}", parallelThreads);
+        }
+    }
+
+    @PreDestroy
+    public void shutdownExecutor() {
+        if (tenantExecutor != null) {
+            tenantExecutor.shutdown();
+            try {
+                if (!tenantExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    tenantExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                tenantExecutor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
     /**
      * 对每个启用且未过期的租户执行一次 action(已切到该租户库并设置 TenantConfigContext)。
+     * 根据配置为顺序执行或按租户并行执行。
      */
     public void runForEachTenant(Consumer<TenantInfo> action) {
+        List<TenantInfo> validTenants = getValidTenants();
+        if (validTenants == null || validTenants.isEmpty()) {
+            return;
+        }
+        if (parallelEnabled && tenantExecutor != null) {
+            runForEachTenantParallel(validTenants, action);
+        } else {
+            runForEachTenantSequential(validTenants, action);
+        }
+    }
+
+    /**
+     * 对每个租户执行无参逻辑(不需要 TenantInfo 时使用)。
+     */
+    public void runForEachTenant(Runnable action) {
+        runForEachTenant(t -> action.run());
+    }
+
+    private List<TenantInfo> getValidTenants() {
         try {
             DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
             TenantInfo query = new TenantInfo();
@@ -45,41 +113,61 @@ public class TenantTaskRunner {
             List<TenantInfo> tenants = tenantInfoService.selectTenantInfoList(query);
             if (tenants == null || tenants.isEmpty()) {
                 log.debug("[SaaS Task] 无启用租户,跳过");
-                return;
+                return null;
             }
             Date now = new Date();
-            List<TenantInfo> validTenants = tenants.stream()
+            return tenants.stream()
                     .filter(t -> t.getExpireTime() == null || !t.getExpireTime().before(now))
                     .collect(Collectors.toList());
+        } finally {
+            DynamicDataSourceContextHolder.clearDataSourceType();
+        }
+    }
 
-            for (TenantInfo tenant : validTenants) {
+    private void runForEachTenantSequential(List<TenantInfo> validTenants, Consumer<TenantInfo> action) {
+        for (TenantInfo tenant : validTenants) {
+            runForOneTenant(tenant, action);
+        }
+    }
+
+    private void runForEachTenantParallel(List<TenantInfo> validTenants, Consumer<TenantInfo> action) {
+        CountDownLatch latch = new CountDownLatch(validTenants.size());
+        for (TenantInfo tenant : validTenants) {
+            tenantExecutor.submit(() -> {
                 try {
-                    tenantDataSourceManager.switchTenant(tenant);
-                    SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
-                    if (cfg != null && StringUtils.isNotBlank(cfg.getConfigValue())) {
-                        TenantConfigContext.set(JSONObject.parseObject(cfg.getConfigValue()));
-                    } else {
-                        TenantConfigContext.set(null);
-                    }
-                    ProjectConfig.loadTenantConfigsFromContext();
-                    action.accept(tenant);
-                } catch (Exception e) {
-                    log.error("[SaaS Task] 租户 tenantId={}, tenantCode={} 执行异常", tenant.getId(), tenant.getTenantCode(), e);
+                    runForOneTenant(tenant, action);
                 } finally {
-                    ProjectConfig.clearTenantConfigs();
-                    TenantConfigContext.clear();
-                    DynamicDataSourceContextHolder.clearDataSourceType();
+                    latch.countDown();
                 }
+            });
+        }
+        try {
+            if (!latch.await(30, TimeUnit.MINUTES)) {
+                log.warn("[SaaS Task] 按租户并行执行超时(30分钟)");
             }
-        } finally {
-            DynamicDataSourceContextHolder.clearDataSourceType();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("[SaaS Task] 按租户并行执行被中断", e);
         }
     }
 
-    /**
-     * 对每个租户执行无参逻辑(不需要 TenantInfo 时使用)。
-     */
-    public void runForEachTenant(Runnable action) {
-        runForEachTenant(t -> action.run());
+    private void runForOneTenant(TenantInfo tenant, Consumer<TenantInfo> action) {
+        try {
+            tenantDataSourceManager.switchTenant(tenant);
+            SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
+            if (cfg != null && StringUtils.isNotBlank(cfg.getConfigValue())) {
+                TenantConfigContext.set(JSONObject.parseObject(cfg.getConfigValue()));
+            } else {
+                TenantConfigContext.set(null);
+            }
+            ProjectConfig.loadTenantConfigsFromContext();
+            action.accept(tenant);
+        } catch (Exception e) {
+            log.error("[SaaS Task] 租户 tenantId={}, tenantCode={} 执行异常", tenant.getId(), tenant.getTenantCode(), e);
+        } finally {
+            ProjectConfig.clearTenantConfigs();
+            TenantConfigContext.clear();
+            DynamicDataSourceContextHolder.clearDataSourceType();
+        }
     }
 }

+ 19 - 13
fs-qw-task/src/main/java/com/fs/app/task/UserCourseWatchCountTask.java

@@ -3,26 +3,35 @@ package com.fs.app.task;
 import com.fs.store.service.IFsUserCourseCountService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * 会员看课统计定时任务。SaaS 模式下按租户执行。
+ */
 @Component
 @Slf4j
 public class UserCourseWatchCountTask {
+
     @Autowired
     private IFsUserCourseCountService userCourseCountService;
+    @Resource
+    private TenantTaskRunner tenantTaskRunner;
 
-    private final AtomicBoolean isRunning1 = new AtomicBoolean(false);
+    @Value("${saas.task.enabled:false}")
+    private boolean saasTaskEnabled;
 
+    private final AtomicBoolean isRunning1 = new AtomicBoolean(false);
 
     /**
-     * 每15分钟执行一次
+     * 会员看课统计:每20分钟执行一次。SaaS 开启时按租户遍历执行。
      */
-    @Scheduled(cron = "0 */20 * * * ?")  // 每10分钟执行一次
+    @Scheduled(cron = "0 */20 * * * ?")
     public void userCourseCountTask() {
-        // 尝试设置标志为 true,表示任务开始执行
         if (!isRunning1.compareAndSet(false, true)) {
             log.warn("会员看课统计任务执行 - 上一个任务尚未完成,跳过此次执行");
             return;
@@ -30,20 +39,17 @@ public class UserCourseWatchCountTask {
         try {
             log.info("==============会员看课统计任务执行===============开始");
             long startTime = System.currentTimeMillis();
-
-            userCourseCountService.insertFsUserCourseCountTask();
-
+            if (saasTaskEnabled) {
+                tenantTaskRunner.runForEachTenant(() -> userCourseCountService.insertFsUserCourseCountTask());
+            } else {
+                userCourseCountService.insertFsUserCourseCountTask();
+            }
             log.info("会员看课统计任务执行==============结束");
-            long endTime = System.currentTimeMillis();
-            log.info("会员看课统计任务执行----------执行时长:{}", (endTime - startTime));
+            log.info("会员看课统计任务执行----------执行时长:{}", System.currentTimeMillis() - startTime);
         } catch (Exception e) {
             log.error("会员看课统计任务执行----------定时任务执行失败", e);
         } finally {
-            // 重置标志为 false,表示任务已完成
             isRunning1.set(false);
         }
-
     }
-
-
 }

+ 18 - 5
fs-qw-task/src/main/java/com/fs/app/taskService/impl/QwExternalContactRatingMoreSevenDaysServiceImpl.java

@@ -1,6 +1,7 @@
 package com.fs.app.taskService.impl;
 
 import com.alibaba.fastjson.JSON;
+import com.fs.app.task.TenantTaskRunner;
 import com.fs.app.taskService.QwExternalContactRatingMoreSevenDaysService;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
@@ -20,9 +21,12 @@ import com.fs.voice.utils.StringUtil;
 import com.google.common.util.concurrent.AtomicDouble;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
+
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
@@ -65,6 +69,11 @@ public class QwExternalContactRatingMoreSevenDaysServiceImpl implements QwExtern
     @Autowired
     private ExecutorService sopRatingExecutor;  // 自定义线程池
 
+    @Resource
+    private TenantTaskRunner tenantTaskRunner;
+    @Value("${saas.task.enabled:false}")
+    private boolean saasTaskEnabled;
+
     // 任务队列
     private final BlockingQueue<SopUserLogs> taskQueue = new LinkedBlockingQueue<>(10000);
 
@@ -293,13 +302,18 @@ public class QwExternalContactRatingMoreSevenDaysServiceImpl implements QwExtern
     }
 
 
-    /**
-     * 每6小时更新一次
-     */
+    /** 每6小时刷新评级配置。SaaS 开启时按租户执行。 */
     @Scheduled(cron = "0 50 0/6 * * ?")
     public void refreshRatingConfig() {
+        if (saasTaskEnabled) {
+            tenantTaskRunner.runForEachTenant(this::doRefreshRatingConfig);
+        } else {
+            doRefreshRatingConfig();
+        }
+    }
 
-        synchronized(configLock) {
+    private void doRefreshRatingConfig() {
+        synchronized (configLock) {
             try {
                 String json = configService.selectConfigByKey("qwRating:config");
                 QwRatingConfig config = JSON.parseObject(json, QwRatingConfig.class);
@@ -313,7 +327,6 @@ public class QwExternalContactRatingMoreSevenDaysServiceImpl implements QwExtern
                 log.error("Exception while refreshing course.config: {}", e.getMessage(), e);
             }
         }
-
     }
 
 

+ 17 - 5
fs-qw-task/src/main/java/com/fs/app/taskService/impl/QwExternalContactRatingServiceImpl.java

@@ -1,6 +1,7 @@
 package com.fs.app.taskService.impl;
 
 import com.alibaba.fastjson.JSON;
+import com.fs.app.task.TenantTaskRunner;
 import com.fs.app.taskService.QwExternalContactRatingService;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
@@ -20,11 +21,13 @@ import com.fs.voice.utils.StringUtil;
 import com.google.common.util.concurrent.AtomicDouble;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.time.LocalTime;
@@ -68,6 +71,11 @@ public class QwExternalContactRatingServiceImpl implements QwExternalContactRati
     @Autowired
     private ExecutorService sopRatingExecutor;  // 自定义线程池
 
+    @Resource
+    private TenantTaskRunner tenantTaskRunner;
+    @Value("${saas.task.enabled:false}")
+    private boolean saasTaskEnabled;
+
     // 任务队列
     private final BlockingQueue<SopUserLogs> taskQueue = new LinkedBlockingQueue<>(10000);
     private volatile boolean running = true;
@@ -296,13 +304,18 @@ public class QwExternalContactRatingServiceImpl implements QwExternalContactRati
     }
 
 
-    /**
-    * 每6小时更新一次
-    */
+    /** 每6小时刷新评级配置。SaaS 开启时按租户执行。 */
     @Scheduled(cron = "0 50 0/6 * * ?")
     public void refreshRatingConfig() {
+        if (saasTaskEnabled) {
+            tenantTaskRunner.runForEachTenant(this::doRefreshRatingConfig);
+        } else {
+            doRefreshRatingConfig();
+        }
+    }
 
-        synchronized(configLock) {
+    private void doRefreshRatingConfig() {
+        synchronized (configLock) {
             try {
                 String json = configService.selectConfigByKey("qwRating:config");
                 QwRatingConfig config = JSON.parseObject(json, QwRatingConfig.class);
@@ -316,7 +329,6 @@ public class QwExternalContactRatingServiceImpl implements QwExternalContactRati
                 log.error("Exception while refreshing course.config: {}", e.getMessage(), e);
             }
         }
-
     }
 
 

+ 19 - 3
fs-qw-task/src/main/java/com/fs/app/taskService/impl/SopLogsTaskServiceImpl.java

@@ -4,6 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fs.app.task.TenantTaskRunner;
 import com.fs.app.taskService.SopLogsTaskService;
 import com.fs.common.config.FSSysConfig;
 import com.fs.common.utils.PubFun;
@@ -46,6 +47,7 @@ import com.fs.voice.utils.StringUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.retry.annotation.Backoff;
 import org.springframework.retry.annotation.Retryable;
 import org.springframework.scheduling.annotation.Async;
@@ -54,6 +56,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import javax.annotation.PreDestroy;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -184,6 +187,11 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
     @Autowired
     private IFsUserCompanyBindService fsUserCompanyBindService;
 
+    @Resource
+    private TenantTaskRunner tenantTaskRunner;
+    @Value("${saas.task.enabled:false}")
+    private boolean saasTaskEnabled;
+
 
     @Autowired
     private IQwSopTempVoiceService sopTempVoiceService;
@@ -250,10 +258,18 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         zmLiveWatchLogExecutor.submit(this::consumeZmLiveWatchQueue);
     }
 
-    // Scheduled tasks to refresh configurations and domain names periodically
-    @Scheduled(fixedDelay = 60000) // 每60秒刷新一次
+    /** 每60秒刷新课程配置。SaaS 开启时按租户执行。 */
+    @Scheduled(fixedDelay = 60000)
     public void refreshCourseConfig() {
-        synchronized(configLock) {
+        if (saasTaskEnabled) {
+            tenantTaskRunner.runForEachTenant(this::doRefreshCourseConfig);
+        } else {
+            doRefreshCourseConfig();
+        }
+    }
+
+    private void doRefreshCourseConfig() {
+        synchronized (configLock) {
             try {
                 String json = configService.selectConfigByKey("course.config");
                 CourseConfig newConfig = JSON.parseObject(json, CourseConfig.class);

+ 5 - 2
fs-qw-task/src/main/resources/application.yml

@@ -12,7 +12,10 @@ spring:
 #    active: druid-hdt
     active: druid-myhk-test
 
-# SaaS 模式:为 true 时各 @Scheduled 定时任务按租户遍历执行(主库需有 tenant_info,各租户独立库)
+# SaaS 定时任务(主库需有 tenant_info,各租户独立库)
 # saas:
 #   task:
-#     enabled: true
+#     enabled: true          # 为 true 时各 @Scheduled 按租户执行
+#     parallel: false        # 为 true 时按租户并行执行(可提升多租户时整体效率)
+#     parallel:
+#       threads: 4           # 并行时线程池大小,建议不大于租户数并考虑连接池与负载