lmx 6 hari lalu
induk
melakukan
b92998ffca

+ 66 - 38
fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java

@@ -10,8 +10,11 @@ import com.fs.company.param.ExecutionContext;
 import com.fs.company.param.PauseRoboticActiveParam;
 import com.fs.company.service.*;
 import com.fs.company.service.impl.call.node.AiCallTaskNode;
+import com.fs.wxcid.utils.TenantHelper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -40,53 +43,78 @@ public class CallTaskService {
     @Qualifier("cidWorkFlowExecutor")
     private Executor cidWorkFlowExecutor;
 
+    @Autowired
+    private RedissonClient redissonClient;
+
+    /** 外呼延时扫描分布式锁 key 前缀,按 租户id:分组 隔离,避免多实例/重入重复扫描同批延时key */
+    private static final String CALL_DELAY_LOCK_PREFIX = "cid_workflow:call_delay_lock:";
+
 
     /**
      * 扫描工作流延时任务
      */
     public void cidWorkflowCallRun() {
-        log.info("===========工作流延时任务开始扫描===========");
-        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(cidGroupNo,null) + "*";
-        Collection<String> keys = redisCache2.keys(delayCallKeyPrefix);
-        log.info("共扫描到 {} 个待处理键", keys.size());
-        // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
-        Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
-        keys.parallelStream().forEach(key -> {
-            try {
-                //doExec
-                CompletableFuture.runAsync(() -> {
-                    try {
-                        ExecutionContext context = redisCache2.getCacheObject(key);
-                        if (context == null) {
-                            log.warn("工作流延时任务context为空,跳过 - key: {}", key);
-                            redisCache2.deleteObject(key);
-                            return;
-                        }
-                        // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
-                        Long taskId = context.getVariable("roboticId", Long.class);
-                        if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
-                            // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
-                            // 同步context信息到DB exec,供恢复时resumePausedInstances使用
+        // 分布式锁:按 租户id:分组 隔离,拿不到锁说明已有实例在扫描,直接跳过本轮(延时key下轮仍可扫到,不丢失)
+        String lockKey = CALL_DELAY_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
+        RLock lock = redissonClient.getLock(lockKey);
+        boolean locked = false;
+        try {
+            // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
+            locked = lock.tryLock(0, -1, TimeUnit.SECONDS);
+            if (!locked) {
+                log.info("cidWorkflowCallRun 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
+                return;
+            }
+            log.info("===========工作流延时任务开始扫描===========");
+            String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(cidGroupNo,null) + "*";
+            Collection<String> keys = redisCache2.keys(delayCallKeyPrefix);
+            log.info("共扫描到 {} 个待处理键", keys.size());
+            // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
+            Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
+            keys.parallelStream().forEach(key -> {
+                try {
+                    //doExec
+                    CompletableFuture.runAsync(() -> {
+                        try {
+                            ExecutionContext context = redisCache2.getCacheObject(key);
+                            if (context == null) {
+                                log.warn("工作流延时任务context为空,跳过 - key: {}", key);
+                                redisCache2.deleteObject(key);
+                                return;
+                            }
+                            // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
+                            Long taskId = context.getVariable("roboticId", Long.class);
+                            if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
+                                // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
+                                // 同步context信息到DB exec,供恢复时resumePausedInstances使用
+                                context.setVariable("callSource", "callTaskTimer");
+                                context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
+                                companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
+                                log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
+                                redisCache2.deleteObject(key);
+                                return;
+                            }
+                            context.setVariable("callRedisKey", key);
                             context.setVariable("callSource", "callTaskTimer");
-                            context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
-                            companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
-                            log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
+                            companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context.getVariables());
                             redisCache2.deleteObject(key);
-                            return;
+                        } catch (Exception e) {
+                            log.error("处理工作流延时任务异常 - key: {}", key, e);
                         }
-                        context.setVariable("callRedisKey", key);
-                        context.setVariable("callSource", "callTaskTimer");
-                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context.getVariables());
-                        redisCache2.deleteObject(key);
-                    } catch (Exception e) {
-                        log.error("处理工作流延时任务异常 - key: {}", key, e);
-                    }
-                }, cidWorkFlowExecutor);
-            } catch (Exception ex) {
-                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+                    }, cidWorkFlowExecutor);
+                } catch (Exception ex) {
+                    log.error("处理工作流延时任务异常 - key: {}", key, ex);
+                }
+            });
+            log.info("===========工作流延时任务扫描结束===========");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("cidWorkflowCallRun 获取分布式锁被中断 - lockKey: {}", lockKey, e);
+        } finally {
+            if (locked && lock.isHeldByCurrentThread()) {
+                lock.unlock();
             }
-        });
-        log.info("===========工作流延时任务扫描结束===========");
+        }
     }
 
     /**

+ 126 - 41
fs-cid-workflow/src/main/java/com/fs/app/service/CidWorkflowTaskService.java

@@ -15,6 +15,8 @@ import com.fs.enums.NodeTypeEnum;
 import com.fs.wxcid.utils.TenantHelper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -29,6 +31,11 @@ public class CidWorkflowTaskService {
 
     @Value("${cid-group-no:0}")
     Integer cidGroupNo;
+
+    /** 认领态(RUNNING)超时分钟数,超过该时间未流转的任务视为卡死并回扫重置为失败 */
+    @Value("${cid-workflow.running-timeout-minutes:30}")
+    Integer runningTimeoutMinutes;
+
     private final CompanyAiWorkflowExecMapper companyAiWorkflowExecMapper;
     private final CompanyWorkflowEngine companyWorkflowEngine;
     private final ICompanyVoiceRoboticService companyVoiceRoboticService;
