lmx 3 tygodni temu
rodzic
commit
31ff426c9d

+ 8 - 0
fs-service/src/main/java/com/fs/company/service/CompanyWorkflowEngine.java

@@ -54,4 +54,12 @@ public interface CompanyWorkflowEngine {
      * @return 恢复结果
      */
     void resumeFromBlockingNode(String workflowInstanceId, String nodeKey, Map<String, Object> inputData);
+
+    /**
+     * 定时器调用节点执行
+     * @param workflowInstanceId
+     * @param nodeKey
+     * @param inputData
+     */
+    void timeDoExecute(String workflowInstanceId, String nodeKey, Map<String, Object> inputData);
 }

+ 2 - 0
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -330,6 +330,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             userDataJson.put("nodeKey",context.getCurrentNodeKey());
             userDataJson.put("workflowInstanceId",context.getWorkflowInstanceId());
             userDataJson.put("callerId",callerId);
+            context.setVariable("callBackUuid",callBackUuid);
             redisCache2.setCacheObject(WORKFLOW_CALL_ONE_REDIS_KEY + callBackUuid, userDataJson.toJSONString());
             CalleeDomain build = CalleeDomain.builder().number(callees.getPhone()).userData(callBackUuid).build();
             List<CalleeDomain> mobileList = new ArrayList<>();
@@ -644,6 +645,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             if(null!= userData && userData.containsKey("callBackUuid") && userData.containsKey("workflowInstanceId") && userData.containsKey("nodeKey")){
                 Map<String, Object> param = new HashMap<>();
                 param.put("callBackUuid",userData.getString("callBackUuid"));
+                param.put("callSource","callBack");
                 CompletableFuture.runAsync(()->{
                     companyWorkflowEngine.resumeFromBlockingNode(userData.getString("workflowInstanceId"),userData.getString("nodeKey"),param);
                 },cidWorkFlowExecutor);

+ 71 - 16
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -329,7 +329,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
      * 创建节点实例
      */
     private IWorkflowNode createNode(CompanyWorkflow definition, String nodeKey) {
-        CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByWorkflowIdAndNodeKey(definition.getWorkflowId(),nodeKey);
+        CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByWorkflowIdAndNodeKey(definition.getWorkflowId(), nodeKey);
         if (node == null) {
             return null;
         }
@@ -358,7 +358,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
         context.setVariables(inputVariables != null ? inputVariables : new HashMap<>());
         context.setStartTime(LocalDateTime.now());
         context.setCurrentTime(LocalDateTime.now());
-        context.setBusinessId(inputVariables.containsKey("businessId")?inputVariables.get("businessId").toString():null);
+        context.setBusinessId(inputVariables.containsKey("businessId") ? inputVariables.get("businessId").toString() : null);
 
         return context;
     }
@@ -393,7 +393,8 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
         // 实现根据ID加载工作流定义的逻辑
         return companyWorkflowMapper.selectCompanyWorkflowById(id);
     }
-//    /**
+
+    //    /**
 //     * 将工作流设置为阻塞状态
 //     */
 //    private void pauseWorkflowForBlockingNode(String workflowInstanceId, String nodeKey, ExecutionContext context) {
@@ -411,7 +412,6 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 //        }
 //    }
     @Override
-    @Transactional
     public void resumeFromBlockingNode(String workflowInstanceId, String nodeKey, Map<String, Object> inputData) {
         try {
             // 加载当前执行记录
@@ -460,22 +460,77 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 //            updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.RUNNING);
 
             // 根据执行结果决定下一步操作
-            if (result.isSuccess()) {
-                if (StringUtils.isBlank(result.getNextNodeKey())) {
-                    // 如果没有下一个节点,完成工作流
-                    completeWorkflow(workflowInstanceId);
-                } else {
-                    // 执行下一个节点
-                    executeNode(workflowInstanceId, result.getNextNodeKey());
-                }
-            } else {
-                // 执行失败,更新状态
-                updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
-            }
+//            if (result.isSuccess()) {
+//                if (StringUtils.isBlank(result.getNextNodeKey())) {
+//                    // 如果没有下一个节点,完成工作流
+//                    completeWorkflow(workflowInstanceId);
+//                } else {
+//                    // 执行下一个节点
+//                    executeNode(workflowInstanceId, result.getNextNodeKey());
+//                }
+//            } else {
+//                // 执行失败,更新状态
+//                updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
+//            }
         } catch (Exception e) {
             log.error("唤醒阻塞节点失败: {} -> {}", workflowInstanceId, nodeKey, e);
             updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
             throw new CustomException("Resume from blocking node failed: " + e.getMessage());
         }
     }
