Преглед изворни кода

Merge remote-tracking branch 'origin/master-ai-cell' into master-ai-cell

zyy пре 2 недеља
родитељ
комит
712d2706d0
23 измењених фајлова са 496 додато и 252 уклоњено
  1. 1 1
      fs-ai-call-task/src/main/java/com/fs/FsAiCallTaskApplication.java
  2. 4 6
      fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java
  3. 2 1
      fs-service/src/main/java/com/fs/company/mapper/CompanyWxClientMapper.java
  4. 2 1
      fs-service/src/main/java/com/fs/company/service/ICompanyWxClientService.java
  5. 55 51
      fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticCallLogCallphoneServiceImpl.java
  6. 16 7
      fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java
  7. 6 2
      fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java
  8. 4 0
      fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowServiceImpl.java
  9. 7 1
      fs-service/src/main/java/com/fs/company/service/impl/CompanyWxClientServiceImpl.java
  10. 15 2
      fs-service/src/main/java/com/fs/company/service/impl/CompanyWxServiceImpl.java
  11. 45 15
      fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java
  12. 120 72
      fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java
  13. 33 28
      fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java
  14. 1 0
      fs-service/src/main/java/com/fs/company/service/impl/call/node/AiSendMsgTaskNode.java
  15. 10 0
      fs-service/src/main/java/com/fs/company/service/impl/call/node/EndNode.java
  16. 85 0
      fs-service/src/main/java/com/fs/company/vo/CompanyWxClient4WorkFlowVO.java
  17. 4 0
      fs-service/src/main/resources/mapper/company/CompanyWorkflowMapper.xml
  18. 2 2
      fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml
  19. 1 1
      fs-user-app/src/main/java/com/fs/framework/aspectj/lock/DistributeLockAspect.java
  20. 1 1
      fs-wx-task/src/main/java/com/fs/FsWxTaskApplication.java
  21. 10 0
      fs-wx-task/src/main/java/com/fs/app/controller/CommonController.java
  22. 53 56
      fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java
  23. 19 5
      fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

+ 1 - 1
fs-ai-call-task/src/main/java/com/fs/FsAiCallTaskApplication.java

@@ -13,7 +13,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
 @EnableTransactionManagement
 @EnableAsync