@@ -43,34 +50,69 @@ public class CidWorkflowTaskService {
     @Autowired
     private WorkflowNodeFactory nodeFactory;
 
+    @Autowired
+    private RedissonClient redissonClient;
+
+    /** 扫描分布式锁 key 前缀,按 租户id:分组 隔离,保证同一批数据同一时刻仅一个实例扫描 */
+    private static final String SCAN_LOCK_PREFIX = "cid_workflow:scan_lock:";
+
+    /** 激活可执行任务的分布式锁 key 前缀,与扫描就绪任务的锁独立,避免两个定时任务互相阻塞 */
+    private static final String ACTIVATE_LOCK_PREFIX = "cid_workflow:activate_lock:";
+
     /**
      * 扫描当前分组下就绪任务,并开启执行
      */
     public void runCidWorkflow() {
-        List<CompanyAiWorkflowExec> companyAiWorkflowExecs = companyAiWorkflowExecMapper.selectExecListWithTimeAvailableByStatusAndGroupNo(ExecutionStatusEnum.READY.getValue(), cidGroupNo);
-        System.out.println(companyAiWorkflowExecs);
-        log.info("runCidWorkflow得到租户id:{}",TenantHelper.getTenantId());
-        if (null != companyAiWorkflowExecs && companyAiWorkflowExecs.size() > 0) {
-            // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
-            Map<Long, Boolean> pausedCache = new HashMap<>();
-            companyAiWorkflowExecs.forEach(exec -> {
-//                cidWorkFlowExecutor.execute(() -> {
-                try {
-                    // 任务暂停守卫检查(从 variables JSON 中提取 roboticId,即 CompanyVoiceRobotic.id)
-                    Long taskId = extractRoboticIdFromExec(exec);
-                    if (taskId != null) {
-                        boolean paused = pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id));
-                        if (paused) {
-                            log.debug("任务已暂停,跳过执行 - taskId: {}, execId: {}", taskId, exec.getId());
+        // 分布式锁:按 租户id:分组 隔离,拿不到锁说明同批数据已有实例在扫描,直接跳过本轮(周期任务下轮会再扫,不积压)
+        String lockKey = SCAN_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
+        RLock lock = redissonClient.getLock(lockKey);
+        boolean locked = false;
+        try {
+            // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
+            locked = lock.tryLock(0, -1, TimeUnit.SECONDS);
+            if (!locked) {
+                log.info("runCidWorkflow 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
+                return;
+            }
+            List<CompanyAiWorkflowExec> companyAiWorkflowExecs = companyAiWorkflowExecMapper.selectExecListWithTimeAvailableByStatusAndGroupNo(ExecutionStatusEnum.READY.getValue(), cidGroupNo);
+            log.info("runCidWorkflow得到租户id:{}",TenantHelper.getTenantId());
+            if (null != companyAiWorkflowExecs && companyAiWorkflowExecs.size() > 0) {
+                // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
+                Map<Long, Boolean> pausedCache = new HashMap<>();
+                companyAiWorkflowExecs.forEach(exec -> {
+                    try {
+                        // 任务暂停守卫检查(从 variables JSON 中提取 roboticId,即 CompanyVoiceRobotic.id)
+                        Long taskId = extractRoboticIdFromExec(exec);
+                        if (taskId != null) {
+                            boolean paused = pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id));
+                            if (paused) {
+                                log.debug("任务已暂停,跳过执行 - taskId: {}, execId: {}", taskId, exec.getId());
+                                return;
+                            }
+                        }
+                        // 原子认领:READY -> RUNNING,仅认领成功(影响行数=1)才提交执行,
+                        // 防止上一轮异步任务尚未完成时下一次定时扫描重复调度同一条记录
+                        int claimed = companyAiWorkflowExecMapper.claimExecForRun(
+                                exec.getId(),
+                                ExecutionStatusEnum.READY.getValue(),
+                                ExecutionStatusEnum.RUNNING.getValue());
+                        if (claimed == 0) {
+                            log.debug("任务已被认领,跳过重复执行 - execId: {}", exec.getId());
                             return;
                         }
+                        companyWorkflowEngine.executeNode(exec.getWorkflowInstanceId(), exec.getCurrentNodeKey());
+                    } catch (Exception e) {
+                        log.error("处理就绪任务异常 - exec: {}", exec, e);
                     }
-                    companyWorkflowEngine.executeNode(exec.getWorkflowInstanceId(), exec.getCurrentNodeKey());
-                } catch (Exception e) {
-                    log.error("处理就绪任务异常 - exec: {}", exec, e);
-                }
-//                });
-            });
+                });
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("runCidWorkflow 获取分布式锁被中断 - lockKey: {}", lockKey, e);
+        } finally {
+            if (locked && lock.isHeldByCurrentThread()) {
+                lock.unlock();
+            }
         }
     }
 
@@ -78,29 +120,72 @@ public class CidWorkflowTaskService {
      * 扫描可执行任务,并激活执行
      */
     public void activateTimeAvailableTask() {
-        List<CompanyAiWorkflowExec> companyAiWorkflowExecs = companyAiWorkflowExecMapper.selectExecListWithTimeAvailableByStatusAndGroupNo(ExecutionStatusEnum.PENDING.getValue(), cidGroupNo);
-        log.info("activateTimeAvailableTask得到租户id:{}",TenantHelper.getTenantId());
-        if (null != companyAiWorkflowExecs && companyAiWorkflowExecs.size() > 0) {
-            // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
-            Map<Long, Boolean> pausedCache = new HashMap<>();
-            companyAiWorkflowExecs.forEach(exec -> {
-//                cidWorkFlowExecutor.execute(() -> {
-                try {
-                    // 任务暂停守卫检查(从 variables JSON 中提取 roboticId,即 CompanyVoiceRobotic.id)
-                    Long taskId = extractRoboticIdFromExec(exec);
-                    if (taskId != null) {
-                        boolean paused = pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id));
-                        if (paused) {
-                            log.debug("任务已暂停,跳过执行 - taskId: {}, execId: {}", taskId, exec.getId());
+        // 分布式锁:与 runCidWorkflow 锁独立,按 租户id:分组 隔离,拿不到锁说明已有实例在激活,直接跳过本轮
+        String lockKey = ACTIVATE_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
+        RLock lock = redissonClient.getLock(lockKey);
+        boolean locked = false;
+        try {
+            // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
+            locked = lock.tryLock(0, -1, TimeUnit.SECONDS);
+            if (!locked) {
+                log.info("activateTimeAvailableTask 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
+                return;
+            }
+            List<CompanyAiWorkflowExec> companyAiWorkflowExecs = companyAiWorkflowExecMapper.selectExecListWithTimeAvailableByStatusAndGroupNo(ExecutionStatusEnum.PENDING.getValue(), cidGroupNo);
+            log.info("activateTimeAvailableTask得到租户id:{}",TenantHelper.getTenantId());
+            if (null != companyAiWorkflowExecs && companyAiWorkflowExecs.size() > 0) {
+                // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
+                Map<Long, Boolean> pausedCache = new HashMap<>();
+                companyAiWorkflowExecs.forEach(exec -> {
+                    try {
+                        // 任务暂停守卫检查(从 variables JSON 中提取 roboticId,即 CompanyVoiceRobotic.id)
+                        Long taskId = extractRoboticIdFromExec(exec);
+                        if (taskId != null) {
+                            boolean paused = pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id));
+                            if (paused) {
+                                log.debug("任务已暂停,跳过执行 - taskId: {}, execId: {}", taskId, exec.getId());
+                                return;
+                            }
+                        }
+                        // 原子认领:PENDING -> RUNNING,仅认领成功(影响行数=1)才提交执行,
+                        // 防止上一轮异步任务尚未完成时下一次定时扫描重复激活同一条记录
+                        int claimed = companyAiWorkflowExecMapper.claimExecForRun(
+                                exec.getId(),
+                                ExecutionStatusEnum.PENDING.getValue(),
+                                ExecutionStatusEnum.RUNNING.getValue());
+                        if (claimed == 0) {
+                            log.debug("任务已被认领,跳过重复执行 - execId: {}", exec.getId());
                             return;
                         }
+                        companyWorkflowEngine.executeNode(exec.getWorkflowInstanceId(), exec.getCurrentNodeKey());
+                    } catch (Exception e) {
+                        log.error("处理就绪任务异常 - exec: {}", exec, e);
                     }
-                    companyWorkflowEngine.executeNode(exec.getWorkflowInstanceId(), exec.getCurrentNodeKey());
-                } catch (Exception e) {
-                    log.error("处理就绪任务异常 - exec: {}", exec, e);
-                }
-//                });
-            });
+                });
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("activateTimeAvailableTask 获取分布式锁被中断 - lockKey: {}", lockKey, e);
+        } finally {
+            if (locked && lock.isHeldByCurrentThread()) {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * 回扫超时认领任务:将处于 RUNNING 态且超过 runningTimeoutMinutes 分钟未更新的记录重置为 FAILURE。
+     * 用于兜底进程重启 / 线程池拒绝导致任务永久卡在认领态(RUNNING)无法被再次扫描的情况。
+     */
+    public void resetTimeoutRunningTask() {
+        java.time.LocalDateTime timeoutTime = java.time.LocalDateTime.now().minusMinutes(runningTimeoutMinutes);
+        int reset = companyAiWorkflowExecMapper.resetTimeoutRunningExec(
+                cidGroupNo,
+                ExecutionStatusEnum.RUNNING.getValue(),
+                ExecutionStatusEnum.FAILURE.getValue(),
+                timeoutTime);
+        if (reset > 0) {
+            log.warn("回扫超时认领任务,重置为失败状态 - groupNo: {}, count: {}, timeoutMinutes: {}", cidGroupNo, reset, runningTimeoutMinutes);
         }
     }
 

+ 13 - 0
fs-cid-workflow/src/main/java/com/fs/app/task/CidTask.java

@@ -68,6 +68,19 @@ public class CidTask {
 //    }
 
 
+    /**
+     * 回扫超时认领任务 - 每5分钟执行一次
+     * 将卡在认领态(RUNNING)且长时间未流转的任务重置为失败,兜底进程重启/线程池拒绝导致的永久卡死
+     */
+    @Scheduled(cron = "0 0/5 * * * ?")
+    public void resetTimeoutRunningTask() {
+        if (saasTaskEnabled) {
+            tenantTaskRunner.runForResponsibleTenant("resetTimeoutRunningTask", () -> cidWorkflowTaskService.resetTimeoutRunningTask());
+        } else {
+            cidWorkflowTaskService.resetTimeoutRunningTask();
+        }
+    }
+
     /**
      * 外呼重试任务 - 每30分钟执行一次
      * 扫描 Redis 中被外呼限制拦截的待重试呼叫,到达 nextAvailableTime 后重新执行

+ 73 - 0
fs-company/src/main/java/com/fs/company/controller/crm/CrmCustomerController.java

@@ -5,12 +5,15 @@ import com.fs.common.annotation.Log;
 import com.fs.common.core.controller.BaseController;
 import com.fs.common.core.domain.AjaxResult;
 import com.fs.common.core.domain.R;
+import com.fs.common.core.domain.entity.SysDictData;
 import com.fs.common.core.page.TableDataInfo;
 import com.fs.common.enums.BusinessType;
 import com.fs.common.utils.ServletUtils;
 import com.fs.common.utils.StringUtils;
 import com.fs.common.utils.poi.ExcelUtil;
+import com.fs.company.domain.Company;
 import com.fs.company.domain.CompanyUser;
+import com.fs.company.service.ICompanyService;
 import com.fs.company.service.ICompanyUserService;
 import com.fs.company.util.OrderUtils;
 import com.fs.crm.domain.CrmCustomer;
@@ -22,6 +25,8 @@ import com.fs.crm.vo.*;
 import com.fs.framework.security.LoginUser;
 import com.fs.framework.service.TokenService;
 import com.fs.his.utils.PhoneUtil;
+import com.fs.system.service.ISysConfigService;
+import com.fs.system.service.ISysDictTypeService;
 import com.github.pagehelper.PageHelper;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -33,7 +38,9 @@ import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;
 
 import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * 客户Controller
@@ -56,6 +63,15 @@ public class CrmCustomerController extends BaseController
     @Autowired
     private ICrmCustomerPropertyService crmCustomerPropertyService;
 
+    @Autowired
+    private ICompanyService companyService;
+
+    @Autowired
+    private ISysConfigService configService;
+
+    @Autowired
+    private ISysDictTypeService dictTypeService;
+
     @ApiOperation("获取线索客户")
     @PreAuthorize("@ss.hasPermi('crm:customer:lineList')")
     @GetMapping("/getLineCustomerList")
@@ -507,4 +523,61 @@ public class CrmCustomerController extends BaseController
         return getDataTable(list);
     }
 
+    @ApiOperation("动态数据字典查询")
+    @GetMapping("/tradeDicts")
+    public AjaxResult getTradeDicts() {
+        // 1. 获取当前登录用户的公司ID
+        LoginUser loginUser = tokenService.getLoginUser(ServletUtils.getRequest());
+        Long companyId = loginUser.getCompany().getCompanyId();
+
+        // 2. 查询公司行业配置 (company_trade)
+        String prefix = "";
+        Company company = companyService.selectCompanyById(companyId);
+        Integer companyTrade = company.getCompanyTrade();
+
+        if (companyTrade != null) {
+            // 查 trade_type 字典获取 remark 前缀
+            prefix = getDictRemarkByValue("trade_type", String.valueOf(companyTrade));
+        }
+
+        // 3. 如果公司没有行业配置,回退查租户配置
+        if (StringUtils.isEmpty(prefix)) {
+            String tenantTrade = configService.selectConfigByKey("his.config.tenantTrade");
+            if (StringUtils.isNotEmpty(tenantTrade)) {
+                prefix = getDictRemarkByValue("trade_type", tenantTrade);
+            }
+        }
+
+        // 4. 构建四个字典的查询结果(带保底逻辑)
+        String[] dictTypes = {"crm_customer_source", "crm_customer_user_status",
+                "crm_customer_type", "crm_customer_tag"};
+        Map<String, List<SysDictData>> result = new HashMap<>();
+
+        for (String dictType : dictTypes) {
+            List<SysDictData> data = null;
+            if (StringUtils.isNotEmpty(prefix)) {
+                data = dictTypeService.selectDictDataByType(prefix + dictType);
+            }
+            // 保底逻辑:前缀查不到数据则使用原始字典
+            if (data == null || data.isEmpty()) {
+                data = dictTypeService.selectDictDataByType(dictType);
+            }
+            result.put(dictType, data);
+        }
+
+        return AjaxResult.success(result);
+    }
+
+    private String getDictRemarkByValue(String dictType, String dictValue) {
+        List<SysDictData> dictDataList = dictTypeService.selectDictDataByType(dictType);
+        if (dictDataList != null) {
+            for (SysDictData data : dictDataList) {
+                if (dictValue.equals(data.getDictValue())) {
+                    return StringUtils.isNotEmpty(data.getRemark()) ? data.getRemark() : "";
+                }
+            }
+        }
+        return "";
+    }
+
 }

+ 6 - 0
fs-service/src/main/java/com/fs/company/domain/Company.java

@@ -174,4 +174,10 @@ public class Company extends BaseEntity
     /** 归属代理名称 */
     @TableField(exist = false)
     private String proxyName;
+
+
+    /**
+     * 公司行业
+     */
+    private Integer companyTrade;
 }

+ 26 - 0
fs-service/src/main/java/com/fs/company/mapper/CompanyAiWorkflowExecMapper.java

@@ -98,6 +98,32 @@ public interface CompanyAiWorkflowExecMapper extends BaseMapper<CompanyAiWorkflo
 
     CompanyAiWorkflowExec selectExecWithTimeAvailableByInstanceId(@Param("workflowInstanceId") String workflowInstanceId);
 
+    /**
+     * 原子认领待执行任务:仅当当前状态等于 expectStatus 时,才将状态更新为 targetStatus。
+     * 用于防止定时扫描在上一轮任务尚未完成时重复调度同一条记录。
+     * @param id 执行记录主键
+     * @param expectStatus 期望的原状态(如 READY=9)
+     * @param targetStatus 认领后的目标状态(如 RUNNING=3)
+     * @return 影响行数,1 表示认领成功,0 表示已被其它线程/上一轮认领
+     */
+    int claimExecForRun(@Param("id") Long id,
+                        @Param("expectStatus") Integer expectStatus,
+                        @Param("targetStatus") Integer targetStatus);
+
+    /**
+     * 回扫超时认领任务:将处于 runningStatus 且 last_update_time 早于 timeoutTime 的记录重置为 failStatus,
+     * 防止进程重启 / 线程池拒绝导致任务永久卡在认领态。
+     * @param groupNo cid 分组号
+     * @param runningStatus 认领态(如 RUNNING=3)
+     * @param failStatus 重置目标态(如 FAILURE=2)
+     * @param timeoutTime 超时时间界限,last_update_time 早于该时间的视为卡死
+     * @return 影响行数
+     */
+    int resetTimeoutRunningExec(@Param("groupNo") Integer groupNo,
+                                @Param("runningStatus") Integer runningStatus,
+                                @Param("failStatus") Integer failStatus,
+                                @Param("timeoutTime") java.time.LocalDateTime timeoutTime);
+
     /**
      * 批量新增数据
      * @param list

+ 20 - 1
fs-service/src/main/resources/mapper/company/CompanyAiWorkflowExecMapper.xml

@@ -230,9 +230,28 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         FROM company_ai_workflow_exec t1
         where t1.status = #{status}
           and t1.cid_group_no = #{groupNo}
-          and NOW() BETWEEN t1.runtime_range_start and t1.runtime_range_end
+          and CURTIME() BETWEEN t1.runtime_range_start and t1.runtime_range_end
     </select>
 
+    <!-- 原子认领:仅当当前状态等于期望状态时才更新为目标状态,影响行数=1 表示认领成功 -->
+    <update id="claimExecForRun">
+        update company_ai_workflow_exec
+        set status = #{targetStatus},
+            last_update_time = NOW()
+        where id = #{id}
+          and status = #{expectStatus}
+    </update>
+
+    <!-- 回扫超时认领任务:将卡在认领态且长时间未更新的记录重置为失败态 -->
+    <update id="resetTimeoutRunningExec">
+        update company_ai_workflow_exec
+        set status = #{failStatus},
+            last_update_time = NOW()
+        where cid_group_no = #{groupNo}
+          and status = #{runningStatus}
+          and last_update_time &lt; #{timeoutTime}
+    </update>
+
     <insert id="insertBatchInfo" useGeneratedKeys="true" keyProperty="id">
         INSERT INTO company_ai_workflow_exec (
         workflow_instance_id, workflow_id, current_node_key,

+ 124 - 82
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -46,6 +46,7 @@ import com.fs.wx.sop.mapper.WxSopUserMapper;
 import com.fs.wx.sop.vo.WxSopUserMsgGenVO;
 import com.fs.wxcid.dto.friend.AddContactParam;
 import com.fs.wxcid.service.FriendService;
+import com.fs.wxcid.utils.TenantHelper;
 import com.fs.wxcid.vo.AddContactVo;
 import com.fs.wxwork.dto.WxAddSearchDTO;
 import com.fs.wxwork.dto.WxSearchContactDTO;
@@ -131,6 +132,12 @@ public class WxTaskService {
     private final QwExternalContactMapper qwExternalContactMapper;
     private final CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
 
+    /** 加微延时扫描分布式锁 key 前缀,按 租户id:分组 隔离,避免多实例/重入重复扫描同批延时key */
+    private static final String ADD_WX_DELAY_LOCK_PREFIX = "cid_workflow:addwx_delay_lock:";
+
+    /** 企微加微延时扫描分布式锁 key 前缀,按 租户id:分组 隔离 */
+    private static final String QW_ADD_WX_DELAY_LOCK_PREFIX = "cid_workflow:qwaddwx_delay_lock:";
+
     public void addWx(List<Long> accountIdList) {
         log.info("==========执行加微信任务开始==========");
         String json = sysConfigService.selectConfigByKey("wx.config");
@@ -851,53 +858,69 @@ public class WxTaskService {
      * 扫描工作流延时任务
      */
     public void cidWorkflowAddWxRun() {
-        log.info("===========工作流延时任务开始扫描===========");
-//        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
-////        Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
-//        Collection<String> keys = redisCache2.keys(delayAddWxKeyPrefix);
-        // 扫描新加微节点的延时Key
-        String delayAddWxNewKeyPrefix = AiAddWxTaskNewNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
-        Collection<String> keys = redisCache2.keys(delayAddWxNewKeyPrefix);
-        log.info("cidWorkflowAddWxRun共扫描到 {} 个待处理键", keys.size());
-        // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
-        Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
-        keys.parallelStream().forEach(key -> {
-            try {
-                //doExec
-                CompletableFuture.runAsync(()->{
-                    try {
-                        ExecutionContext context = redisCache2.getCacheObject(key);
-                        if (context == null) {
-                            log.warn("工作流延时任务context为空,跳过 - key: {}", key);
-                            redisCache2.deleteObject(key);
-                            return;
-                        }
-                        // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
-                        Long taskId = context.getVariable("roboticId", Long.class);
-                        if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
-                            // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
-                            // 同步context信息到DB exec,供恢复时resumePausedInstances使用
-                            context.setVariable("callSource", "addWxTimer");
-                            context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
-                            companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
-                            log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
+        // 分布式锁:按 租户id:分组 隔离,拿不到锁说明已有实例在扫描,直接跳过本轮(延时key下轮仍可扫到,不丢失)
+        String lockKey = ADD_WX_DELAY_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
+        RLock methodLock = redissonClient.getLock(lockKey);
+        boolean locked = false;
+        try {
+            // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
+            locked = methodLock.tryLock(0, -1, TimeUnit.SECONDS);
+            if (!locked) {
+                log.info("cidWorkflowAddWxRun 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
+                return;
+            }
+            log.info("===========工作流延时任务开始扫描===========");
+            // 扫描新加微节点的延时Key
+            String delayAddWxNewKeyPrefix = AiAddWxTaskNewNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
+            Collection<String> keys = redisCache2.keys(delayAddWxNewKeyPrefix);
+            log.info("cidWorkflowAddWxRun共扫描到 {} 个待处理键", keys.size());
+            // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
+            Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
+            keys.parallelStream().forEach(key -> {
+                try {
+                    //doExec
+                    CompletableFuture.runAsync(()->{
+                        try {
+                            ExecutionContext context = redisCache2.getCacheObject(key);
+                            if (context == null) {
+                                log.warn("工作流延时任务context为空,跳过 - key: {}", key);
+                                redisCache2.deleteObject(key);
+                                return;
+                            }
+                            // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
+                            Long taskId = context.getVariable("roboticId", Long.class);
+                            if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
+                                // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
+                                // 同步context信息到DB exec,供恢复时resumePausedInstances使用
+                                context.setVariable("callSource", "addWxTimer");
+                                context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
+                                companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
+                                log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
+                                redisCache2.deleteObject(key);
+                                return;
+                            }
+                            context.setVariable("callRedisKey",key);
+                            context.setVariable("callSource","addWxTimer");
+                            companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
                             redisCache2.deleteObject(key);
-                            return;
+                        } catch (Exception e) {
+                            log.error("处理工作流延时任务异常 - key: {}", key, e);
                         }
-                        context.setVariable("callRedisKey",key);
-                        context.setVariable("callSource","addWxTimer");
-                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
-                        redisCache2.deleteObject(key);
-                    } catch (Exception e) {
-                        log.error("处理工作流延时任务异常 - key: {}", key, e);
-                    }
-                }, cidWorkFlowExecutor);
+                    }, cidWorkFlowExecutor);
 
-            } catch (Exception ex) {
-                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+                } catch (Exception ex) {
+                    log.error("处理工作流延时任务异常 - key: {}", key, ex);
+                }
+            });
+            log.info("===========工作流延时任务扫描结束===========");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("cidWorkflowAddWxRun 获取分布式锁被中断 - lockKey: {}", lockKey, e);
+        } finally {
+            if (locked && methodLock.isHeldByCurrentThread()) {
+                methodLock.unlock();
             }
-        });
-        log.info("===========工作流延时任务扫描结束===========");
+        }
     }
 
     /**
@@ -1677,49 +1700,68 @@ public class WxTaskService {
      * 扫描企微加微工作流延时任务
      */
     public void cidWorkflowQwAddWxRun() {
-        log.info("===========企微加微工作流延时任务开始扫描===========");
-        String delayAddWxKeyPrefix = AiQwAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
-        Collection<String> keys = redisCache2.keys(delayAddWxKeyPrefix);
-        log.info("企微加微共扫描到 {} 个待处理键", keys.size());
-        // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
-        Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
-        keys.parallelStream().forEach(key -> {
-            try {
-                //doExec
-                CompletableFuture.runAsync(()->{
-                    try {
-                        ExecutionContext context = redisCache2.getCacheObject(key);
-                        if (context == null) {
-                            log.warn("企微加微工作流延时任务context为空,跳过 - key: {}", key);
-                            redisCache2.deleteObject(key);
-                            return;
-                        }
-                        // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
-                        Long taskId = context.getVariable("roboticId", Long.class);
-                        if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
-                            // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
-                            // 同步context信息到DB exec,供恢复时resumePausedInstances使用
-                            context.setVariable("callSource", "qwAddWxTimer");
-                            context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
-                            companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
-                            log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
+        // 分布式锁:按 租户id:分组 隔离,拿不到锁说明已有实例在扫描,直接跳过本轮(延时key下轮仍可扫到,不丢失)
+        String lockKey = QW_ADD_WX_DELAY_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
+        RLock methodLock = redissonClient.getLock(lockKey);
+        boolean locked = false;
+        try {
+            // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
+            locked = methodLock.tryLock(0, -1, TimeUnit.SECONDS);
+            if (!locked) {
+                log.info("cidWorkflowQwAddWxRun 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
+                return;
+            }
+            log.info("===========企微加微工作流延时任务开始扫描===========");
+            String delayAddWxKeyPrefix = AiQwAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
+            Collection<String> keys = redisCache2.keys(delayAddWxKeyPrefix);
+            log.info("企微加微共扫描到 {} 个待处理键", keys.size());
+            // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
+            Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
+            keys.parallelStream().forEach(key -> {
+                try {
+                    //doExec
+                    CompletableFuture.runAsync(()->{
+                        try {
+                            ExecutionContext context = redisCache2.getCacheObject(key);
+                            if (context == null) {
+                                log.warn("企微加微工作流延时任务context为空,跳过 - key: {}", key);
+                                redisCache2.deleteObject(key);
+                                return;
+                            }
+                            // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
+                            Long taskId = context.getVariable("roboticId", Long.class);
+                            if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
+                                // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
+                                // 同步context信息到DB exec,供恢复时resumePausedInstances使用
+                                context.setVariable("callSource", "qwAddWxTimer");
+                                context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
+                                companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
+                                log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
+                                redisCache2.deleteObject(key);
+                                return;
+                            }
+                            context.setVariable("callRedisKey",key);
+                            context.setVariable("callSource","qwAddWxTimer");
+                            companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
                             redisCache2.deleteObject(key);
-                            return;
+                        } catch (Exception e) {
+                            log.error("处理工作流延时任务异常 - key: {}", key, e);
                         }
-                        context.setVariable("callRedisKey",key);
-                        context.setVariable("callSource","qwAddWxTimer");
-                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
-                        redisCache2.deleteObject(key);
-                    } catch (Exception e) {
-                        log.error("处理工作流延时任务异常 - key: {}", key, e);
-                    }
-                }, cidWorkFlowExecutor);
+                    }, cidWorkFlowExecutor);
 
-            } catch (Exception ex) {
-                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+                } catch (Exception ex) {
+                    log.error("处理工作流延时任务异常 - key: {}", key, ex);
+                }
+            });
+            log.info("===========工作流延时任务扫描结束===========");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("cidWorkflowQwAddWxRun 获取分布式锁被中断 - lockKey: {}", lockKey, e);
+        } finally {
+            if (locked && methodLock.isHeldByCurrentThread()) {
+                methodLock.unlock();
             }
-        });
-        log.info("===========工作流延时任务扫描结束===========");
+        }
     }