|
|
@@ -0,0 +1,233 @@
|
|
|
+package com.fs.quartz.util;
|
|
|
+
|
|
|
+import com.fs.common.config.RedisTenantContext;
|
|
|
+import com.fs.common.constant.Constants;
|
|
|
+import com.fs.common.constant.ScheduleConstants;
|
|
|
+import com.fs.common.enums.DataSourceType;
|
|
|
+import com.fs.common.utils.ExceptionUtil;
|
|
|
+import com.fs.common.utils.StringUtils;
|
|
|
+import com.fs.config.saas.ProjectConfig;
|
|
|
+import com.fs.core.config.TenantConfigContext;
|
|
|
+import com.fs.framework.datasource.DynamicDataSourceContextHolder;
|
|
|
+import com.fs.framework.datasource.TenantDataSourceManager;
|
|
|
+import com.fs.quartz.domain.SysJob;
|
|
|
+import com.fs.quartz.domain.SysJobLog;
|
|
|
+import com.fs.quartz.domain.TenantJobConfig;
|
|
|
+import com.fs.quartz.mapper.SysJobTemplateMapper;
|
|
|
+import com.fs.quartz.mapper.TenantJobConfigMapper;
|
|
|
+import com.fs.quartz.service.ISysJobLogService;
|
|
|
+import com.fs.system.domain.SysConfig;
|
|
|
+import com.fs.system.mapper.SysConfigMapper;
|
|
|
+import com.fs.tenant.domain.TenantInfo;
|
|
|
+import com.fs.tenant.service.TenantInfoService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.quartz.JobExecutionContext;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class MultiScopeJobDispatcher {
|
|
|
+ @Autowired private SysJobTemplateMapper sysJobTemplateMapper;
|
|
|
+ @Autowired private TenantJobConfigMapper tenantJobConfigMapper;
|
|
|
+ @Autowired private TenantInfoService tenantInfoService;
|
|
|
+ @Autowired private TenantDataSourceManager tenantDataSourceManager;
|
|
|
+ @Autowired private SysConfigMapper sysConfigMapper;
|
|
|
+ @Autowired(required = false) private ISysJobLogService sysJobLogService;
|
|
|
+ @Value("${saas.task.parallel.threads:4}") private int parallelThreads;
|
|
|
+ private volatile ExecutorService tenantExecutor;
|
|
|
+
|
|
|
+ public void dispatch(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
|
|
|
+ if (!JobInvokeUtil.isInvokeTargetAvailable(sysJob)) {
|
|
|
+ log.warn("[MultiScopeJob] invokeTarget 在当前进程不可用,跳过执行: jobName={}, invokeTarget={}, jobGroup={}",
|
|
|
+ sysJob.getJobName(), sysJob.getInvokeTarget(), sysJob.getJobGroup());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String scope = resolveScope(sysJob);
|
|
|
+ if (ScheduleConstants.JobScope.TENANT.getValue().equals(scope)) {
|
|
|
+ log.info("[MultiScopeJob] 租户级任务: jobName={}, templateId={}", sysJob.getJobName(), sysJob.getJobId());
|
|
|
+ executeTenantJob(context, sysJob, executor);
|
|
|
+ } else {
|
|
|
+ log.debug("[MultiScopeJob] 平台级任务: jobName={}", sysJob.getJobName());
|
|
|
+ executePlatformJob(context, sysJob, executor);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String resolveScope(SysJob sysJob) {
|
|
|
+ try {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ Long templateId = resolveTemplateId(sysJob);
|
|
|
+ if (templateId != null) {
|
|
|
+ String scope = sysJobTemplateMapper.selectScopeByTemplateId(templateId);
|
|
|
+ if (StringUtils.isNotEmpty(scope)) {
|
|
|
+ return scope;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ScheduleConstants.JobScope.PLATFORM.getValue();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("[MultiScopeJob] 查询模板 scope 失败,按平台级处理: jobName={}", sysJob.getJobName(), e);
|
|
|
+ return ScheduleConstants.JobScope.PLATFORM.getValue();
|
|
|
+ } finally {
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Long resolveTemplateId(SysJob sysJob) {
|
|
|
+ if (sysJob.getJobId() != null) {
|
|
|
+ String scope = sysJobTemplateMapper.selectScopeByTemplateId(sysJob.getJobId());
|
|
|
+ if (StringUtils.isNotEmpty(scope)) {
|
|
|
+ return sysJob.getJobId();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return sysJobTemplateMapper.selectTemplateIdByJob(sysJob.getJobGroup(), sysJob.getInvokeTarget());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void executePlatformJob(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
|
|
|
+ try {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ executor.execute(context, sysJob);
|
|
|
+ } finally {
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void executeTenantJob(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
|
|
|
+ List<TenantJobConfig> tenantConfigs = queryAssignedTenants(sysJob);
|
|
|
+ if (tenantConfigs.isEmpty()) {
|
|
|
+ log.warn("[MultiScopeJob] 租户级任务未分配任何租户: jobName={}, templateId={}",
|
|
|
+ sysJob.getJobName(), sysJob.getJobId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("[MultiScopeJob] 开始并行执行租户任务: jobName={}, templateId={}, 租户数={}, tenantIds={}",
|
|
|
+ sysJob.getJobName(), sysJob.getJobId(), tenantConfigs.size(),
|
|
|
+ tenantConfigs.stream().map(TenantJobConfig::getTenantId).collect(Collectors.toList()));
|
|
|
+ CountDownLatch latch = new CountDownLatch(tenantConfigs.size());
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
+ for (TenantJobConfig config : tenantConfigs) {
|
|
|
+ getTenantExecutor().submit(() -> {
|
|
|
+ if (executeForOneTenant(context, sysJob, config, executor)) failCount.incrementAndGet();
|
|
|
+ latch.countDown();
|
|
|
+ });
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (!latch.await(30, TimeUnit.MINUTES)) {
|
|
|
+ throw new IllegalStateException("租户任务执行超时: " + sysJob.getJobName());
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IllegalStateException("租户任务执行被中断: " + sysJob.getJobName(), e);
|
|
|
+ }
|
|
|
+ if (failCount.get() > 0) {
|
|
|
+ throw new IllegalStateException("租户任务部分失败: jobName=" + sysJob.getJobId() + ", 失败数=" + failCount.get());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<TenantJobConfig> queryAssignedTenants(SysJob sysJob) {
|
|
|
+ try {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ Long templateId = resolveTemplateId(sysJob);
|
|
|
+ if (templateId == null) {
|
|
|
+ log.warn("[MultiScopeJob] 无法解析 templateId,跳过租户分发: jobName={}, jobId={}",
|
|
|
+ sysJob.getJobName(), sysJob.getJobId());
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ List<TenantJobConfig> list = tenantJobConfigMapper.selectTenantsByTemplateId(templateId);
|
|
|
+ return list != null ? list : Collections.emptyList();
|
|
|
+ } finally {
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean executeForOneTenant(JobExecutionContext context, SysJob sysJob, TenantJobConfig config, ScopedJobExecutor executor) {
|
|
|
+ Long tenantId = config.getTenantId();
|
|
|
+ String tenantCode = StringUtils.isNotEmpty(config.getTenantCode()) ? config.getTenantCode() : String.valueOf(tenantId);
|
|
|
+ Date startTime = new Date();
|
|
|
+ Exception executionError = null;
|
|
|
+ boolean tenantSwitched = false;
|
|
|
+ try {
|
|
|
+ TenantInfo tenantInfo = loadTenantInfo(tenantId);
|
|
|
+ if (tenantInfo == null) {
|
|
|
+ log.error("[MultiScopeJob] 租户不存在: tenantId={}", tenantId);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ switchToTenant(tenantInfo);
|
|
|
+ tenantSwitched = true;
|
|
|
+ log.info("[MultiScopeJob] 租户开始执行: jobName={}, tenantId={}", sysJob.getJobName(), tenantId);
|
|
|
+ executor.execute(context, sysJob);
|
|
|
+ return false;
|
|
|
+ } catch (Exception e) {
|
|
|
+ executionError = e;
|
|
|
+ log.error("[MultiScopeJob] 租户执行失败: tenantId={}", tenantId, e);
|
|
|
+ return true;
|
|
|
+ } finally {
|
|
|
+ if (tenantSwitched) {
|
|
|
+ try { saveJobLog(sysJob, tenantCode, startTime, executionError, true); } catch (Exception ignored) { }
|
|
|
+ }
|
|
|
+ cleanup();
|
|
|
+ try { saveJobLog(sysJob, tenantCode, startTime, executionError, false); } catch (Exception ignored) { }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private TenantInfo loadTenantInfo(Long tenantId) {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ return tenantInfoService.getById(tenantId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void switchToTenant(TenantInfo tenantInfo) {
|
|
|
+ tenantDataSourceManager.switchTenant(tenantInfo);
|
|
|
+ RedisTenantContext.setTenantId(tenantInfo.getId());
|
|
|
+ SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
|
|
|
+ ProjectConfig.safeLoadTenantConfigFromValue(cfg != null ? cfg.getConfigValue() : null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void saveJobLog(SysJob sysJob, String tenantCode, Date startTime, Exception e, boolean inTenantDb) {
|
|
|
+ if (sysJobLogService == null) return;
|
|
|
+ if (!inTenantDb) DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
|
|
|
+ SysJobLog jobLog = new SysJobLog();
|
|
|
+ jobLog.setJobName(sysJob.getJobName());
|
|
|
+ jobLog.setJobGroup(sysJob.getJobGroup());
|
|
|
+ jobLog.setInvokeTarget(sysJob.getInvokeTarget());
|
|
|
+ jobLog.setStartTime(startTime);
|
|
|
+ jobLog.setStopTime(new Date());
|
|
|
+ long runMs = jobLog.getStopTime().getTime() - startTime.getTime();
|
|
|
+ String dbTag = inTenantDb ? "租户库" : "主库";
|
|
|
+ jobLog.setJobMessage(sysJob.getJobName() + " [" + tenantCode + "] " + dbTag + " 耗时:" + runMs + "毫秒");
|
|
|
+ jobLog.setStatus(e != null ? Constants.FAIL : Constants.SUCCESS);
|
|
|
+ if (e != null) jobLog.setExceptionInfo(StringUtils.substring(ExceptionUtil.getExceptionMessage(e), 0, 2000));
|
|
|
+ sysJobLogService.addJobLog(jobLog);
|
|
|
+ }
|
|
|
+
|
|
|
+ private ExecutorService getTenantExecutor() {
|
|
|
+ if (tenantExecutor == null) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (tenantExecutor == null) {
|
|
|
+ int threads = Math.max(2, parallelThreads);
|
|
|
+ tenantExecutor = new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(256), r -> new Thread(r, "tenant-job-" + System.identityHashCode(r)),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return tenantExecutor;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cleanup() {
|
|
|
+ try { ProjectConfig.clearTenantConfigs(); } catch (Exception ignored) { }
|
|
|
+ try { TenantConfigContext.clear(); } catch (Exception ignored) { }
|
|
|
+ try { RedisTenantContext.clear(); } catch (Exception ignored) { }
|
|
|
+ try { DynamicDataSourceContextHolder.clearDataSourceType(); } catch (Exception ignored) { }
|
|
|
+ try { tenantDataSourceManager.clear(); } catch (Exception ignored) { }
|
|
|
+ }
|
|
|
+}
|