-//@EnableScheduling
+@EnableScheduling
 public class FsAiCallTaskApplication
 {
     public static void main(String[] args){

+ 4 - 6
fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java

@@ -43,21 +43,19 @@ public class CallTaskService {
         String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
         Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
         log.info("共扫描到 {} 个待处理键", keys.size());
-        HashMap commonMp = new HashMap();
-        commonMp.put("callSource","timer");
         keys.parallelStream().forEach(key -> {
             try {
                 //doExec
-                CompletableFuture.runAsync(()->{
+                CompletableFuture.runAsync(() -> {
                     try {
-                        commonMp.put("callRedisKey",key);
                         ExecutionContext context = redisCache2.getCacheObject(key);
-                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
+                        context.setVariable("callRedisKey", key);
+                        context.setVariable("callSource", "callTaskTimer");
+                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context.getVariables());
                     } catch (Exception e) {
                         log.error("处理工作流延时任务异常 - key: {}", key, e);
                     }
                 }, cidExcutor);
-
             } catch (Exception ex) {
                 log.error("处理工作流延时任务异常 - key: {}", key, ex);
             }

+ 2 - 1
fs-service/src/main/java/com/fs/company/mapper/CompanyWxClientMapper.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.fs.company.domain.CompanyUser;
 import com.fs.company.domain.CompanyVoiceRoboticCallLogAddwx;
 import com.fs.company.domain.CompanyWxClient;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
@@ -72,7 +73,7 @@ public interface CompanyWxClientMapper extends BaseMapper<CompanyWxClient> {
 
     List<CompanyWxClient> getAddWxList(@Param("accountIdList") List<Long> accountIdList);
 
-    List<CompanyWxClient> getAddWxList4Workflow(@Param("accountIdList") List<Long> accountIdList, @Param("execStatus") Integer execStatus, @Param("execNodeType") Integer execNodeType);
+    List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(@Param("accountIdList") List<Long> accountIdList, @Param("execStatus") Integer execStatus, @Param("execNodeType") Integer execNodeType);
 
     CompanyWxClient selectWx(@Param("accountId") Long accountId, @Param("v3") String v3);
 

+ 2 - 1
fs-service/src/main/java/com/fs/company/service/ICompanyWxClientService.java

@@ -3,6 +3,7 @@ package com.fs.company.service;
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.fs.company.domain.CompanyWxClient;
 import com.fs.company.vo.AddWxResultVo;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 
 import java.util.List;
 
@@ -70,5 +71,5 @@ public interface ICompanyWxClientService extends IService<CompanyWxClient> {
 
     List<CompanyWxClient> getAddWxList(List<Long> accountIdList);
 
-    List<CompanyWxClient> getAddWxList4Workflow(List<Long> accountIdList);
+    List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(List<Long> accountIdList);
 }

+ 55 - 51
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticCallLogCallphoneServiceImpl.java

@@ -151,20 +151,20 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
     @Async("callLogExcutor")
     public void asyncHandleCalleeCallBackResult(PushIIntentionResult result, CompanyVoiceRoboticCallees callees) {
         try {
-            String runTaskFlow = callees.getRunTaskFlow();
-            if(StringUtils.isBlank(runTaskFlow)){
-                runTaskFlow = Constants.CELL_PHONE;
-            }else{
-                if (!runTaskFlow.contains(Constants.CELL_PHONE)) {
-                    runTaskFlow = runTaskFlow + "," + Constants.CELL_PHONE;
-                }
-            }
-            if(!runTaskFlow.equals(callees.getRunTaskFlow())){
-                CompanyVoiceRoboticCallees updateCallees = new CompanyVoiceRoboticCallees();
-                updateCallees.setId(callees.getId());
-                updateCallees.setRunTaskFlow(runTaskFlow);
-                companyVoiceRoboticCalleesMapper.updateById(updateCallees);
-            }
+//            String runTaskFlow = callees.getRunTaskFlow();
+//            if(StringUtils.isBlank(runTaskFlow)){
+//                runTaskFlow = Constants.CELL_PHONE;
+//            }else{
+//                if (!runTaskFlow.contains(Constants.CELL_PHONE)) {
+//                    runTaskFlow = runTaskFlow + "," + Constants.CELL_PHONE;
+//                }
+//            }
+//            if(!runTaskFlow.equals(callees.getRunTaskFlow())){
+//                CompanyVoiceRoboticCallees updateCallees = new CompanyVoiceRoboticCallees();
+//                updateCallees.setId(callees.getId());
+//                updateCallees.setRunTaskFlow(runTaskFlow);
+//                companyVoiceRoboticCalleesMapper.updateById(updateCallees);
+//            }
             String json= configService.selectConfigByKey("cid.config");
             if(StringUtils.isBlank( json)){
                 log.error("未配置cid.config");
@@ -173,44 +173,48 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
 
             Notify notify = result.getNotify();
             String uuid = notify.getUuid();
-            getDialogMapDomain getDialogMap = getDialogMapDomain.builder()
-                    .uuid(uuid)
-                    .build();
-            CompanyVoiceRoboticCallLogCallphone companyVoiceRoboticCallLog = companyVoiceRoboticCallLogCallphoneMapper.selectNoResultLogByCallees(callees);
-
-            companyVoiceRoboticCallLog.setStatus(2);
-            companyVoiceRoboticCallLog.setResult(JSON.toJSONString(result));
-
-            CompanyWxClient companyWxClient = companyWxClientServiceImpl.getOne(new QueryWrapper<CompanyWxClient>().eq("robotic_id", callees.getRoboticId()).eq("customer_id", callees.getUserId()));
-            CompanyVoiceRoboticWx roboticWx = companyVoiceRoboticWxServiceImpl.getById(companyWxClient.getRoboticWxId());
-            CompanyWxAccount companyWxAccount = companyWxAccountMapper.selectCompanyWxAccountById(roboticWx.getAccountId());
-            companyVoiceRoboticCallLog.setCompanyUserId(companyWxAccount.getCompanyUserId());
-
-            // 调用接口查询通话其他信息
-            TaskInfo dialogMap = aiCallService.getDialogMapNew(getDialogMap, companyVoiceRoboticCallLog.getCompanyId());
-            // 写入其他记录
-            JSONObject telData = dialogMap.getTelData();
-            companyVoiceRoboticCallLog.setRecordPath((String) telData.getOrDefault("recordPath", ""));
-            companyVoiceRoboticCallLog.setContentList(telData.containsKey("contentList")?telData.getJSONArray("contentList").toJSONString() : "");
-            companyVoiceRoboticCallLog.setCallerNum((String) telData.getOrDefault("callerNum", ""));
-            companyVoiceRoboticCallLog.setCalleeNum((String) telData.getOrDefault("calleeNum", ""));
-            companyVoiceRoboticCallLog.setUuid((String) telData.getOrDefault("uuid", ""));
-            Long createTime =  telData.getLong("createTime");
-            companyVoiceRoboticCallLog.setCallCreateTime(createTime);
-            Long answerTime =  telData.getLong("answerTime");
-            companyVoiceRoboticCallLog.setCallAnswerTime(answerTime);
-            companyVoiceRoboticCallLog.setIntention((String) telData.getOrDefault("intention", ""));
-            companyVoiceRoboticCallLog.setCallTime( telData.getLong("duration"));
-            BigDecimal callCharge = cidConfigVO.getCallCharge();
-            //
-            if(null == callCharge){
-                callCharge = DEFAULT_CALL_CHARGE;
+
+
+            if(StringUtils.isNotBlank(uuid)){
+                getDialogMapDomain getDialogMap = getDialogMapDomain.builder()
+                        .uuid(uuid)
+                        .build();
+                CompanyVoiceRoboticCallLogCallphone companyVoiceRoboticCallLog = companyVoiceRoboticCallLogCallphoneMapper.selectNoResultLogByCallees(callees);
+
+                companyVoiceRoboticCallLog.setStatus(2);
+                companyVoiceRoboticCallLog.setResult(JSON.toJSONString(result));
+
+                CompanyWxClient companyWxClient = companyWxClientServiceImpl.getOne(new QueryWrapper<CompanyWxClient>().eq("robotic_id", callees.getRoboticId()).eq("customer_id", callees.getUserId()));
+                CompanyVoiceRoboticWx roboticWx = companyVoiceRoboticWxServiceImpl.getById(companyWxClient.getRoboticWxId());
+                CompanyWxAccount companyWxAccount = companyWxAccountMapper.selectCompanyWxAccountById(roboticWx.getAccountId());
+                companyVoiceRoboticCallLog.setCompanyUserId(companyWxAccount.getCompanyUserId());
+                // 调用接口查询通话其他信息
+                TaskInfo dialogMap = aiCallService.getDialogMapNew(getDialogMap, companyVoiceRoboticCallLog.getCompanyId());
+                // 写入其他记录
+                JSONObject telData = dialogMap.getTelData();
+                companyVoiceRoboticCallLog.setRecordPath((String) telData.getOrDefault("recordPath", ""));
+                companyVoiceRoboticCallLog.setContentList(telData.containsKey("contentList")?telData.getJSONArray("contentList").toJSONString() : "");
+                companyVoiceRoboticCallLog.setCallerNum((String) telData.getOrDefault("callerNum", ""));
+                companyVoiceRoboticCallLog.setCalleeNum((String) telData.getOrDefault("calleeNum", ""));
+                companyVoiceRoboticCallLog.setUuid((String) telData.getOrDefault("uuid", ""));
+                Long createTime =  telData.getLong("createTime");
+                companyVoiceRoboticCallLog.setCallCreateTime(createTime);
+                Long answerTime =  telData.getLong("answerTime");
+                companyVoiceRoboticCallLog.setCallAnswerTime(answerTime);
+                companyVoiceRoboticCallLog.setIntention((String) telData.getOrDefault("intention", ""));
+                companyVoiceRoboticCallLog.setCallTime( telData.getLong("duration"));
+                BigDecimal callCharge = cidConfigVO.getCallCharge();
+                //
+                if(null == callCharge){
+                    callCharge = DEFAULT_CALL_CHARGE;
+                }
+                //向上取整分钟数
+                BigDecimal divide = new BigDecimal(companyVoiceRoboticCallLog.getCallTime()).divide(ONE_MINUTES_SECOND, 0, RoundingMode.CEILING);
+                BigDecimal multiply = divide.multiply(callCharge);
+                companyVoiceRoboticCallLog.setCost(multiply);
+                baseMapper.updateCompanyVoiceRoboticCallLogCallphone(companyVoiceRoboticCallLog);
             }
-            //向上取整分钟数
-            BigDecimal divide = new BigDecimal(companyVoiceRoboticCallLog.getCallTime()).divide(ONE_MINUTES_SECOND, 0, RoundingMode.CEILING);
-            BigDecimal multiply = divide.multiply(callCharge);
-            companyVoiceRoboticCallLog.setCost(multiply);
-            baseMapper.updateCompanyVoiceRoboticCallLogCallphone(companyVoiceRoboticCallLog);
+
         } catch (Exception ex) {
             log.error("处理回调结果异常:{}", result, ex);
         }

+ 16 - 7
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -33,6 +33,7 @@ import lombok.Synchronized;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -720,11 +721,13 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
         return companyVoiceRoboticMapper.qwUserListCompany(companyVoiceRobotic);
     }
 
+    //todo 这个回调方法可优化成异步返回 防止同步阻塞造成多次回调
     @Override
+    @Async
     public void callerResult(PushIIntentionResult result) {
         log.info("进入外呼回调:{}", JSON.toJSONString(result));
         Notify notify = result.getNotify();
-        if(notify == null) return;
+        if(notify == null || StringUtils.isBlank(notify.getUuid())) return;
         if("pushDialogContent".equals(notify.getType())) pushDialogContent(result);
         if("billing".equals(notify.getType())) pushBilling(result);
 //        // 是否全部回调完毕
@@ -739,7 +742,9 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
                 param.put("callSource","callBack");
                 CompletableFuture.runAsync(()->{
                     companyWorkflowEngine.resumeFromBlockingNode(userData.getString("workflowInstanceId"),userData.getString("nodeKey"),param);
-                },cidWorkFlowExecutor);
+                },cidWorkFlowExecutor).thenRun(()->{
+                    redisCache2.deleteObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData());
+                });
             }
             redisCache2.deleteObject(notify.getUserData());
         }
@@ -804,13 +809,17 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
     }
     public CompanyVoiceRoboticCallees getResultCalleeInfo(Notify notify){
         String cacheString = (String) redisCache2.getCacheObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData());
-        JSONObject parse = JSONObject.parseObject(cacheString,JSONObject.class);
-        CompanyVoiceRoboticCallees callee = companyVoiceRoboticCalleesMapper.selectCompanyVoiceRoboticCalleesById(parse.getLong("callerId"));
-        if(callee == null){
-            log.error("回调错误,未找到拨打手机号记录:{}", notify);
+        if(StringUtils.isNotBlank(cacheString)){
+            JSONObject parse = JSONObject.parseObject(cacheString,JSONObject.class);
+            CompanyVoiceRoboticCallees callee = companyVoiceRoboticCalleesMapper.selectCompanyVoiceRoboticCalleesById(parse.getLong("callerId"));
+            if(callee == null){
+                log.error("回调错误,未找到拨打手机号记录:{}", notify);
+                throw new BaseException("回调错误");
+            }
+            return callee;
+        }else{
             throw new BaseException("回调错误");
         }
-        return callee;
     }
 
     @Override

+ 6 - 2
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -507,9 +507,13 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
                 log.error("当前流程已扭转节点不匹配 - 期望: {}, 实际: {}", lastNodeKey, execNode);
                 throw new CustomException("节点不匹配,期望: " + lastNodeKey + ", 实际: " + execNode);
             }
-
+            if(!inputData.containsKey("callSource")){
+                throw new CustomException("未声明调用来源: " + inputData + "::" + workflowInstanceId);
+            }
+            String cs = (String) inputData.get("callSource");
             // 上个节点阻塞校验
-            if (!Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())) {
+            if ( ("callTaskTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus()))||
+                    (("addWxTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.PAUSED.getValue()).equals(currentExec.getStatus())))){
                 throw new CustomException("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId);
             }
             // 反序列化执行上下文并合并新的输入数据

+ 4 - 0
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowServiceImpl.java

@@ -76,6 +76,8 @@ public class CompanyWorkflowServiceImpl implements ICompanyWorkflowService {
             workflow.setCreateTime(now);
             workflow.setStartNodeKey(param.getStartNodeKey());
             workflow.setEndNodeKey(param.getEndNodeKey());
+            workflow.setCompanyId(param.getCompanyId());
+            workflow.setCompanyUserId(param.getCompanyUserId());
             companyWorkflowMapper.insertCompanyWorkflow(workflow);
             workflowId = workflow.getWorkflowId();
         } else {
@@ -89,6 +91,8 @@ public class CompanyWorkflowServiceImpl implements ICompanyWorkflowService {
             workflow.setStartNodeKey(param.getStartNodeKey());
             workflow.setEndNodeKey(param.getEndNodeKey());
             workflow.setUpdateTime(now);
+            workflow.setCompanyId(param.getCompanyId());
+            workflow.setCompanyUserId(param.getCompanyUserId());
             companyWorkflowMapper.updateCompanyWorkflow(workflow);
             // 删除旧的节点和连线
             companyWorkflowNodeMapper.deleteCompanyWorkflowNodeByWorkflowId(workflowId);

+ 7 - 1
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxClientServiceImpl.java

@@ -12,6 +12,7 @@ import com.fs.company.mapper.CompanyVoiceRoboticWxMapper;
 import com.fs.company.mapper.CompanyWxClientMapper;
 import com.fs.company.service.ICompanyWxClientService;
 import com.fs.company.vo.AddWxResultVo;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import com.fs.crm.domain.CrmCustomer;
 import com.fs.crm.service.impl.CrmCustomerServiceImpl;
 import com.fs.enums.ExecutionStatusEnum;
@@ -231,8 +232,13 @@ public class CompanyWxClientServiceImpl extends ServiceImpl<CompanyWxClientMappe
         return baseMapper.getAddWxList(accountIdList);
     }
 
+    /**
+     * 获取添加微信列表 工作流用
+     * @param accountIdList
+     * @return
+     */
     @Override
-    public  List<CompanyWxClient> getAddWxList4Workflow(List<Long> accountIdList){
+    public  List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(List<Long> accountIdList){
         return baseMapper.getAddWxList4Workflow(accountIdList, ExecutionStatusEnum.PAUSED.getValue(), NodeTypeEnum.AI_ADD_WX_TASK.getValue());
     }
 }

+ 15 - 2
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxServiceImpl.java

@@ -8,6 +8,7 @@ import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.DateUtils;
 import com.fs.company.domain.*;
+import com.fs.company.mapper.CompanyAiWorkflowExecLogMapper;
 import com.fs.company.mapper.CompanyAiWorkflowExecMapper;
 import com.fs.company.mapper.CompanyWxAccountMapper;
 import com.fs.company.mapper.CompanyWxClientMapper;
@@ -93,6 +94,8 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
     private CompanyAiWorkflowExecMapper companyAiWorkflowExecMapper;
     @Autowired
     private CompanyWorkflowEngine companyWorkflowEngine;
+    @Autowired
+    private CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
 
 
 
@@ -547,9 +550,19 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
             // 查找等待中的加微工作流实例
             CompanyAiWorkflowExec waitingExec = companyAiWorkflowExecMapper.selectWaitingAddWxWorkflowByWxClientId(
                     wxClientId,
-                    ExecutionStatusEnum.WAITING.getValue(),
+                    ExecutionStatusEnum.PAUSED.getValue(),
                     NodeTypeEnum.AI_ADD_WX_TASK.getValue());
-
+            //查询工作流加微执行日志是否未更新状态
+            CompanyAiWorkflowExecLog queryP = new CompanyAiWorkflowExecLog();
+            queryP.setWorkflowInstanceId(waitingExec.getWorkflowInstanceId());
+            queryP.setNodeType(NodeTypeEnum.AI_ADD_WX_TASK.getValue());
+            queryP.setStatus(ExecutionStatusEnum.PAUSED.getValue());
+            List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(queryP);
+            companyAiWorkflowExecLogs.forEach(log -> {
+                log.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
+                companyAiWorkflowExecLogMapper.updateById(log);
+                }
+            );
             if (waitingExec == null) {
                 log.info("未找到等待中的加微工作流实例 - wxClientId: {}", wxClientId);
                 return;

+ 45 - 15
fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java

@@ -2,6 +2,7 @@ package com.fs.company.service.impl.call.node;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
 import com.fs.common.exception.CustomException;
 import com.fs.common.utils.StringUtils;
@@ -14,9 +15,12 @@ import com.fs.company.vo.ExecutionResult;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author MixLiu
@@ -36,6 +40,8 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
     public static final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
     public static final WorkflowNodeFactory workflowNodeFactory = SpringUtils.getBean(WorkflowNodeFactory.class);
     public static final ObjectMapper objectMapper = new ObjectMapper();
+    public static final RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
+    protected static final String NODE_EXEC_LOCK_PREFIX = "node_exec_lock_";
 
     protected String nodeKey;
     protected String nodeName;
@@ -53,13 +59,20 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
         // 记录执行开始时间
         long startTime = System.currentTimeMillis();
         ExecutionResult result = null;
+        RLock lock = null;
         try {
+            String lockKey = NODE_EXEC_LOCK_PREFIX + context.getWorkflowInstanceId();
+            lock = redissonClient.getLock(lockKey);
+            boolean isLocked;
+            isLocked = lock.tryLock(3, 30, TimeUnit.SECONDS);
+            if (!isLocked) {
+                log.info("实例节点正在执行,上锁失败 - workflowInstanceId: {}", context.getWorkflowInstanceId());
+                return handleExecutionError(new CustomException("实例节点正在执行"), context);
+            }
             // 执行前的通用处理
             preExecute(context);
-
             // 执行具体的业务逻辑
             result = doExecute(context);
-
             // 记录执行时间
             long executionTime = System.currentTimeMillis() - startTime;
             context.setVariable("execution_time_" + nodeKey, executionTime);
@@ -69,13 +82,27 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             return handleExecutionError(e, context);
         } finally {
             postExecute(context, result);
+            if (lock != null && lock.isHeldByCurrentThread()) {
+                lock.unlock();
+            }
         }
     }
 
     @Override
     public ExecutionResult continueExecute(ExecutionContext context) {
+        RLock lock = null;
         try {
+            String lockKey = NODE_EXEC_LOCK_PREFIX + context.getWorkflowInstanceId();
+            lock = redissonClient.getLock(lockKey);
+            boolean isLocked;
+            isLocked = lock.tryLock(3, 30, TimeUnit.SECONDS);
+            if (!isLocked) {
+                return handleExecutionError(new CustomException("实例节点正在执行"), context);
+            }
             CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
+            if (!context.getCurrentNodeKey().equals(companyAiWorkflowExec.getCurrentNodeKey())) {
+                return handleExecutionError(new CustomException("节点不符,当前节点: " + companyAiWorkflowExec.getCurrentNodeKey()), context);
+            }
             log.info("收到继续执行请求 - workflowInstanceId: {}, nodeKey: {}, 当前状态: {}",
                     context.getWorkflowInstanceId(), nodeKey, companyAiWorkflowExec.getStatus());
             CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByNodeKey(nodeKey);
@@ -92,19 +119,22 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
         } catch (Exception e) {
             return handleExecutionError(e, context);
         } finally {
-            //更新流程日志信息
-            CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
-            companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
-            companyAiWorkflowExecLog.setNodeKey(nodeKey);
-            companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
-            List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
-            if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
-                CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
-                fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
-                fExecLog.setEndTime(new Date());
-                long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
-                fExecLog.setDuration(durationInMillis);
-                companyAiWorkflowExecLogMapper.updateById(fExecLog);
+            if (lock != null && lock.isHeldByCurrentThread()) {
+                //更新流程日志信息
+                CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
+                companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
+                companyAiWorkflowExecLog.setNodeKey(nodeKey);
+                companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
+                List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
+                if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
+                    CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
+                    fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
+                    fExecLog.setEndTime(new Date());
+                    long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
+                    fExecLog.setDuration(durationInMillis);
+                    companyAiWorkflowExecLogMapper.updateById(fExecLog);
+                }
+                lock.unlock();
             }
         }
     }

+ 120 - 72
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.constant.Constants;
 import com.fs.common.core.redis.RedisCacheT;
+import com.fs.common.utils.StringUtils;
 import com.fs.common.utils.spring.SpringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.CompanyWxClientMapper;
@@ -11,6 +12,7 @@ import com.fs.company.mapper.CompanyWorkflowNodeMapper;
 import com.fs.company.param.ExecutionContext;
 import com.fs.company.service.IWorkflowNode;
 import com.fs.company.vo.AiAddWxWorkflowConditionVo;
+import com.fs.company.vo.AiCallWorkflowConditionVo;
 import com.fs.company.vo.ExecutionResult;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
@@ -20,6 +22,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author MixLiu
@@ -65,31 +68,37 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
 
         // 判断加微是否成功 (isAdd: 0否 1是 2待添加 3作废)
         boolean addSuccess = wxClient != null && Integer.valueOf(1).equals(wxClient.getIsAdd());
-        
+        //回调加微成功
         if (addSuccess) {
+            List<CompanyWorkflowEdge> cList = edges.stream().filter(a ->
+                            StringUtils.isNotBlank(a.getConditionExpr()) && JSONObject.parseArray(a.getConditionExpr(), AiCallWorkflowConditionVo.class).get(0).isAdd())
+                    .collect(Collectors.toList());
+            if(null != cList && !cList.isEmpty() && nodeKey.equals(exec.getCurrentNodeKey())){
+                super.runNextNode(context, cList.get(0));
+            }
             // 加微成功,设置为等待状态,等待下一次回调
-            log.info("加微成功,设置工作流为等待状态 - workflowInstanceId: {}", context.getWorkflowInstanceId());
-            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), 
-                    context, ExecutionStatusEnum.WAITING);
-            return ExecutionResult.waiting().nextNodeKey("").build();
+//            log.info("加微成功,设置工作流为等待状态 - workflowInstanceId: {}", context.getWorkflowInstanceId());
+//            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(),
+//                    context, ExecutionStatusEnum.WAITING);
+//            return ExecutionResult.waiting().nextNodeKey("").build();
         } else {
+            List<CompanyWorkflowEdge> cList = edges.stream().filter(a ->
+                            StringUtils.isNotBlank(a.getConditionExpr()) && !JSONObject.parseArray(a.getConditionExpr(), AiCallWorkflowConditionVo.class).get(0).isAdd())
+                    .collect(Collectors.toList());
             // 加微失败,根据条件判断走哪条边
-            for (CompanyWorkflowEdge edge : edges) {
-                if (edge.getConditionExpr() == null || edge.getConditionExpr().isEmpty()) {
-                    continue; // 跳过无条件边
-                }
-                AiAddWxWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiAddWxWorkflowConditionVo.class);
+            CompanyWorkflowEdge edge = cList.get(0);
+                AiCallWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
                 // 匹配失败条件
-                if (!condition.isAddSuccess()) {
+                if (!condition.isAdd()) {
                     log.info("加微失败,执行失败分支 - workflowInstanceId: {}", context.getWorkflowInstanceId());
                     this.runNextNode(context, edge);
                     return null;
                 }
-            }
-            
+
             log.error("加微失败但未找到失败分支 - workflowInstanceId: {}", context.getWorkflowInstanceId());
             return null;
         }
+        return null;
     }
 
     /**
@@ -101,66 +110,70 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
     @Override
     protected ExecutionResult doExecute(ExecutionContext context) {
         if (!isAsync()) {
-            return ExecutionResult.failure().nextNodeKey("").build();
+            return ExecutionResult.failure().nextNodeKey(null).build();
         }
-
         try {
-            // 1. 获取业务数据
-            CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
-            if (business == null) {
-                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
-            }
-
-            // 2. 通过 wxClientId 获取加微客户记录
-            Long wxClientId = business.getWxClientId();
-            if (wxClientId == null) {
-                return ExecutionResult.failure().errorMessage("业务数据中缺少wxClientId").build();
-            }
-            CompanyWxClient wxClient = companyWxClientMapper.selectById(wxClientId);
-            if (wxClient == null) {
-                return ExecutionResult.failure().errorMessage("未找到加微客户记录: " + wxClientId).build();
-            }
-
-            // 3. 验证加微数据是否已准备好
-            Long accountId = wxClient.getAccountId();
-            if (accountId == null) {
-                return ExecutionResult.failure().errorMessage("加微客户记录中缺少accountId,请先分配微信账号").build();
-            }
-            Long dialogId = wxClient.getDialogId();
-            if (dialogId == null) {
-                return ExecutionResult.failure().errorMessage("加微客户记录中缺少dialogId,请先设置话术").build();
-            }
-
-            // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
-            if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
-                log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
-                if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
-                    return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
-                }
-            }
-
-            Long roboticId = business.getRoboticId();
-            log.info("准备加微任务数据 - workflowInstanceId: {}, roboticId: {}, wxClientId: {}, accountId: {}",
-                    context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
-
-            // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
-            String taskKey = Constants.TASK_ID + roboticId;
-            redisCache.setCacheObject(taskKey, Constants.ADD_WX);
-            log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
-
-            // 6. 设置加微超时检测时间到 Redis
-            int timeoutMinutes = getTimeoutFromProperties();
-            long timeoutTimestamp = System.currentTimeMillis() + timeoutMinutes * 60 * 1000L;
-            String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + context.getWorkflowInstanceId() + ":" + wxClientId;
-            redisCache.setCacheObject(timeoutKey, String.valueOf(timeoutTimestamp));
-            log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
-                    timeoutKey, timeoutMinutes, timeoutTimestamp);
-
-            // 7. 设置工作流为暂停状态,等待加微回调
             super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
+            return ExecutionResult.paused()
+                    .outputData(context.getVariables())
+                    .nextNodeKey("").build();
 
-            // 8. 返回 paused 状态,nextNodeKey 为空字符串(不自动流转)
-            return ExecutionResult.paused().nextNodeKey("").build();
+//            // 1. 获取业务数据
+//            CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
+//            if (business == null) {
+//                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
+//            }
+//
+//            // 2. 通过 wxClientId 获取加微客户记录
+//            Long wxClientId = business.getWxClientId();
+//            if (wxClientId == null) {
+//                return ExecutionResult.failure().errorMessage("业务数据中缺少wxClientId").build();
+//            }
+//            CompanyWxClient wxClient = companyWxClientMapper.selectById(wxClientId);
+//            if (wxClient == null) {
+//                return ExecutionResult.failure().errorMessage("未找到加微客户记录: " + wxClientId).build();
+//            }
+//
+//            // 3. 验证加微数据是否已准备好
+//            Long accountId = wxClient.getAccountId();
+//            if (accountId == null) {
+//                return ExecutionResult.failure().errorMessage("加微客户记录中缺少accountId,请先分配微信账号").build();
+//            }
+//            Long dialogId = wxClient.getDialogId();
+//            if (dialogId == null) {
+//                return ExecutionResult.failure().errorMessage("加微客户记录中缺少dialogId,请先设置话术").build();
+//            }
+//
+//            // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
+//            if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
+//                log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
+//                if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
+//                    return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
+//                }
+//            }
+//
+//            Long roboticId = business.getRoboticId();
+//            log.info("准备加微任务数据 - workflowInstanceId: {}, roboticId: {}, wxClientId: {}, accountId: {}",
+//                    context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
+//
+//            // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
+//            String taskKey = getDelayAddWxKeyPrefix() Constants.TASK_ID + roboticId;
+//            redisCache.setCacheObject(taskKey, Constants.ADD_WX);
+//            log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
+//
+//            // 6. 设置加微超时检测时间到 Redis
+//            int timeoutMinutes = getTimeoutFromProperties();
+//            long timeoutTimestamp = System.currentTimeMillis() + timeoutMinutes * 60 * 1000L;
+//            String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + context.getWorkflowInstanceId() + ":" + wxClientId;
+//            redisCache.setCacheObject(timeoutKey, String.valueOf(timeoutTimestamp));
+//            log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
+//                    timeoutKey, timeoutMinutes, timeoutTimestamp);
+//
+//            // 7. 设置工作流为暂停状态,等待加微回调
+//            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
+//
+//            // 8. 返回 paused 状态,nextNodeKey 为空字符串(不自动流转)
+//            return ExecutionResult.paused().nextNodeKey("").build();
 
         } catch (Exception e) {
             log.error("准备加微任务数据异常 - workflowInstanceId: {}", context.getWorkflowInstanceId(), e);
@@ -180,8 +193,9 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
 
     /**
      * 运行下一个节点
+     *
      * @param context 执行上下文
-     * @param edge 边
+     * @param edge    
      */
     @Override
     protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
@@ -224,7 +238,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      * 如果返回 false 表示已经被其他路径执行过了,不再执行
      *
      * @param workflowInstanceId 工作流实例ID
-     * @param wxClientId 加微客户ID
+     * @param wxClientId         加微客户ID
      * @return 是否可以执行
      */
     public static boolean tryMarkAsExecuted(String workflowInstanceId, Long wxClientId) {
@@ -244,7 +258,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      * 清除超时检测 Key
      *
      * @param workflowInstanceId 工作流实例ID
-     * @param wxClientId 加微客户ID
+     * @param wxClientId         加微客户ID
      */
     public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
         String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
@@ -265,4 +279,38 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
         }
         return String.format(DELAY_ADD_WX_KEY, nowDay.getHours(), nowDay.getMinutes());
     }
+
+    /**
+     * 完成加微动作
+     *
+     * @param workflowInstanceId
+     */
+    public void doneAddwx(String workflowInstanceId) {
+        ExecutionContext context = createExecutionContext(workflowInstanceId, nodeKey);
+        context.setVariable("lastNodeKey", nodeKey);
+        //启动定时节点倒计时
+        CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
+        if (!exec.getCurrentNodeKey().equals(nodeKey)) {
+            //当前节点已流转
+            log.error("当前节点已流转 ,目标:{},实际:{}", nodeKey, exec.getCurrentNodeKey());
+            return;
+        }
+        List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
+        edges.forEach(edge -> {
+            List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
+            if (null == conditions || conditions.isEmpty()) {
+                super.runNextNode(context, edge);
+            } else {
+                AiCallWorkflowConditionVo condition = conditions.get(0);
+                //节点包含延时条件
+                if (null != condition.getAddTime() && !condition.isAdd()) {
+                    long l = System.currentTimeMillis() + condition.getAddTime() * 60 * 1000;
+                    String redisKey = getDelayAddWxKeyPrefix(l) + workflowInstanceId;
+                    ExecutionContext nextContext = context.clone();
+                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
+                    super.redisCache.setCacheObject(redisKey, nextContext);
+                }
+            }
+        });
+    }
 }

+ 33 - 28
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -56,36 +56,41 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
         edges.forEach(edge -> {
             List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
 //            Boolean isValid = true;
-            //暂时考虑单条件
-            AiCallWorkflowConditionVo condition = conditions.get(0);
-            //拨通
-            if (condition.isCallConnected() && callRes.getCallTime() != null && callRes.getCallTime() > 0) {
-                //如果含有意向度过滤
-                if (null != condition.getIntention()) {
-                    if (condition.getIntention().equals(callRes.getIntention())) {
+            if (null == conditions || conditions.isEmpty()) {
+                super.runNextNode(context, edge);
+            } else {
+                //暂时考虑单条件
+                AiCallWorkflowConditionVo condition = conditions.get(0);
+                //拨通
+                if (condition.isCallConnected() && callRes.getCallTime() != null && callRes.getCallTime() > 0) {
+                    //如果含有意向度过滤
+                    if (StringUtils.isNotBlank(condition.getIntention())) {
+                        if (condition.getIntention().equals(callRes.getIntention())) {
+                            super.runNextNode(context, edge);
+                        }
+                    } else {
                         super.runNextNode(context, edge);
                     }
-                } else {
-                    super.runNextNode(context, edge);
                 }
-            }
-            //未拨通
-            else if (!condition.isCallConnected() && (callRes.getCallTime() == null || Long.valueOf(0).equals(callRes.getCallTime()) || callRes.getCallAnswerTime() == null)) {
-                //延时操作
-                if (null != condition.getCallTime()) {
-                    //计算延时分片分钟
-                    long l = System.currentTimeMillis() + condition.getCallTime() * 60 * 1000;
-                    ExecutionContext nextContext = context.clone();
-                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
-                    //添加到延时扫描redis
-                    super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext,1, TimeUnit.DAYS);
-                    super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
-                }
-                //无时间驱动
-                else {
-                    super.runNextNode(context, edge);
+                //未拨通
+                else if (!condition.isCallConnected() && (callRes.getCallTime() == null || Long.valueOf(0).equals(callRes.getCallTime()) || callRes.getCallAnswerTime() == null)) {
+                    //延时操作
+                    if (null != condition.getCallTime() && condition.getCallTime() > 0) {
+                        //计算延时分片分钟
+                        long l = System.currentTimeMillis() + condition.getCallTime() * 60 * 1000;
+                        ExecutionContext nextContext = context.clone();
+                        nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
+                        //添加到延时扫描redis
+                        super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
+                        super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
+                    }
+                    //无时间驱动
+                    else {
+                        super.runNextNode(context, edge);
+                    }
                 }
             }
+
         });
         return null;
     }
@@ -108,7 +113,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                     .nextNodeKey("").build();
         } else {
             return ExecutionResult.failure()
-                    .nextNodeKey("").build();
+                    .nextNodeKey(null).build();
         }
 
     }
@@ -138,11 +143,11 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
     }
 
     @Override
-    protected void postExecute(ExecutionContext context, ExecutionResult result){
+    protected void postExecute(ExecutionContext context, ExecutionResult result) {
         super.postExecute(context, result);
         String callRedisKey = context.getVariable("callRedisKey", String.class);
         //来源于定时调用doexec,调用后移除key
-        if(StringUtils.isNotBlank(callRedisKey)){
+        if (StringUtils.isNotBlank(callRedisKey)) {
             super.redisCache.deleteObject(callRedisKey);
         }
 

+ 1 - 0
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiSendMsgTaskNode.java

@@ -62,6 +62,7 @@ public class AiSendMsgTaskNode extends AbstractWorkflowNode {
     @Override
     protected ExecutionResult doExecute(ExecutionContext context) {
         try {
+            Thread.sleep(1000L);
             // 获取业务数据
             CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
             if (business == null) {

+ 10 - 0
fs-service/src/main/java/com/fs/company/service/impl/call/node/EndNode.java

@@ -1,9 +1,11 @@
 package com.fs.company.service.impl.call.node;
 
 import com.fs.common.utils.spring.SpringUtils;
+import com.fs.company.domain.CompanyAiWorkflowExecLog;
 import com.fs.company.mapper.CompanyWorkflowNodeMapper;
 import com.fs.company.param.ExecutionContext;
 import com.fs.company.vo.ExecutionResult;
+import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
 
 import java.util.Map;
@@ -33,4 +35,12 @@ public class EndNode extends AbstractWorkflowNode {
     public NodeTypeEnum getType() {
         return NodeTypeEnum.END;
     }
+
+    /**
+     * 执行后的后处理
+     */
+    @Override
+    protected void postExecute(ExecutionContext context, ExecutionResult result) {
+      super.postExecute(context, result);
+    }
 }

+ 85 - 0
fs-service/src/main/java/com/fs/company/vo/CompanyWxClient4WorkFlowVO.java

@@ -0,0 +1,85 @@
+package com.fs.company.vo;
+
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fs.common.annotation.Excel;
+import com.fs.common.core.domain.BaseEntityTow;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author MixLiu
+ * @date 2026/2/5 14:35
+ * @description
+ */
+@Data
+public class CompanyWxClient4WorkFlowVO extends BaseEntityTow {
+
+    private static final long serialVersionUID = 1L;
+
+    /** 任务ID */
+    @Excel(name = "任务ID")
+    private Long roboticId;
+    /** 分配企微账号ID */
+    @Excel(name = "分配企微账号ID")
+    private Long roboticWxId;
+    /** 客户ID */
+    @Excel(name = "客户ID")
+    private Long customerId;
+    /** 话术ID */
+    @Excel(name = "话术ID")
+    private Long dialogId;
+
+    /** 昵称 */
+    @Excel(name = "昵称")
+    private String nickName;
+
+    /** 头像 */
+    @Excel(name = "头像")
+    private String avatar;
+
+    /** 手机号 */
+    @Excel(name = "手机号")
+    private String phone;
+
+    /** 微信号 */
+    @Excel(name = "微信号")
+    private String wxNo;
+
+    /** 微信号 */
+    @Excel(name = "微信号")
+    private String wxName;
+
+    /** 客户意向 */
+    @Excel(name = "客户意向")
+    private String intention;
+
+    /** 是否添加;0否1是 */
+    @Excel(name = "是否添加;0否1是2待添加3作废")
+    private Integer isAdd;
+    private Long accountId;
+
+    /** 添加时间 */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @Excel(name = "添加时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime addTime;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @Excel(name = "添加完成时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime successAddTime;
+    private String wxV3;
+    private String wxV4;
+    private Long companyId;
+    private Long companyUserId;
+
+    private String wxNickName;
+    private String dialogName;
+    private String templateDetails;
+    private String template;
+    private String roboticName;
+    private String memo;
+    private String workflowInstanceId;
+    private String currentNodeKey;
+    private String currentNodeName;
+    private Integer currentNodeType;
+}

+ 4 - 0
fs-service/src/main/resources/mapper/company/CompanyWorkflowMapper.xml

@@ -131,6 +131,7 @@
             <if test="companyUserId != null">company_user_id,</if>
             <if test="startNodeKey != null">start_node_key,</if>
             <if test="endNodeKey != null">end_node_key,</if>
+            <if test="companyId != null">company_id,</if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="workflowName != null and workflowName != ''">#{workflowName},</if>
@@ -145,6 +146,7 @@
             <if test="companyUserId != null">#{companyUserId},</if>
             <if test="startNodeKey != null">#{startNodeKey},</if>
             <if test="endNodeKey != null">#{endNodeKey},</if>
+            <if test="companyId != null">#{companyId},</if>
         </trim>
     </insert>
     <insert id="insertCompanyWorkflowCompanyUser">
@@ -187,6 +189,8 @@
             <if test="remark != null">remark = #{remark},</if>
             <if test="startNodeKey != null">start_node_key = #{startNodeKey},</if>
             <if test="endNodeKey != null">end_node_key = #{endNodeKey},</if>
+            <if test="companyId != null">company_id = #{companyId},</if>
+            <if test="companyUserId != null">company_user_id = #{companyUserId},</if>
         </trim>
         where workflow_id = #{workflowId}
     </update>

+ 2 - 2
fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml

@@ -160,9 +160,9 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         </if>
         group by account_id
     </select>
-    <select id="getAddWxList4Workflow" resultType="com.fs.company.domain.CompanyWxClient">
+    <select id="getAddWxList4Workflow" resultType="com.fs.company.vo.CompanyWxClient4WorkFlowVO">
 
-        SELECT t1.* FROM company_wx_client t1
+        SELECT t1.*,t3.workflow_instance_id,t3.current_node_key,t3.current_node_name,t3.current_node_type FROM company_wx_client t1
                              inner join company_voice_robotic_business t2 on t1.id = t2.wx_client_id and t1.robotic_id = t2.robotic_id
                              inner join company_ai_workflow_exec t3 on t3.business_key = t2.id
         where t1.is_add = 0 and t1.account_id is not null

+ 1 - 1
fs-user-app/src/main/java/com/fs/framework/aspectj/lock/DistributeLockAspect.java

@@ -5,7 +5,7 @@ import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.redisson.api.RLock;
-import org.redisson.api.RedissonClient;
+import org.redisson.api.;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.StandardReflectionParameterNameDiscoverer;

+ 1 - 1
fs-wx-task/src/main/java/com/fs/FsWxTaskApplication.java

@@ -13,7 +13,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
 @EnableTransactionManagement
 @EnableAsync
-//@EnableScheduling
+@EnableScheduling
 public class FsWxTaskApplication
 {
     public static void main(String[] args){

+ 10 - 0
fs-wx-task/src/main/java/com/fs/app/controller/CommonController.java

@@ -51,6 +51,16 @@ public class CommonController {
         taskService.callNextTask();
     }
 
+    @GetMapping("addWx4Workflow")
+    public void addWx4Workflow(Long accountId) {
+        taskService.addWx4Workflow(Collections.singletonList(accountId));
+    }
+
+    @GetMapping("cidWorkflowAddWxRun")
+    public void cidWorkflowAddWxRun() {
+        taskService.cidWorkflowAddWxRun();
+    }
+
 
 
 

+ 53 - 56
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -8,20 +8,19 @@ import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.fs.common.constant.Constants;
 import com.fs.common.constant.FsConstants;
+import com.fs.common.core.redis.RedisCache;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.PubFun;
 import com.fs.common.utils.StringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.*;
 import com.fs.company.param.ExecutionContext;
-import com.fs.company.service.ICompanyVoiceRoboticService;
-import com.fs.company.service.ICompanyWxAccountService;
-import com.fs.company.service.ICompanyWxClientService;
-import com.fs.company.service.ICompanyWxDialogService;
+import com.fs.company.service.*;
 import com.fs.company.service.impl.*;
-import com.fs.company.service.CompanyWorkflowEngine;
 import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
 import com.fs.company.service.impl.call.node.AiCallTaskNode;
+import com.fs.company.service.impl.call.node.WorkflowNodeFactory;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import com.fs.course.config.RedisKeyScanner;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
@@ -37,6 +36,7 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
+import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
@@ -76,6 +76,8 @@ public class WxTaskService {
     private final CompanyAiWorkflowExecMapper companyAiWorkflowExecMapper;
     private final CompanyWorkflowEngine companyWorkflowEngine;
     private final CompanyVoiceRoboticBusinessMapper companyVoiceRoboticBusinessMapper;
+    private final WorkflowNodeFactory workflowNodeFactory;
+    private final RedisCache redisCache2;
     private final ExecutorService cidExcutor = new ThreadPoolExecutor(
             32,
             64,
@@ -228,11 +230,11 @@ public class WxTaskService {
         String json = sysConfigService.selectConfigByKey("wx.config");
         WxConfig config = JSONUtil.toBean(json, WxConfig.class);
         // 需要添加微信的列表
-        List<CompanyWxClient> list = companyWxClientService.getAddWxList4Workflow(accountIdList);
+        List<CompanyWxClient4WorkFlowVO> list = companyWxClientService.getAddWxList4Workflow(accountIdList);
         log.info("需要添加微信的数量:{}", list.size());
         if (list.isEmpty()) return;
         List<CompanyWxClient> addList = new ArrayList<>();
-        Map<Long, CompanyWxClient> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient::getAccountId);
+        Map<Long, CompanyWxClient4WorkFlowVO> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient4WorkFlowVO::getAccountId);
         List<CompanyWxAccount> accountList = new ArrayList<>(companyWxAccountService.listByIds(clientMap.keySet()));
         log.info("查询加微的账号数量:{}", list.size());
         List<CompanyWxAccount> addAccountList = accountList.stream().filter(e -> {
@@ -252,11 +254,8 @@ public class WxTaskService {
         }).collect(Collectors.toList());
         log.info("实际加微的账号数量:{}", addAccountList.size());
         addAccountList.forEach(e -> {
-            CompanyWxClient client = clientMap.get(e.getId());
+            CompanyWxClient4WorkFlowVO client = clientMap.get(e.getId());
             if (client != null) {
-                String task = redisCache.getCacheObject(Constants.TASK_ID + client.getRoboticId());
-                log.info("ROBOTIC-ID:{},CLIENT-ID:{},当前任务执行状态:{}", client.getRoboticId(), client.getId(), task);
-                if (StringUtils.isNotEmpty(task) && Constants.ADD_WX.equals(task)) {
                     CompanyWxDialog dialog = companyWxDialogService.getById(client.getDialogId());
                     CrmCustomer crmCustomer = crmCustomerService.selectCrmCustomerById(client.getCustomerId());
                     String newTxt = objectPlaceholderResolver.resolvePlaceholders(crmCustomer, dialog.getTemplateDetails());
@@ -276,7 +275,9 @@ public class WxTaskService {
                         client.setAddTime(LocalDateTime.now());
                         client.setWxV3(vo.getV3());
                         client.setWxV4(vo.getV4());
-                        addList.add(client);
+                        CompanyWxClient addItem = new CompanyWxClient();
+                        BeanUtils.copyProperties(client, addItem);
+                        addList.add(addItem);
                         addLog.setStatus(2);
                         addLog.setResult(JSON.toJSONString(vo));
                     } else {
@@ -285,62 +286,30 @@ public class WxTaskService {
                         addLog.setResult(JSON.toJSONString(vo));
                     }
                     asyncSaveCompanyVoiceRoboticCallLog(addLog);
-                } else {
-                    log.error("ROBOTIC-ID:{},当前任务没有执行加微任务", client.getRoboticId());
-                }
             } else {
                 log.error("当前账号暂无需要添加微信:{}-{}", e.getId(), e.getWxNickName());
             }
         });
         if (!addList.isEmpty()) {
             companyWxClientService.updateBatchById(addList);
-            //根据加微成功的用户,判定是否加入延时执行下一步任务
-            Set<Long> roboticIdSet = addList.stream().map(CompanyWxClient::getRoboticId).collect(Collectors.toSet());
-            Set<Long> userIdSet = addList.stream().map(CompanyWxClient::getCustomerId).collect(Collectors.toSet());
-
-            //找到任务
-            List<CompanyVoiceRobotic> companyVoiceRobotics = companyVoiceRoboticMapper.selectBatchIds(roboticIdSet);
-            Map<Long, CompanyVoiceRobotic> roboticsMp = companyVoiceRobotics.stream().collect(Collectors.toMap(CompanyVoiceRobotic::getId, Function.identity(), (existing, replacement) -> existing));
-            //找到callees数据
-            List<CompanyVoiceRoboticCallees> companyVoiceRoboticCallees = companyVoiceRoboticCalleesMapper.selectCalleesListByRoboticIdsAndUserIds(userIdSet, roboticIdSet);
-            Map<String, CompanyVoiceRoboticCallees> calleesMp = companyVoiceRoboticCallees.stream().collect(Collectors.toMap(e -> e.getUserId() + "-" + e.getRoboticId(), Function.identity(), (existing, replacement) -> existing));
-
             long l = System.currentTimeMillis();
-
             //根据加微成功
             for (CompanyWxClient client : addList) {
-                CompanyVoiceRobotic clientRobotic = roboticsMp.getOrDefault(client.getRoboticId(), null);
-                if (null == clientRobotic) {
-                    log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
-                    continue;
-                }
-                CompanyVoiceRoboticCallees callees = calleesMp.getOrDefault(client.getCustomerId() + "-" + client.getRoboticId(), null);
-                if (null == callees) {
-                    log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
-                    continue;
-                }
-                Integer addWxTime = clientRobotic.getAddWxTime();
-                if (null == addWxTime) {
-                    log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有设置加微后置等待时间", client.getRoboticId(), client.getId());
-                } else {
-                    long endT = System.currentTimeMillis() + addWxTime * 60 * 1000;
-                    StringBuilder sb = new StringBuilder(Constants.CID_NEXT_TASK_ID).append(callees.getRoboticId()).append(":").append(callees.getId());
-                    redisCache.setCacheObject(sb.toString(), String.valueOf(endT));
+                CompanyWxClient4WorkFlowVO vo = clientMap.get(client.getAccountId());
+                IWorkflowNode node = workflowNodeFactory.createNode(vo.getCurrentNodeKey(),
+                        NodeTypeEnum.fromValue(vo.getCurrentNodeType()),
+                        vo.getCurrentNodeName(), null);
+                if (node instanceof AiAddWxTaskNode) {
+                    CompletableFuture.runAsync(() -> {
+                        AiAddWxTaskNode addWxNode = (AiAddWxTaskNode) node;
+                        addWxNode.doneAddwx(vo.getWorkflowInstanceId());
+                    }, cidExcutor);
                 }
             }
-            companyVoiceRoboticCallees.forEach(robotic ->
-                    robotic.setRunTaskFlow(
-                            StringUtils.isBlank(robotic.getRunTaskFlow()) ?
-                                    Constants.ADD_WX : robotic.getRunTaskFlow() + "," + Constants.ADD_WX
-                    )
-            );
-            companyVoiceRoboticCalleesServiceImpl.updateBatchById(companyVoiceRoboticCallees);
-            companyVoiceRoboticServiceImpl.finishAddWxByCallees(roboticIdSet);
-        }
-        if (!addAccountList.isEmpty()) {
-            companyWxAccountService.updateBatchById(addAccountList);
+            if (!addAccountList.isEmpty()) {
+                companyWxAccountService.updateBatchById(addAccountList);
+            }
         }
-
     }
 
     public void initAccountNum() {
@@ -826,5 +795,33 @@ public class WxTaskService {
             }
         }
     }
+    /**
+     * 扫描工作流延时任务
+     */
+    public void cidWorkflowAddWxRun() {
+        log.info("===========工作流延时任务开始扫描===========");
+        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(null) + "*";
+        Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
+        log.info("共扫描到 {} 个待处理键", keys.size());
+        keys.parallelStream().forEach(key -> {
+            try {
+                //doExec
+                CompletableFuture.runAsync(()->{
+                    try {
+                        ExecutionContext context = redisCache2.getCacheObject(key);
+                        context.setVariable("callRedisKey",key);
+                        context.setVariable("callSource","addWxTimer");
+                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
+                    } catch (Exception e) {
+                        log.error("处理工作流延时任务异常 - key: {}", key, e);
+                    }
+                }, cidExcutor);
+
+            } catch (Exception ex) {
+                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+            }
+        });
+        log.info("===========工作流延时任务扫描结束===========");
+    }
 
 }

+ 19 - 5
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -2,9 +2,12 @@ package com.fs.app.task;
 
 import com.fs.app.service.WxTaskService;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.util.Collections;
+
 /**
  * 企业微信SOP定时任务管理类
  * 负责处理各种定时任务,包括SOP规则检查、消息发送、数据清理等
@@ -16,13 +19,17 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class WxTask {
 
+    @Autowired
     private WxTaskService taskService;
 
     @Scheduled(cron = "0 0/30 * * * ?")
     public void addWx() {
         taskService.addWx(null);
     }
-
+    @Scheduled(cron = "0 0/3 * * * ?")
+    public void addWx4Workflow() {
+        taskService.addWx4Workflow(null);
+    }
     @Scheduled(cron = "0 0 0 * * ?")
     public void initAccountNum() {
         taskService.initAccountNum();
@@ -37,10 +44,10 @@ public class WxTask {
     public void cellRun() {
         taskService.cellRun();
     }
-    @Scheduled(cron = "0 0/1 * * * ?")
-    public void callNextTask(){
-        taskService.callNextTask();
-    }
+//    @Scheduled(cron = "0 0/1 * * * ?")
+//    public void callNextTask(){
+//        taskService.callNextTask();
+//    }
 
     /**
      * 工作流加微超时检测
@@ -51,4 +58,11 @@ public class WxTask {
         taskService.checkWorkflowAddWxTimeout();
     }
 
+    @Scheduled(cron = "0 0/1 * * * ?")
+    public void cidWorkflowAddWxRun(){
+        taskService.cidWorkflowAddWxRun();
+    }
+
+
+
 }