+
+    /**
+     * 定时任务 唤醒执行工作流节点
+     *
+     * @param workflowInstanceId
+     * @param nodeKey
+     * @param inputData
+     */
+    @Override
+    public void timeDoExecute(String workflowInstanceId, String nodeKey, Map<String, Object> inputData) {
+        try {
+            // 加载当前执行记录
+            CompanyAiWorkflowExec currentExec = currentExecutionMapper.selectByWorkflowInstanceId(workflowInstanceId);
+
+            if (currentExec == null) {
+                throw new CustomException("工作流实例不存在: " + workflowInstanceId);
+            }
+            String execNode = currentExec.getCurrentNodeKey();
+
+            if(null == inputData || !inputData.containsKey("lastNodeKey")){
+                throw new CustomException("节点流程匹配有误: " + inputData);
+            }
+            String lastNodeKey =(String) inputData.get("lastNodeKey");
+
+            // 验证当前节点是否匹配
+            if (!lastNodeKey.equals(execNode)) {
+                log.error("当前流程已扭转节点不匹配 - 期望: {}, 实际: {}", lastNodeKey, execNode);
+                throw new CustomException("节点不匹配,期望: " + lastNodeKey + ", 实际: " + execNode);
+            }
+
+            // 上个节点阻塞校验
+            if (!Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())) {
+                throw new CustomException("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId);
+            }
+            // 反序列化执行上下文并合并新的输入数据
+            ExecutionContext context = deserializeContext(currentExec);
+            if (inputData != null) {
+                context.getVariables().putAll(inputData);
+            }
+            // 加载工作流定义
+            CompanyWorkflow definition = loadCompanyWorkflow(currentExec.getWorkflowId());
+
+            // 创建节点实例
+            IWorkflowNode node = createNode(definition, nodeKey);
+            if (node == null) {
+                throw new CustomException("节点不存在: " + nodeKey);
+            }
+
+            node.execute(context);
+
+        } catch (Exception e){
+
+        }
+    }
+
 }

+ 1 - 0
fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java

@@ -108,6 +108,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
      */
     protected void preExecute(ExecutionContext context) {
         context.setVariable("node_start_time_" + nodeKey, System.currentTimeMillis());
+        context.setVariable("lastNodeKey",nodeKey);
         log.info("Starting execution of node: {} ({})", nodeKey, nodeName);
     }
 

+ 28 - 8
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -16,6 +16,7 @@ import com.fs.company.vo.ExecutionResult;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
 
+import java.time.LocalDateTime;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,8 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
     private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     private static final ICompanyVoiceRoboticService companyVoiceRoboticService = SpringUtils.getBean(ICompanyVoiceRoboticService.class);
     public static final String DELAY_CALL_KEY = "aiCallTask:delay:%s:%s:";
