|
|
@@ -0,0 +1,264 @@
|
|
|
+package com.fs.app.task;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.fs.enums.EnumServerType;
|
|
|
+import com.fs.framework.datasource.DynamicDataSourceContextHolder;
|
|
|
+import com.fs.framework.datasource.TenantDataSourceManager;
|
|
|
+import com.fs.common.config.RedisTenantContext;
|
|
|
+import com.fs.common.enums.DataSourceType;
|
|
|
+import com.fs.common.utils.StringUtils;
|
|
|
+import com.fs.config.saas.ProjectConfig;
|
|
|
+import com.fs.core.config.TenantConfigContext;
|
|
|
+import com.fs.system.domain.SysConfig;
|
|
|
+import com.fs.system.mapper.SysConfigMapper;
|
|
|
+import com.fs.tenant.domain.TenantInfo;
|
|
|
+import com.fs.tenant.service.TenantInfoService;
|
|
|
+import com.fs.wxcid.domain.TenantServiceConfig;
|
|
|
+import com.fs.wxcid.mapper.TenantServiceConfigMapper;
|
|
|
+import com.fs.wxcid.utils.TenantHelper;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.function.Consumer;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * SaaS 模式下按租户执行定时任务:从主库查启用租户,逐租户切库并设置租户配置后执行传入的逻辑。
|
|
|
+ * 支持顺序执行与可配置的按租户并行执行,租户多、任务多时可通过并行提升整体执行效率。
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class TenantTaskRunner {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private TenantDataSourceManager tenantDataSourceManager;
|
|
|
+ @Resource
|
|
|
+ private TenantInfoService tenantInfoService;
|
|
|
+ @Resource
|
|
|
+ private SysConfigMapper sysConfigMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ TenantServiceConfigMapper tenantServiceConfigMapper;
|
|
|
+
|
|
|
+ /** 为 true 时按租户并行执行(线程池);为 false 时顺序执行。默认 false 保持与原有行为一致。 */
|
|
|
+ @Value("${saas.task.parallel:true}")
|
|
|
+ private boolean parallelEnabled;
|
|
|
+ /** 并行时线程池大小,建议不大于租户数量且考虑数据源连接池与机器负载。 */
|
|
|
+ @Value("${saas.task.parallel.threads:4}")
|
|
|
+ private int parallelThreads;
|
|
|
+
|
|
|
+ private ExecutorService tenantExecutor;
|
|
|
+ @Value("${tenant-service-marker:'cidWorkflow'}")
|
|
|
+ private String tenantServiceMarker;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void initExecutor() {
|
|
|
+ if (parallelEnabled && parallelThreads > 0) {
|
|
|
+ tenantExecutor = new ThreadPoolExecutor(
|
|
|
+ Math.min(4, parallelThreads),
|
|
|
+ parallelThreads,
|
|
|
+ 60L, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(512),
|
|
|
+ r -> {
|
|
|
+ Thread t = new Thread(r, "saas-tenant-task-" + System.identityHashCode(r));
|
|
|
+ t.setDaemon(false);
|
|
|
+ return t;
|
|
|
+ },
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
+ );
|
|
|
+ log.info("[SaaS Task] 按租户并行已开启,线程池大小: {}", parallelThreads);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void shutdownExecutor() {
|
|
|
+ if (tenantExecutor != null) {
|
|
|
+ tenantExecutor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!tenantExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
|
|
|
+ tenantExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ tenantExecutor.shutdownNow();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对每个启用且未过期的租户执行一次 action(已切到该租户库并设置 TenantConfigContext)。
|
|
|
+ * 根据配置为顺序执行或按租户并行执行。
|
|
|
+ */
|
|
|
+ public void runForEachTenant(Consumer<TenantInfo> action) {
|
|
|
+ runForEachTenant(null, action);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对每个启用且未过期的租户执行一次 action,日志中输出任务名。
|
|
|
+ * @param taskName 定时任务名称,会输出在「定时任务切换数据源...」后面
|
|
|
+ */
|
|
|
+ public void runForEachTenant(String taskName, Consumer<TenantInfo> action) {
|
|
|
+ List<TenantInfo> validTenants = getValidTenants();
|
|
|
+ if (validTenants == null || validTenants.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (parallelEnabled && tenantExecutor != null) {
|
|
|
+ runForEachTenantParallel(validTenants, taskName, action);
|
|
|
+ } else {
|
|
|
+ runForEachTenantSequential(validTenants, taskName, action);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对每个租户执行无参逻辑(不需要 TenantInfo 时使用)。
|
|
|
+ */
|
|
|
+ public void runForEachTenant(Runnable action) {
|
|
|
+ runForEachTenant(null, t -> action.run());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对每个租户执行无参逻辑,日志中输出任务名。
|
|
|
+ * @param taskName 定时任务名称,会输出在「定时任务切换数据源...」后面
|
|
|
+ */
|
|
|
+ public void runForEachTenant(String taskName, Runnable action) {
|
|
|
+ runForEachTenant(taskName, t -> action.run());
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<TenantInfo> getValidTenants() {
|
|
|
+ try {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ TenantInfo query = new TenantInfo();
|
|
|
+ query.setStatus(1);
|
|
|
+ List<TenantInfo> tenants = tenantInfoService.selectTenantInfoList(query);
|
|
|
+ if (tenants == null || tenants.isEmpty()) {
|
|
|
+ log.debug("[SaaS Task] 无启用租户,跳过");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ Date now = new Date();
|
|
|
+ return tenants.stream()
|
|
|
+ .filter(t -> t.getExpireTime() == null || !t.getExpireTime().before(now))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ } finally {
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void runForEachTenantSequential(List<TenantInfo> validTenants, String taskName, Consumer<TenantInfo> action) {
|
|
|
+ for (TenantInfo tenant : validTenants) {
|
|
|
+ runForOneTenant(tenant, taskName, action);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void runForEachTenantParallel(List<TenantInfo> validTenants, String taskName, Consumer<TenantInfo> action) {
|
|
|
+ CountDownLatch latch = new CountDownLatch(validTenants.size());
|
|
|
+ for (TenantInfo tenant : validTenants) {
|
|
|
+ tenantExecutor.submit(() -> {
|
|
|
+ try {
|
|
|
+ runForOneTenant(tenant, taskName, action);
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (!latch.await(30, TimeUnit.MINUTES)) {
|
|
|
+ log.warn("[SaaS Task] 按租户并行执行超时(30分钟)");
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.warn("[SaaS Task] 按租户并行执行被中断", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void runForOneTenant(TenantInfo tenant, String taskName, Consumer<TenantInfo> action) {
|
|
|
+ String dataSourceKey = "tenant:" + tenant.getId();
|
|
|
+ try {
|
|
|
+ // 切换到租户数据源
|
|
|
+ tenantDataSourceManager.switchTenant(tenant);
|
|
|
+ // 切换Redis租户上下文
|
|
|
+ RedisTenantContext.setTenantId(tenant.getId());
|
|
|
+ TenantHelper.setTenantId(tenant.getId());
|
|
|
+ log.info("[SaaS Task] 定时任务切换数据源和Redis dataSource={}, tenantId={}, tenantCode={}, task={}",
|
|
|
+ dataSourceKey, tenant.getId(), tenant.getTenantCode(), taskName != null ? taskName : "");
|
|
|
+
|
|
|
+ // 加载租户项目配置
|
|
|
+ SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
|
|
|
+ if (cfg != null && StringUtils.isNotBlank(cfg.getConfigValue())) {
|
|
|
+ TenantConfigContext.set(JSONObject.parseObject(cfg.getConfigValue()));
|
|
|
+ } else {
|
|
|
+ TenantConfigContext.set(null);
|
|
|
+ }
|
|
|
+ ProjectConfig.loadTenantConfigsFromContext();
|
|
|
+
|
|
|
+ // 执行租户任务
|
|
|
+ action.accept(tenant);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[SaaS Task] 租户 tenantId={}, tenantCode={} 执行异常, task={}",
|
|
|
+ tenant.getId(), tenant.getTenantCode(), taskName, e);
|
|
|
+ } finally {
|
|
|
+ ProjectConfig.clearTenantConfigs();
|
|
|
+ TenantConfigContext.clear();
|
|
|
+ // 清理Redis租户上下文
|
|
|
+ RedisTenantContext.clear();
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ TenantHelper.removeTenantId();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对服务负责范围内的租户执行无参逻辑,日志中输出任务名。
|
|
|
+ * @param taskName 定时任务名称,会输出在「定时任务切换数据源...」后面
|
|
|
+ */
|
|
|
+ public void runForResponsibleTenant(String taskName, Runnable action) {
|
|
|
+ TenantServiceConfig tenantServiceConfig = tenantServiceConfigMapper.selectTenantServiceConfigByServiceMarkerAndType(tenantServiceMarker, EnumServerType.CIDWORKFLOW.getValue());
|
|
|
+ if(null != tenantServiceConfig) {
|
|
|
+ String tenantIds = tenantServiceConfig.getTenantIds();
|
|
|
+ if (StringUtils.isNotBlank(tenantIds)) {
|
|
|
+ List<Long> tenatIdList = Arrays.stream((tenantIds.split(","))).map(item -> Long.valueOf(item.trim())).collect(Collectors.toList());
|
|
|
+ List<TenantInfo> responsibleValidTenants = getResponsibleValidTenants(tenatIdList);
|
|
|
+ if (responsibleValidTenants == null || responsibleValidTenants.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (parallelEnabled && tenantExecutor != null) {
|
|
|
+ runForEachTenantParallel(responsibleValidTenants, taskName, t -> action.run());
|
|
|
+ } else {
|
|
|
+ runForEachTenantSequential(responsibleValidTenants, taskName, t -> action.run());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取服务负责范围内的租户
|
|
|
+ * @param tenantIds
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private List<TenantInfo> getResponsibleValidTenants(List<Long> tenantIds) {
|
|
|
+ try {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ TenantInfo query = new TenantInfo();
|
|
|
+ query.setStatus(1);
|
|
|
+ List<TenantInfo> tenants = tenantInfoService.selectTenantInfoList(query);
|
|
|
+ if (tenants == null || tenants.isEmpty()) {
|
|
|
+ log.debug("[SaaS Task] 无启用租户,跳过");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ Date now = new Date();
|
|
|
+ return tenants.stream()
|
|
|
+ .filter(t -> (t.getExpireTime() == null || !t.getExpireTime().before(now) ) && tenantIds.contains(t.getId()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ } finally {
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|