Przeglądaj źródła

cid优化调整

lmx 2 tygodni temu
rodzic
commit
226d34a4d6

+ 7 - 7
fs-ai-call-task/src/main/java/com/fs/app/task/Task.java

@@ -29,13 +29,13 @@ public class Task {
     /**
      * 定时拉人进群
      */
-    @Scheduled(cron = "0 * * * * ?")
-    public void runTask(){
-        List<CompanyVoiceRobotic> list = roboticMapper.selectList(new QueryWrapper<CompanyVoiceRobotic>().eq("task_status", 1));
-        list.forEach(e -> {
-
-        });
-    }
+//    @Scheduled(cron = "0 * * * * ?")
+//    public void runTask(){
+//        List<CompanyVoiceRobotic> list = roboticMapper.selectList(new QueryWrapper<CompanyVoiceRobotic>().eq("task_status", 1));
+//        list.forEach(e -> {
+//
+//        });
+//    }
 
 
     @Scheduled(cron = "0 0/1 * * * ?")

+ 49 - 16
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticCallLogCallphoneServiceImpl.java

@@ -3,7 +3,11 @@ package com.fs.company.service.impl;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson.JSON;
@@ -15,6 +19,7 @@ import com.fs.aicall.domain.apiresult.PushIIntentionResult;
 import com.fs.aicall.domain.param.getDialogMapDomain;
 import com.fs.aicall.service.AiCallService;
 import com.fs.common.constant.Constants;
+import com.fs.common.core.redis.RedisCache;
 import com.fs.common.utils.DateUtils;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fs.common.utils.StringUtils;
@@ -22,12 +27,14 @@ import com.fs.company.domain.*;
 import com.fs.company.mapper.CompanyVoiceRoboticBusinessMapper;
 import com.fs.company.mapper.CompanyVoiceRoboticCalleesMapper;
 import com.fs.company.mapper.CompanyWxAccountMapper;
+import com.fs.company.service.CompanyWorkflowEngine;
 import com.fs.company.vo.CidConfigVO;
 import com.fs.store.config.StoreConfig;
 import com.fs.system.service.ISysConfigService;
 import com.fs.voice.constant.Constant;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import com.fs.company.mapper.CompanyVoiceRoboticCallLogCallphoneMapper;
@@ -55,6 +62,13 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
     CompanyVoiceRoboticCalleesMapper companyVoiceRoboticCalleesMapper;
     @Autowired
     CompanyVoiceRoboticBusinessMapper companyVoiceRoboticBusinessMapper;
+    @Autowired
+    RedisCache redisCache2;
+    @Autowired
+    CompanyWorkflowEngine companyWorkflowEngine;
+    @Autowired
+    @Qualifier("cidWorkFlowExecutor")
+    private Executor cidWorkFlowExecutor;
 
     /**
      * 查询调用日志_ai打电话
@@ -133,7 +147,7 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
         try {
             callPhoneLog.setCreateTime(DateUtils.getNowDate());
             baseMapper.insertCompanyVoiceRoboticCallLogCallphone(callPhoneLog);
-            companyVoiceRoboticBusinessMapper.updateActionCount(2,callPhoneLog.getRoboticId(),callPhoneLog.getCallerId(),null);
+            companyVoiceRoboticBusinessMapper.updateActionCount(2, callPhoneLog.getRoboticId(), callPhoneLog.getCallerId(), null);
         } catch (Exception e) {
             log.error("记录任务执行日志失败:失败数据:{}", callPhoneLog, e);
         }
@@ -146,8 +160,11 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
     @Autowired
     private ISysConfigService configService;
 
+    public static final String WORKFLOW_CALL_ONE_REDIS_KEY = "workflowCallOne:uuid:";
+
     private static final BigDecimal DEFAULT_CALL_CHARGE = new BigDecimal("0.12");
     private static final BigDecimal ONE_MINUTES_SECOND = new BigDecimal("60");
+
     @Async("callLogExcutor")
     public void asyncHandleCalleeCallBackResult(PushIIntentionResult result, CompanyVoiceRoboticCallees callees) {
         try {
@@ -165,8 +182,8 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
 //                updateCallees.setRunTaskFlow(runTaskFlow);
 //                companyVoiceRoboticCalleesMapper.updateById(updateCallees);
 //            }
-            String json= configService.selectConfigByKey("cid.config");
-            if(StringUtils.isBlank( json)){
+            String json = configService.selectConfigByKey("cid.config");
+            if (StringUtils.isBlank(json)) {
                 log.error("未配置cid.config");
             }
             CidConfigVO cidConfigVO = JSONUtil.toBean(json, CidConfigVO.class);
@@ -175,7 +192,7 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
             String uuid = notify.getUuid();
 
 
-            if(StringUtils.isNotBlank(uuid)){
+            if (StringUtils.isNotBlank(uuid)) {
                 getDialogMapDomain getDialogMap = getDialogMapDomain.builder()
                         .uuid(uuid)
                         .build();
@@ -193,19 +210,19 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
                 // 写入其他记录
                 JSONObject telData = dialogMap.getTelData();
                 companyVoiceRoboticCallLog.setRecordPath((String) telData.getOrDefault("recordPath", ""));
-                companyVoiceRoboticCallLog.setContentList(telData.containsKey("contentList")?telData.getJSONArray("contentList").toJSONString() : "");
+                companyVoiceRoboticCallLog.setContentList(telData.containsKey("contentList") ? telData.getJSONArray("contentList").toJSONString() : "");
                 companyVoiceRoboticCallLog.setCallerNum((String) telData.getOrDefault("callerNum", ""));
                 companyVoiceRoboticCallLog.setCalleeNum((String) telData.getOrDefault("calleeNum", ""));
                 companyVoiceRoboticCallLog.setUuid((String) telData.getOrDefault("uuid", ""));
-                Long createTime =  telData.getLong("createTime");
+                Long createTime = telData.getLong("createTime");
                 companyVoiceRoboticCallLog.setCallCreateTime(createTime);
-                Long answerTime =  telData.getLong("answerTime");
+                Long answerTime = telData.getLong("answerTime");
                 companyVoiceRoboticCallLog.setCallAnswerTime(answerTime);
                 companyVoiceRoboticCallLog.setIntention((String) telData.getOrDefault("intention", ""));
-                companyVoiceRoboticCallLog.setCallTime( telData.getLong("duration"));
+                companyVoiceRoboticCallLog.setCallTime(telData.getLong("duration"));
                 BigDecimal callCharge = cidConfigVO.getCallCharge();
                 //
-                if(null == callCharge){
+                if (null == callCharge) {
                     callCharge = DEFAULT_CALL_CHARGE;
                 }
                 //向上取整分钟数
@@ -213,34 +230,50 @@ public class CompanyVoiceRoboticCallLogCallphoneServiceImpl extends ServiceImpl<
                 BigDecimal multiply = divide.multiply(callCharge);
                 companyVoiceRoboticCallLog.setCost(multiply);
                 baseMapper.updateCompanyVoiceRoboticCallLogCallphone(companyVoiceRoboticCallLog);
+
+                if (StringUtils.isNotBlank(notify.getUserData())) {
+                    JSONObject userData = JSONObject.parseObject(redisCache2.getCacheObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData()), JSONObject.class);
+                    if (null != userData && userData.containsKey("callBackUuid") && userData.containsKey("workflowInstanceId") && userData.containsKey("nodeKey")) {
+                        Map<String, Object> param = new HashMap<>();
+                        param.put("callBackUuid", userData.getString("callBackUuid"));
+                        param.put("callSource", "callBack");
+                        CompletableFuture.runAsync(() -> {
+                            companyWorkflowEngine.resumeFromBlockingNode(userData.getString("workflowInstanceId"), userData.getString("nodeKey"), param);
+                        }, cidWorkFlowExecutor).thenRun(() -> {
+                            redisCache2.deleteObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData());
+                        });
+                    }
+                    redisCache2.deleteObject(notify.getUserData());
+                }
             }
 
         } catch (Exception ex) {
             log.error("处理回调结果异常:{}", result, ex);
         }
     }
+
     @Async("callLogExcutor")
     public void asyncInsertCompanyVoiceRoboticCallLogBatch(List<CompanyVoiceRoboticCallLogCallphone> list) {
-        try{
-            list.stream().forEach(i->i.setCreateTime(new Date()));
+        try {
+            list.stream().forEach(i -> i.setCreateTime(new Date()));
             this.saveBatch(list);
         } catch (Exception e) {
-            log.error("批量记录任务执行日志失败:失败数据:{}",list, e);
+            log.error("批量记录任务执行日志失败:失败数据:{}", list, e);
         }
     }
 
-    public CompanyVoiceRoboticCallLogCallphone selectLogByRoboticIdAndCallerId(Long roboticId,Long callerId){
-            return  baseMapper.selectLogByRoboticIdAndCallerId(roboticId,callerId);
+    public CompanyVoiceRoboticCallLogCallphone selectLogByRoboticIdAndCallerId(Long roboticId, Long callerId) {
+        return baseMapper.selectLogByRoboticIdAndCallerId(roboticId, callerId);
     }
 
     @Override
-    public List<CompanyVoiceRoboticCallLogCallphone> selectCompanyVoiceRoboticCallLogCallphoneListData(CompanyVoiceRoboticCallLogCallphone companyVoiceRoboticCallLogCallphone){
+    public List<CompanyVoiceRoboticCallLogCallphone> selectCompanyVoiceRoboticCallLogCallphoneListData(CompanyVoiceRoboticCallLogCallphone companyVoiceRoboticCallLogCallphone) {
         List<CompanyVoiceRoboticCallLogCallphone> result = baseMapper.selectCompanyVoiceRoboticCallLogCallphoneListData(companyVoiceRoboticCallLogCallphone);
         return result;
     }
 
     @Override
-    public List<Long> getCallerIdsByCustomerId(Long customerId){
+    public List<Long> getCallerIdsByCustomerId(Long customerId) {
         return companyVoiceRoboticCalleesMapper.getCallerIdsByCustomerId(customerId);
     }
 }

+ 4 - 21
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -97,9 +97,6 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
     private final CompanyAiWorkflowExecMapper companyAiWorkflowExecMapper;
     private final CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
     private final RedisCache redisCache2;
-    @Autowired
-    @Qualifier("cidWorkFlowExecutor")
-    private Executor cidWorkFlowExecutor;
     /**
      * 查询机器人外呼任务
      *
@@ -308,8 +305,6 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
         return null;
 
     }
-
-    private final String WORKFLOW_CALL_ONE_REDIS_KEY = "workflowCallOne:uuid:";
     /**
      *  流程Ai呼叫 callOne
      * @param roboticId
@@ -330,7 +325,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             userDataJson.put("workflowInstanceId",context.getWorkflowInstanceId());
             userDataJson.put("callerId",callerId);
             context.setVariable("callBackUuid",callBackUuid);
-            redisCache2.setCacheObject(WORKFLOW_CALL_ONE_REDIS_KEY + callBackUuid, userDataJson.toJSONString());
+            redisCache2.setCacheObject(companyVoiceRoboticCallLogCallphoneService.WORKFLOW_CALL_ONE_REDIS_KEY + callBackUuid, userDataJson.toJSONString());
             CalleeDomain build = CalleeDomain.builder().number(callees.getPhone()).userData(callBackUuid).build();
             List<CalleeDomain> mobileList = new ArrayList<>();
             mobileList.add(build);
@@ -731,21 +726,9 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
         if("billing".equals(notify.getType())) pushBilling(result);
 //        // 是否全部回调完毕
         CompanyVoiceRoboticCallees callee = getResultCalleeInfo(notify);
-        //更新调用日志
-        companyVoiceRoboticCallLogCallphoneService.asyncHandleCalleeCallBackResult(result,callee);
         if(StringUtils.isNotBlank(notify.getUserData())){
-            JSONObject userData = JSONObject.parseObject(redisCache2.getCacheObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData()),JSONObject.class) ;
-            if(null!= userData && userData.containsKey("callBackUuid") && userData.containsKey("workflowInstanceId") && userData.containsKey("nodeKey")){
-                Map<String, Object> param = new HashMap<>();
-                param.put("callBackUuid",userData.getString("callBackUuid"));
-                param.put("callSource","callBack");
-                CompletableFuture.runAsync(()->{
-                    companyWorkflowEngine.resumeFromBlockingNode(userData.getString("workflowInstanceId"),userData.getString("nodeKey"),param);
-                },cidWorkFlowExecutor).thenRun(()->{
-                    redisCache2.deleteObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData());
-                });
-            }
-            redisCache2.deleteObject(notify.getUserData());
+            //更新调用日志
+            companyVoiceRoboticCallLogCallphoneService.asyncHandleCalleeCallBackResult(result,callee);
         }
         long count = companyVoiceRoboticCalleesMapper.countByRoboticIdNotUuid(callee.getRoboticId());
         if(count == 0){
@@ -807,7 +790,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
         companyVoiceRoboticCalleesMapper.updateById(callee);
     }
     public CompanyVoiceRoboticCallees getResultCalleeInfo(Notify notify){
-        String cacheString = (String) redisCache2.getCacheObject(WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData());
+        String cacheString = (String) redisCache2.getCacheObject(companyVoiceRoboticCallLogCallphoneService.WORKFLOW_CALL_ONE_REDIS_KEY + notify.getUserData());
         if(StringUtils.isNotBlank(cacheString)){
             JSONObject parse = JSONObject.parseObject(cacheString,JSONObject.class);
             CompanyVoiceRoboticCallees callee = companyVoiceRoboticCalleesMapper.selectCompanyVoiceRoboticCalleesById(parse.getLong("callerId"));

+ 25 - 30
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -56,6 +56,9 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
     @Autowired
     private CompanyWorkflowNodeMapper companyWorkflowNodeMapper;
 
+    @Autowired
+    private CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
+
     /**
      * 初始化工作流
      * 创建工作流实例并保存初始状态
@@ -74,7 +77,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 
             // 保存当前执行记录
             saveInitialExecution(workflowInstanceId, workflowDefinitionId,
-                    definition.getStartNodeKey(), context,definition);
+                    definition.getStartNodeKey(), context, definition);
 
             log.info("工作流初始化成功: {} -> {}", workflowInstanceId, workflowDefinitionId);
 
@@ -260,7 +263,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
      * 保存初始执行记录
      */
     private void saveInitialExecution(String workflowInstanceId, Long workflowDefinitionId,
-                                      String startNodeKey, ExecutionContext context,CompanyWorkflow workflow) {
+                                      String startNodeKey, ExecutionContext context, CompanyWorkflow workflow) {
 
         try {
             CompanyAiWorkflowExec currentExec = new CompanyAiWorkflowExec();
@@ -450,28 +453,6 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
             // 继续执行节点逻辑
             ExecutionResult result = node.continueExecute(context);
 
-//            // 记录执行日志
-//            logExecution(createLogEntry(workflowInstanceId, nodeKey, node.getType(), result));
-//
-//            // 更新当前执行记录
-//            updateCurrentExecution(workflowInstanceId, nodeKey, context);
-//
-//            // 更新工作流状态为运行中
-//            updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.RUNNING);
-
-            // 根据执行结果决定下一步操作
-//            if (result.isSuccess()) {
-//                if (StringUtils.isBlank(result.getNextNodeKey())) {
-//                    // 如果没有下一个节点,完成工作流
-//                    completeWorkflow(workflowInstanceId);
-//                } else {
-//                    // 执行下一个节点
-//                    executeNode(workflowInstanceId, result.getNextNodeKey());
-//                }
-//            } else {
-//                // 执行失败,更新状态
-//                updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
-//            }
         } catch (Exception e) {
             log.error("唤醒阻塞节点失败: {} -> {}", workflowInstanceId, nodeKey, e);
 //            updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
@@ -497,23 +478,23 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
             }
             String execNode = currentExec.getCurrentNodeKey();
 
-            if(null == inputData || !inputData.containsKey("lastNodeKey")){
+            if (null == inputData || !inputData.containsKey("lastNodeKey")) {
                 throw new CustomException("节点流程匹配有误: " + inputData);
             }
-            String lastNodeKey =(String) inputData.get("lastNodeKey");
+            String lastNodeKey = (String) inputData.get("lastNodeKey");
 
             // 验证节点是否匹配
             if (!lastNodeKey.equals(execNode)) {
                 log.error("当前流程已扭转节点不匹配 - 期望: {}, 实际: {}", lastNodeKey, execNode);
                 throw new CustomException("节点不匹配,期望: " + lastNodeKey + ", 实际: " + execNode);
             }
-            if(!inputData.containsKey("callSource")){
+            if (!inputData.containsKey("callSource")) {
                 throw new CustomException("未声明调用来源: " + inputData + "::" + workflowInstanceId);
             }
             String cs = (String) inputData.get("callSource");
             // 上个节点阻塞校验
-            if ( ("callTaskTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus()))||
-                    (("addWxTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.PAUSED.getValue()).equals(currentExec.getStatus())))){
+            if (("callTaskTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())) ||
+                    (("addWxTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())))) {
                 throw new CustomException("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId);
             }
             // 反序列化执行上下文并合并新的输入数据
@@ -526,13 +507,27 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 
             // 创建节点实例
             IWorkflowNode node = createNode(definition, nodeKey);
+            //执行之前标记上一个节点执行日志为超时
+            CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
+            companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
+            companyAiWorkflowExecLog.setNodeKey(lastNodeKey);
+            companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.WAITING.getValue());
+            List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
+            if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
+                CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
+                fExecLog.setStatus(ExecutionStatusEnum.TIMEOUT.getValue());
+                fExecLog.setEndTime(new Date());
+                long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
+                fExecLog.setDuration(durationInMillis);
+                companyAiWorkflowExecLogMapper.updateById(fExecLog);
+            }
             if (node == null) {
                 throw new CustomException("节点不存在: " + nodeKey);
             }
 
             node.execute(context);
 
-        } catch (Exception e){
+        } catch (Exception e) {
 
         }
     }

+ 40 - 16
fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java

@@ -19,6 +19,7 @@ import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.springframework.scheduling.annotation.Async;
 
+import javax.xml.soap.Node;
 import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -57,7 +58,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
 
     @Override
     public ExecutionResult execute(ExecutionContext context) {
-        if(!runnable(context)){
+        if(!runnable(context) && getType() != NodeTypeEnum.END){
             log.info("当前流程已到达结束节点,节点执行失败:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
             return null;
         }
@@ -96,7 +97,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
 
     @Override
     public ExecutionResult continueExecute(ExecutionContext context) {
-        if(!runnable(context)){
+        if(!runnable(context) && getType() != NodeTypeEnum.END){
             log.info("当前流程已到达结束节点,节点继续执行失败:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
             return null;
         }
@@ -130,24 +131,47 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             return handleExecutionError(e, context);
         } finally {
             if (lock != null && lock.isHeldByCurrentThread()) {
-                //更新流程日志信息
-                CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
-                companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
-                companyAiWorkflowExecLog.setNodeKey(nodeKey);
-                companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
-                List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
-                if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
-                    CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
-                    fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
-                    fExecLog.setEndTime(new Date());
-                    long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
-                    fExecLog.setDuration(durationInMillis);
-                    companyAiWorkflowExecLogMapper.updateById(fExecLog);
+                if(getType().equals(NodeTypeEnum.AI_CALL_TASK)){
+                    updateLogStatusIfExist(context,ExecutionStatusEnum.PAUSED,ExecutionStatusEnum.SUCCESS);
                 }
+                else if(getType().equals(NodeTypeEnum.AI_ADD_WX_TASK)){
+                    updateLogStatusIfExist(context,ExecutionStatusEnum.WAITING,ExecutionStatusEnum.SUCCESS);
+                }
+
+//                //更新流程日志信息
+//                CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
+//                companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
+//                companyAiWorkflowExecLog.setNodeKey(nodeKey);
+//                companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
+//                List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
+//                if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
+//                    CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
+//                    fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
+//                    fExecLog.setEndTime(new Date());
+//                    long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
+//                    fExecLog.setDuration(durationInMillis);
+//                    companyAiWorkflowExecLogMapper.updateById(fExecLog);
+//                }
                 lock.unlock();
             }
         }
     }
+    protected void updateLogStatusIfExist(ExecutionContext context, ExecutionStatusEnum findStatus, ExecutionStatusEnum status) {
+        //更新流程日志信息
+        CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
+        companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
+        companyAiWorkflowExecLog.setNodeKey(nodeKey);
+        companyAiWorkflowExecLog.setStatus(findStatus.getValue());
+        List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
+        if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
+            CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
+            fExecLog.setStatus(status.getValue());
+            fExecLog.setEndTime(new Date());
+            long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
+            fExecLog.setDuration(durationInMillis);
+            companyAiWorkflowExecLogMapper.updateById(fExecLog);
+        }
+    }
 
     protected abstract ExecutionResult doContinue(ExecutionContext context);
 
@@ -339,7 +363,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             update.setLastUpdateTime(LocalDateTime.now());
             update.setVariables(objectMapper.writeValueAsString(context.getVariables()));
             update.setCurrentNodeKey(nodeKey);
-            CompanyWorkflowNode currentNode = context.getVariable("currentNode", CompanyWorkflowNode.class);
+            CompanyWorkflowNode currentNode = context.getVariable("currentNode", CompanyWorkflowNode.class) ==null ? getNodeByKey(nodeKey):context.getVariable("currentNode", CompanyWorkflowNode.class);
             update.setCurrentNodeType(NodeTypeEnum.fromCode(currentNode.getNodeType()).getValue());
             update.setCurrentNodeName(currentNode.getNodeName());
             companyAiWorkflowExecMapper.updateByWorkflowInstanceId(update);

+ 7 - 58
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java

@@ -53,6 +53,8 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      */
     @Override
     protected ExecutionResult doContinue(ExecutionContext context) {
+        //收到回调代表加微通过了
+
         CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
         List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
 
@@ -125,65 +127,9 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
                     .outputData(context.getVariables())
                     .nextNodeKey("").build();
 
-//            // 1. 获取业务数据
-//            CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
-//            if (business == null) {
-//                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
-//            }
-//
-//            // 2. 通过 wxClientId 获取加微客户记录
-//            Long wxClientId = business.getWxClientId();
-//            if (wxClientId == null) {
-//                return ExecutionResult.failure().errorMessage("业务数据中缺少wxClientId").build();
-//            }
-//            CompanyWxClient wxClient = companyWxClientMapper.selectById(wxClientId);
-//            if (wxClient == null) {
-//                return ExecutionResult.failure().errorMessage("未找到加微客户记录: " + wxClientId).build();
-//            }
-//
-//            // 3. 验证加微数据是否已准备好
-//            Long accountId = wxClient.getAccountId();
-//            if (accountId == null) {
-//                return ExecutionResult.failure().errorMessage("加微客户记录中缺少accountId,请先分配微信账号").build();
-//            }
-//            Long dialogId = wxClient.getDialogId();
-//            if (dialogId == null) {
-//                return ExecutionResult.failure().errorMessage("加微客户记录中缺少dialogId,请先设置话术").build();
-//            }
-//
-//            // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
-//            if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
-//                log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
-//                if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
-//                    return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
-//                }
-//            }
-//
-//            Long roboticId = business.getRoboticId();
-//            log.info("准备加微任务数据 - workflowInstanceId: {}, roboticId: {}, wxClientId: {}, accountId: {}",
-//                    context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
-//
-//            // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
-//            String taskKey = getDelayAddWxKeyPrefix() Constants.TASK_ID + roboticId;
-//            redisCache.setCacheObject(taskKey, Constants.ADD_WX);
-//            log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
-//
-//            // 6. 设置加微超时检测时间到 Redis
-//            int timeoutMinutes = getTimeoutFromProperties();
-//            long timeoutTimestamp = System.currentTimeMillis() + timeoutMinutes * 60 * 1000L;
-//            String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + context.getWorkflowInstanceId() + ":" + wxClientId;
-//            redisCache.setCacheObject(timeoutKey, String.valueOf(timeoutTimestamp));
-//            log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
-//                    timeoutKey, timeoutMinutes, timeoutTimestamp);
-//
-//            // 7. 设置工作流为暂停状态,等待加微回调
-//            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
-//
-//            // 8. 返回 paused 状态,nextNodeKey 为空字符串(不自动流转)
-//            return ExecutionResult.paused().nextNodeKey("").build();
-
         } catch (Exception e) {
-            log.error("准备加微任务数据异常 - workflowInstanceId: {}", context.getWorkflowInstanceId(), e);
+            log.error("准备加微任务数据异常 流程:{}:节点:{}执行失败,", context.getWorkflowInstanceId(), nodeKey, e);
+            super.updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.INTERRUPT);
             return ExecutionResult.failure().errorMessage("准备加微任务数据异常: " + e.getMessage()).build();
         }
     }
@@ -302,6 +248,9 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
             log.error("当前节点已流转 ,目标:{},实际:{}", nodeKey, exec.getCurrentNodeKey());
             return;
         }
+        //更新加微日志执行状态
+        super.updateLogStatusIfExist(context, ExecutionStatusEnum.PAUSED, ExecutionStatusEnum.WAITING);
+        super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), nodeKey, context, ExecutionStatusEnum.WAITING);
         List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
         edges.forEach(edge -> {
             List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);

+ 29 - 15
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -16,6 +16,7 @@ import com.fs.company.vo.AiCallWorkflowConditionVo;
 import com.fs.company.vo.ExecutionResult;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
+import lombok.extern.slf4j.Slf4j;
 
 import java.time.LocalDateTime;
 import java.util.Date;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
  * @date 2026/1/28 13:39
  * @description AI外呼电话任务节点
  */
+@Slf4j
 public class AiCallTaskNode extends AbstractWorkflowNode {
     private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     private static final ICompanyVoiceRoboticService companyVoiceRoboticService = SpringUtils.getBean(ICompanyVoiceRoboticService.class);
@@ -53,6 +55,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
         List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
         //获取外呼回调结果日志
         CompanyVoiceRoboticCallLogCallphone callRes = super.companyVoiceRoboticCallLogCallphoneMapper.selectCallLogByCallbackUuid(context.getVariable("callBackUuid", String.class));
+        int i = 0;
         edges.forEach(edge -> {
             List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
 //            Boolean isValid = true;
@@ -67,6 +70,10 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                     if (StringUtils.isNotBlank(condition.getIntention())) {
                         if (condition.getIntention().equals(callRes.getIntention())) {
                             super.runNextNode(context, edge);
+                        } else {
+                            //不符合意向度配置的回调 暂时不再执行 中断流程
+                            log.info("流程:{},节点:{},意向度设置:{},实际意向度:{}, 意向度不符设置中断执行,", context.getWorkflowInstanceId(), nodeKey, condition.getIntention(), callRes.getIntention());
+                            super.updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.INTERRUPT);
                         }
                     } else {
                         super.runNextNode(context, edge);
@@ -83,6 +90,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                         //添加到延时扫描redis
                         super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
                         super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
+                        updateLogStatusIfExist(context,ExecutionStatusEnum.PAUSED,ExecutionStatusEnum.WAITING);
                     }
                     //无时间驱动
                     else {
@@ -97,25 +105,31 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
 
     @Override
     protected ExecutionResult doExecute(ExecutionContext context) {
-        if (isAsync()) {
-            CompanyWorkflowNode node = context.getVariable("currentNode", CompanyWorkflowNode.class);
-            String nodeConfig = node.getNodeConfig();
-            AiCallConfigVO callConfigVo = JSONObject.parseObject(nodeConfig, AiCallConfigVO.class);
-            //执行外呼逻辑 需要传入节点信息
-            CompanyVoiceRoboticBusiness bus = super.getRoboticBusiness(context.getWorkflowInstanceId());
-            if (bus == null) {
-                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
+        try {
+            if (isAsync()) {
+                CompanyWorkflowNode node = context.getVariable("currentNode", CompanyWorkflowNode.class);
+                String nodeConfig = node.getNodeConfig();
+                AiCallConfigVO callConfigVo = JSONObject.parseObject(nodeConfig, AiCallConfigVO.class);
+                //执行外呼逻辑 需要传入节点信息
+                CompanyVoiceRoboticBusiness bus = super.getRoboticBusiness(context.getWorkflowInstanceId());
+                if (bus == null) {
+                    return ExecutionResult.failure().errorMessage("未找到业务数据").build();
+                }
+                companyVoiceRoboticService.workflowCallPhoneOne(bus.getRoboticId(), bus.getCalleeId(), context, callConfigVo);
+                super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
+                return ExecutionResult.paused()
+                        .outputData(context.getVariables())
+                        .nextNodeKey("").build();
+            } else {
+                return ExecutionResult.failure()
+                        .nextNodeKey(null).build();
             }
-            companyVoiceRoboticService.workflowCallPhoneOne(bus.getRoboticId(), bus.getCalleeId(), context, callConfigVo);
-            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
-            return ExecutionResult.paused()
-                    .outputData(context.getVariables())
-                    .nextNodeKey("").build();
-        } else {
+        } catch (Exception ex) {
+            log.error("流程:{}:节点:{}执行失败,", context.getWorkflowInstanceId(), nodeKey, ex);
+            super.updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.INTERRUPT);
             return ExecutionResult.failure()
                     .nextNodeKey(null).build();
         }
-
     }
 
     @Override

+ 2 - 2
fs-service/src/main/java/com/fs/wxcid/threadExecutor/cidCompanyWorkFlowExecutor.java

@@ -15,8 +15,8 @@ public class cidCompanyWorkFlowExecutor {
     @Bean("cidWorkFlowExecutor")
     public Executor cidWorkFlowExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(8);
-        executor.setMaxPoolSize(16);
+        executor.setCorePoolSize(16);
+        executor.setMaxPoolSize(32);
         executor.setQueueCapacity(20000);
         executor.setThreadNamePrefix("WorkfLow-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

+ 1 - 1
fs-service/src/main/resources/mapper/company/CompanyVoiceRoboticBusinessMapper.xml

@@ -147,6 +147,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         FROM
             company_voice_robotic_business t1
                 inner join company_ai_workflow_exec t2 on t1.id = t2.business_key
-        where t1.robotic_id = #{roboticId} and t2.current_node_key != #{endNodeKey};
+        where t1.robotic_id = #{roboticId} and t2.current_node_key != #{endNodeKey} and t2.status != 8
     </select>
 </mapper>

+ 1 - 0
fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml

@@ -118,6 +118,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="addTime != null">add_time = #{addTime},</if>
             <if test="remark != null">remark = #{remark},</if>
             <if test="createTime != null">create_time = #{createTime},</if>
+            <if test="dialogId != null">dialog_id = #{dialogId},</if>
         </trim>
         where id = #{id}
     </update>

+ 1 - 1
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -26,7 +26,7 @@ public class WxTask {
     public void addWx() {
         taskService.addWx(null);
     }
-    @Scheduled(cron = "0 0/3 * * * ?")
+    @Scheduled(cron = "0 0/1 * * * ?")
     public void addWx4Workflow() {
         taskService.addWx4Workflow(null);
     }