+    private final String CALL_FROM_CALLBACK = "callBack";
+    private final String CALL_FROM_TIMER = "timer";
 
     public AiCallTaskNode(String nodeKey, String nodeName, Map<String, Object> properties) {
         super(nodeKey, nodeName, properties);
@@ -53,14 +56,14 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
             //暂时考虑单条件
             AiCallWorkflowConditionVo condition = conditions.get(0);
             //拨通
-            if (condition.isCallConnected() && callRes.getCallTime()!=null && callRes.getCallTime() > 0) {
+            if (condition.isCallConnected() && callRes.getCallTime() != null && callRes.getCallTime() > 0) {
                 //如果含有意向度过滤
-                if(null != condition.getIntention()){
-                    if( condition.getIntention().equals(callRes.getIntention())){
-                        super.runNextNode(context,edge);
+                if (null != condition.getIntention()) {
+                    if (condition.getIntention().equals(callRes.getIntention())) {
+                        super.runNextNode(context, edge);
                     }
                 } else {
-                    super.runNextNode(context,edge);
+                    super.runNextNode(context, edge);
                 }
             }
             //未拨通
@@ -68,14 +71,16 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                 //延时操作
                 if (null != condition.getCallTime()) {
                     //计算延时分片分钟
-                    Date nowDay = new Date(System.currentTimeMillis() + condition.getCallTime() * 60 * 1000);
+                    long l = System.currentTimeMillis() + condition.getCallTime() * 60 * 1000;
+                    ExecutionContext nextContext = context.clone();
+                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
                     //添加到延时扫描redis
-                    super.redisCache.setCacheObject(String.format(DELAY_CALL_KEY, nowDay.getHours(), nowDay.getMinutes()) + exec.getWorkflowInstanceId(), context);
+                    super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext);
                     super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
                 }
                 //无时间驱动
                 else {
-                    super.runNextNode(context,edge);
+                    super.runNextNode(context, edge);
                 }
             }
 
@@ -95,6 +100,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
             companyVoiceRoboticService.workflowCallPhoneOne(bus.getRoboticId(), bus.getCalleeId(), context, callConfigVo);
             super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
             return ExecutionResult.paused()
+                    .outputData(context.getVariables())
                     .nextNodeKey("").build();
         } else {
             return ExecutionResult.failure()
@@ -113,6 +119,20 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
         return true;
     }
 
+    /**
+     * getRedisCacheKey
+     *
+     * @param time
+     * @return
+     */
+    public static String getDelayCallKeyPrefix(Long time) {
+        Date nowDay = new Date();
+        if (null != time) {
+            nowDay = new Date(time);
+        }
+        return String.format(DELAY_CALL_KEY, nowDay.getHours(), nowDay.getMinutes());
+    }
+
 
 //    @Override
 //    public String getNextNodeKey(String workflowInstanceId, String nodeKey) {

+ 5 - 0
fs-wx-task/src/main/java/com/fs/app/controller/CommonController.java

@@ -50,6 +50,11 @@ public class CommonController {
     public void callNextTask(){
         taskService.callNextTask();
     }
+    @GetMapping("cidWorkflowCallRun")
+    public void cidWorkflowCallRun(){
+        taskService.cidWorkflowCallRun();
+    }
+
 
 
 }

+ 105 - 68
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -13,6 +13,7 @@ import com.fs.common.utils.PubFun;
 import com.fs.common.utils.StringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.*;
+import com.fs.company.param.ExecutionContext;
 import com.fs.company.service.ICompanyVoiceRoboticService;
 import com.fs.company.service.ICompanyWxAccountService;
 import com.fs.company.service.ICompanyWxClientService;
@@ -20,6 +21,8 @@ import com.fs.company.service.ICompanyWxDialogService;
 import com.fs.company.service.impl.*;
 import com.fs.company.service.CompanyWorkflowEngine;
 import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
+import com.fs.company.service.impl.call.node.AiCallTaskNode;
+import com.fs.course.config.RedisKeyScanner;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
 import com.fs.company.util.ObjectPlaceholderResolver;
@@ -81,6 +84,7 @@ public class WxTaskService {
             r -> new Thread(r, "callPool-" + System.currentTimeMillis()),
             new ThreadPoolExecutor.CallerRunsPolicy()
     );
+    private final RedisKeyScanner redisKeyScanner;
 
     public void addWx(List<Long> accountIdList) {
         log.info("==========执行加微信任务开始==========");
@@ -97,7 +101,7 @@ public class WxTaskService {
                 .map(callee -> callee.getRoboticId() + "_" + callee.getUserId())
                 .collect(Collectors.toSet());
 
-         list = list.stream()
+        list = list.stream()
                 .filter(client -> {
                     String key = client.getRoboticId() + "_" + client.getCustomerId();
                     return !existingKeys.contains(key);
@@ -105,7 +109,7 @@ public class WxTaskService {
                 .collect(Collectors.toList());
 
         log.info("需要添加微信的数量:{}", list.size());
-        if(list.isEmpty()) return;
+        if (list.isEmpty()) return;
         List<CompanyWxClient> addList = new ArrayList<>();
         Map<Long, CompanyWxClient> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient::getAccountId);
         List<CompanyWxAccount> accountList = new ArrayList<>(companyWxAccountService.listByIds(clientMap.keySet()));
@@ -128,23 +132,23 @@ public class WxTaskService {
         log.info("实际加微的账号数量:{}", addAccountList.size());
         addAccountList.forEach(e -> {
             CompanyWxClient client = clientMap.get(e.getId());
-            if(client != null){
+            if (client != null) {
                 String task = redisCache.getCacheObject(Constants.TASK_ID + client.getRoboticId());
                 log.info("ROBOTIC-ID:{},CLIENT-ID:{},当前任务执行状态:{}", client.getRoboticId(), client.getId(), task);
-                if(StringUtils.isNotEmpty(task) && Constants.ADD_WX.equals(task)){
+                if (StringUtils.isNotEmpty(task) && Constants.ADD_WX.equals(task)) {
                     CompanyWxDialog dialog = companyWxDialogService.getById(client.getDialogId());
                     CrmCustomer crmCustomer = crmCustomerService.selectCrmCustomerById(client.getCustomerId());
                     String newTxt = objectPlaceholderResolver.resolvePlaceholders(crmCustomer, dialog.getTemplateDetails());
                     AddContactVo vo = friendService.addContact(e.getId(), crmCustomer.getMobile(), newTxt, client.getId());
                     JSONObject runParam = new JSONObject();
-                    runParam.put("id",e.getId());
-                    runParam.put("mobile",crmCustomer.getMobile());
-                    runParam.put("txt",newTxt);
-                    runParam.put("clientId",client.getId());
+                    runParam.put("id", e.getId());
+                    runParam.put("mobile", crmCustomer.getMobile());
+                    runParam.put("txt", newTxt);
+                    runParam.put("clientId", client.getId());
                     CompanyVoiceRoboticCallLogAddwx addLog = CompanyVoiceRoboticCallLogAddwx.initCallLog(
-                            runParam.toJSONString(),client.getId(),client.getRoboticId(),e.getId(),e.getCompanyId());
+                            runParam.toJSONString(), client.getId(), client.getRoboticId(), e.getId(), e.getCompanyId());
                     log.info("ROBOTIC-ID:{},CLIENT-ID:{},执行加微:{},客户:{}-{},使用话术:{}", client.getRoboticId(), client.getId(), e.getId(), client.getCustomerId(), crmCustomer.getCustomerName(), dialog.getName());
-                    if(vo.isSuccess()){
+                    if (vo.isSuccess()) {
                         e.setLastAddWxTime(LocalDateTime.now());
                         e.setIsAddNum(e.getIsAddNum() + 1);
                         client.setIsAdd(2);
@@ -154,20 +158,20 @@ public class WxTaskService {
                         addList.add(client);
                         addLog.setStatus(2);
                         addLog.setResult(JSON.toJSONString(vo));
-                    }else{
+                    } else {
                         log.error("ROBOTIC-ID:{},加微失败:{}", client.getRoboticId(), vo);
                         addLog.setStatus(3);
                         addLog.setResult(JSON.toJSONString(vo));
                     }
                     asyncSaveCompanyVoiceRoboticCallLog(addLog);
-                }else{
+                } else {
                     log.error("ROBOTIC-ID:{},当前任务没有执行加微任务", client.getRoboticId());
                 }
-            }else{
+            } else {
                 log.error("当前账号暂无需要添加微信:{}-{}", e.getId(), e.getWxNickName());
             }
         });
-        if(!addList.isEmpty()){
+        if (!addList.isEmpty()) {
             companyWxClientService.updateBatchById(addList);
             //根据加微成功的用户,判定是否加入延时执行下一步任务
             Set<Long> roboticIdSet = addList.stream().map(CompanyWxClient::getRoboticId).collect(Collectors.toSet());
@@ -177,27 +181,27 @@ public class WxTaskService {
             List<CompanyVoiceRobotic> companyVoiceRobotics = companyVoiceRoboticMapper.selectBatchIds(roboticIdSet);
             Map<Long, CompanyVoiceRobotic> roboticsMp = companyVoiceRobotics.stream().collect(Collectors.toMap(CompanyVoiceRobotic::getId, Function.identity(), (existing, replacement) -> existing));
             //找到callees数据
-            List<CompanyVoiceRoboticCallees> companyVoiceRoboticCallees = companyVoiceRoboticCalleesMapper.selectCalleesListByRoboticIdsAndUserIds(userIdSet,roboticIdSet);
-            Map<String, CompanyVoiceRoboticCallees> calleesMp = companyVoiceRoboticCallees.stream().collect(Collectors.toMap(e->e.getUserId()+ "-" +e.getRoboticId(), Function.identity(), (existing, replacement) -> existing));
+            List<CompanyVoiceRoboticCallees> companyVoiceRoboticCallees = companyVoiceRoboticCalleesMapper.selectCalleesListByRoboticIdsAndUserIds(userIdSet, roboticIdSet);
+            Map<String, CompanyVoiceRoboticCallees> calleesMp = companyVoiceRoboticCallees.stream().collect(Collectors.toMap(e -> e.getUserId() + "-" + e.getRoboticId(), Function.identity(), (existing, replacement) -> existing));
 
             long l = System.currentTimeMillis();
 
             //根据加微成功
             for (CompanyWxClient client : addList) {
                 CompanyVoiceRobotic clientRobotic = roboticsMp.getOrDefault(client.getRoboticId(), null);
-                if(null ==  clientRobotic){
+                if (null == clientRobotic) {
                     log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
                     continue;
                 }
                 CompanyVoiceRoboticCallees callees = calleesMp.getOrDefault(client.getCustomerId() + "-" + client.getRoboticId(), null);
-                if(null ==  callees){
+                if (null == callees) {
                     log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
                     continue;
                 }
                 Integer addWxTime = clientRobotic.getAddWxTime();
-                if(null == addWxTime){
+                if (null == addWxTime) {
                     log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有设置加微后置等待时间", client.getRoboticId(), client.getId());
-                }else{
+                } else {
                     long endT = System.currentTimeMillis() + addWxTime * 60 * 1000;
                     StringBuilder sb = new StringBuilder(Constants.CID_NEXT_TASK_ID).append(callees.getRoboticId()).append(":").append(callees.getId());
                     redisCache.setCacheObject(sb.toString(), String.valueOf(endT));
@@ -206,13 +210,13 @@ public class WxTaskService {
             companyVoiceRoboticCallees.forEach(robotic ->
                     robotic.setRunTaskFlow(
                             StringUtils.isBlank(robotic.getRunTaskFlow()) ?
-                                    Constants.ADD_WX: robotic.getRunTaskFlow() + "," +  Constants.ADD_WX
+                                    Constants.ADD_WX : robotic.getRunTaskFlow() + "," + Constants.ADD_WX
                     )
             );
             companyVoiceRoboticCalleesServiceImpl.updateBatchById(companyVoiceRoboticCallees);
             companyVoiceRoboticServiceImpl.finishAddWxByCallees(roboticIdSet);
         }
-        if(!addAccountList.isEmpty()){
+        if (!addAccountList.isEmpty()) {
             companyWxAccountService.updateBatchById(addAccountList);
         }
 
@@ -224,15 +228,15 @@ public class WxTaskService {
         WxConfig config = JSONUtil.toBean(json, WxConfig.class);
         List<CompanyWxAccount> list = companyWxAccountService.list();
         list.forEach(e -> {
-            if(e.getAccountCreateTime() != null){
+            if (e.getAccountCreateTime() != null) {
                 long until = e.getAccountCreateTime().until(now.toLocalDate(), ChronoUnit.DAYS);
-                if(until > config.getNewAccountTime()){
+                if (until > config.getNewAccountTime()) {
                     e.setIsNew(1);
                 }
             }
-            if(e.getIsNew() == 0){
+            if (e.getIsNew() == 0) {
                 e.setAddNum(config.getNewAccountAddNum());
-            }else{
+            } else {
                 e.setAddNum(RandomUtil.randomInt(config.getAccountAddMax(), config.getAccountAddMin()));
             }
             e.setIsAddNum(0);
@@ -260,7 +264,7 @@ public class WxTaskService {
         List<CompanyVoiceRobotic> successList = list.stream().filter(e -> StringUtils.isNotEmpty(e.getRunTaskFlow()) && e.getTaskFlow().length() == e.getRunTaskFlow().length()).collect(Collectors.toList());
         List<CompanyVoiceRobotic> waitList = list.stream().filter(e -> StringUtils.isEmpty(e.getRunTaskFlow()) || e.getTaskFlow().length() != e.getRunTaskFlow().length()).collect(Collectors.toList());
         successList.forEach(e -> e.setTaskStatus(3));
-        if(!successList.isEmpty()){
+        if (!successList.isEmpty()) {
             log.info("已经完成任务:{}", successList.size());
             companyVoiceRoboticService.updateBatchById(successList);
         }
@@ -270,7 +274,7 @@ public class WxTaskService {
             log.info("ROBOTIC-ID:{},当前任务顺序:{}", e.getId(), e.getTaskFlow());
             String runTaskFlow = e.getRunTaskFlow();
             log.info("ROBOTIC-ID:{},已有任务:{}", e.getId(), e.getRunTaskFlow());
-            if(StringUtils.isNotEmpty(runTaskFlow)){
+            if (StringUtils.isNotEmpty(runTaskFlow)) {
                 Arrays.asList(runTaskFlow.split(",")).forEach(taskFlow::remove);
             }
             log.info("ROBOTIC-ID:{},当前还剩余任务:{}", e.getId(), taskFlow);
@@ -279,7 +283,7 @@ public class WxTaskService {
         });
         Function<CompanyVoiceRobotic, String> getKey = e -> Constants.TASK_ID + e.getId();
         waitList.forEach(e -> {
-            if(redisCache.getCacheObject(getKey.apply(e)) != null){
+            if (redisCache.getCacheObject(getKey.apply(e)) != null) {
                 log.info("ROBOTIC-ID:{},已有正在执行任务", e.getId());
                 return;
             }
@@ -292,7 +296,7 @@ public class WxTaskService {
                         break;
                     case Constants.ADD_WX:
                         //第一步是调用添加微信步骤
-                        if(StringUtils.isBlank(e.getRunTaskFlow()) && StringUtils.isNotBlank(e.getTaskFlow()) && e.getTaskFlow().startsWith(Constants.ADD_WX)){
+                        if (StringUtils.isBlank(e.getRunTaskFlow()) && StringUtils.isNotBlank(e.getTaskFlow()) && e.getTaskFlow().startsWith(Constants.ADD_WX)) {
                             companyVoiceRoboticServiceImpl.allocateWx(e);
 //                            CompletableFuture.supplyAsync(()->{
 //                                //分配个微账号
@@ -330,7 +334,7 @@ public class WxTaskService {
 //                                log.error("ROBOTIC-ID:{},任务执行异常:{}", e.getId(), e.getNowTask(), ex);
 //                                return null;
 //                            });
-                        }else{
+                        } else {
                             //todo 接入原有加微逻辑
                         }
                         break;
@@ -338,7 +342,7 @@ public class WxTaskService {
 
                         break;
                 }
-            }catch (Exception exception){
+            } catch (Exception exception) {
                 log.error("ROBOTIC-ID:{},任务执行失败:{}", e.getId(), e.getNowTask(), exception);
                 redisCache.deleteObject(getKey.apply(e));
             }
@@ -403,31 +407,34 @@ public class WxTaskService {
 
     /**
      * 单任务加微
+     *
      * @param roboticId
      * @param callerId
      * @return
      */
-    private  Boolean addWxOne(Long roboticId,Long callerId){
+    private Boolean addWxOne(Long roboticId, Long callerId) {
 
         return Boolean.TRUE;
     }
+
     /**
      * 单个任务执行且为单条执行对象
+     *
      * @param roboticId
      * @param callerId
      */
-    public String cellRunOne(Long roboticId,Long callerId){
+    public String cellRunOne(Long roboticId, Long callerId) {
 
         //查询任务执行情况
         CompanyVoiceRoboticCallees data = companyVoiceRoboticCalleesMapper.selectDataByCalleesId(callerId);
         CompanyVoiceRobotic robotic = companyVoiceRoboticMapper.selectCompanyVoiceRoboticById(roboticId);
         String taskFlow = data.getTaskFlow();
-        if( null == data || null == robotic ){
-            log.error("没有查询到任务执行数据,roboticId:{},callerId:{}",roboticId,callerId);
+        if (null == data || null == robotic) {
+            log.error("没有查询到任务执行数据,roboticId:{},callerId:{}", roboticId, callerId);
             return null;
         }
-        if(Integer.valueOf(3).equals(robotic.getTaskStatus())){
-            log.error("执行任务已经完成了,roboticId:{}",roboticId);
+        if (Integer.valueOf(3).equals(robotic.getTaskStatus())) {
+            log.error("执行任务已经完成了,roboticId:{}", roboticId);
             return null;
         }
         String nextTask;
@@ -440,14 +447,14 @@ public class WxTaskService {
             data.setRunTaskFlow(robotic.getRunTaskFlow());
 //            return null;
         }
-        if(StringUtils.isBlank(nextTask)){
-            log.error("任务没有下个执行任务,标记完成,roboticId:{}",roboticId);
+        if (StringUtils.isBlank(nextTask)) {
+            log.error("任务没有下个执行任务,标记完成,roboticId:{}", roboticId);
             companyVoiceRoboticMapper.finishRobotic(roboticId);
             return null;
         }
-        log.info("单人单任务执行ROBOTIC-ID:{},caller_id:{},当前需要执行任务:{}", roboticId,callerId, nextTask);
+        log.info("单人单任务执行ROBOTIC-ID:{},caller_id:{},当前需要执行任务:{}", roboticId, callerId, nextTask);
         String nextTaskOptimized = null;
-        try{
+        try {
             switch (nextTask) {
                 case Constants.CELL_PHONE:
                     companyVoiceRoboticService.callPhoneOne(roboticId, callerId);
@@ -457,13 +464,13 @@ public class WxTaskService {
                     Boolean success = addWxOne(roboticId, callerId);
                     break;
                 case Constants.SEND_MSG:
-                    if(Integer.valueOf(0).equals(data.getIsSendMsg())){
+                    if (Integer.valueOf(0).equals(data.getIsSendMsg())) {
                         //发送短信前一个任务如果是打电话 等待电话打完以后再执行发送
                         String lastTaskOptimized = getLastTaskOptimized(taskFlow);
-                        if(Constants.CELL_PHONE.equals(lastTaskOptimized)){
+                        if (Constants.CELL_PHONE.equals(lastTaskOptimized)) {
                             //是否打电话结束有回调值 完成电话动作以后执行下一步
                             CompanyVoiceRoboticCallLogCallphone companyVoiceRoboticCallLogCallphone = companyVoiceRoboticCallLogCallphoneService.selectLogByRoboticIdAndCallerId(roboticId, callerId);
-                            if(null != companyVoiceRoboticCallLogCallphone && companyVoiceRoboticCallLogCallphone.getStatus() == 1){
+                            if (null != companyVoiceRoboticCallLogCallphone && companyVoiceRoboticCallLogCallphone.getStatus() == 1) {
                                 nextTaskOptimized = "wait callPhone";
                                 break;
                             }
@@ -476,20 +483,20 @@ public class WxTaskService {
                         companyVoiceRoboticService.sendMsgOne(roboticId, callerId);
                         nextTaskOptimized = getNextTaskOptimized(taskFlow, data.getRunTaskFlow() + "," + Constants.SEND_MSG);
                         break;
-                    } else{
-                        log.info("不再需要发送短信处理,roboticId:{},callerId:{}",roboticId,callerId);
+                    } else {
+                        log.info("不再需要发送短信处理,roboticId:{},callerId:{}", roboticId, callerId);
                         nextTaskOptimized = getNextTaskOptimized(taskFlow, data.getRunTaskFlow() + "," + Constants.SEND_MSG);
                         break;
                     }
             }
-        } catch (Exception ex){
-            log.error("执行任务异常,roboticId:{},callerId:{},nextTask:{}",roboticId,callerId,nextTask,ex);
+        } catch (Exception ex) {
+            log.error("执行任务异常,roboticId:{},callerId:{},nextTask:{}", roboticId, callerId, nextTask, ex);
             nextTaskOptimized = "exception";
         }
 
-        if(StringUtils.isNotBlank(nextTaskOptimized)){
+        if (StringUtils.isNotBlank(nextTaskOptimized)) {
             return nextTaskOptimized;
-        }else{
+        } else {
             //任务执行完了 没有下一步 直接完成任务
 //            companyVoiceRoboticMapper.finishRobotic(roboticId);
             return null;
@@ -499,6 +506,7 @@ public class WxTaskService {
 
     /**
      * 获取下一个任务
+     *
      * @param taskFlow
      * @param runTaskFlow
      * @return
@@ -523,20 +531,21 @@ public class WxTaskService {
 
     /**
      * 获取最后一个执行的任务
+     *
      * @param taskFlow
      * @return
      */
-    public String getLastTaskOptimized(String taskFlow){
+    public String getLastTaskOptimized(String taskFlow) {
         return taskFlow.substring(taskFlow.lastIndexOf(",") + 1);
     }
 
     /**
      * 调用下一个任务
      */
-    public void callNextTask(){
+    public void callNextTask() {
         //
         RLock lock = redissonClient.getLock("CID_CALL_NEXT_TASK");
-        try{
+        try {
             lock.lock();
             log.info("===========CID扫描执行下一个任务任务执行开始===========");
             long l = System.currentTimeMillis();
@@ -545,33 +554,33 @@ public class WxTaskService {
                 String[] keyArr = key.split(":");
                 String taskId = keyArr[keyArr.length - 2];
                 String callerId = keyArr[keyArr.length - 1];
-                Long runTime =Long.valueOf(redisCache.getCacheObject(key));
-                log.info("任务执行时间:{},当前时间:{}",runTime,l);
+                Long runTime = Long.valueOf(redisCache.getCacheObject(key));
+                log.info("任务执行时间:{},当前时间:{}", runTime, l);
                 //到了该执行时间
-                if(runTime.compareTo(l) <= 0){
-                    log.info("开始执行任务:{},callerId:{}",taskId,callerId);
+                if (runTime.compareTo(l) <= 0) {
+                    log.info("开始执行任务:{},callerId:{}", taskId, callerId);
                     //得到待执行任务
-                    CompletableFuture.supplyAsync(() -> this.cellRunOne(Long.valueOf(taskId), Long.valueOf(callerId)),cidExcutor).thenApply(res -> {
-                        if(StringUtils.isBlank(res)){
+                    CompletableFuture.supplyAsync(() -> this.cellRunOne(Long.valueOf(taskId), Long.valueOf(callerId)), cidExcutor).thenApply(res -> {
+                        if (StringUtils.isBlank(res)) {
                             redisCache.deleteObject(key);
                             redisCache.deleteObject(Constants.TASK_ID + taskId);
                         }
                         return null;
                     }).exceptionally(throwable -> {
-                        log.error("单项任务执行或删除失败,taskId: {},callerId:{}", taskId, callerId,throwable);
+                        log.error("单项任务执行或删除失败,taskId: {},callerId:{}", taskId, callerId, throwable);
                         return null;
                     });
-                }else{
+                } else {
                     // todo 加入新逻辑 没有到执行时间的待执行任务 检查上一个任务的执行状态
                     // 如果是已经完成的状态 修改待执行时间为现在 下次进入任务会直接执行对应的下个任务
 
                 }
             });
 
-        }catch (Exception ex){
+        } catch (Exception ex) {
             log.error("CID任务自动调用调用下一个任务失败", ex);
         } finally {
-            if(lock.isHeldByCurrentThread()){
+            if (lock.isHeldByCurrentThread()) {
                 lock.unlock();
             }
         }
@@ -582,15 +591,17 @@ public class WxTaskService {
 
     @Autowired
     CompanyVoiceRoboticCallLogAddwxServiceImpl companyVoiceRoboticCallLogAddwxService;
+
     /**
      * 记录任务执行日志 addWx
+     *
      * @param logAddwx
      */
-    public void asyncSaveCompanyVoiceRoboticCallLog(CompanyVoiceRoboticCallLogAddwx logAddwx){
-        try{
+    public void asyncSaveCompanyVoiceRoboticCallLog(CompanyVoiceRoboticCallLogAddwx logAddwx) {
+        try {
             companyVoiceRoboticCallLogAddwxService.asyncInsertCompanyVoiceRoboticCallLog(logAddwx);
-        }catch (Exception ex){
-            log.error("记录任务执行日志失败:失败数据:{}",logAddwx, ex);
+        } catch (Exception ex) {
+            log.error("记录任务执行日志失败:失败数据:{}", logAddwx, ex);
         }
     }
 
@@ -694,4 +705,30 @@ public class WxTaskService {
             }
         }
     }
+
+    /**
+     * 扫描工作流延时任务
+     */
+    public void cidWorkflowCallRun() {
+        log.info("===========工作流延时任务开始扫描===========");
+        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
+        Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
+        log.info("共扫描到 {} 个待处理键", keys.size());
+        HashMap commonMp = new HashMap();
+        commonMp.put("callSource","timer");
+        keys.parallelStream().forEach(key -> {
+            try {
+                //doExec
+                CompletableFuture.runAsync(()->{
+                    String cacheObject = redisCache.getCacheObject(key);
+                    ExecutionContext context = JSONObject.parseObject(cacheObject, ExecutionContext.class);
+                    companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
+                }, cidExcutor);
+
+            } catch (Exception ex) {
+                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+            }
+        });
+        log.info("===========工作流延时任务扫描结束===========");
+    }
 }

+ 5 - 0
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -50,4 +50,9 @@ public class WxTask {
     public void checkWorkflowAddWxTimeout(){
         taskService.checkWorkflowAddWxTimeout();
     }
+
+    @Scheduled(cron = "0 0/1 * * * ?")
+    public void cidWorkflowRun(){
+        taskService.cidWorkflowCallRun();
+    }
 }