lmx пре 3 недеља
родитељ
комит
2c179ce4d8

+ 2 - 3
fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java

@@ -43,15 +43,14 @@ public class CallTaskService {
         String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
         Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
         log.info("共扫描到 {} 个待处理键", keys.size());
-        HashMap commonMp = new HashMap();
-        commonMp.put("callSource","timer");
         keys.parallelStream().forEach(key -> {
             try {
                 //doExec
                 CompletableFuture.runAsync(()->{
                     try {
-                        commonMp.put("callRedisKey",key);
                         ExecutionContext context = redisCache2.getCacheObject(key);
+                        context.setVariable("callRedisKey",key);
+                        context.setVariable("callSource","callTaskTimer");
                         companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
                     } catch (Exception e) {
                         log.error("处理工作流延时任务异常 - key: {}", key, e);

+ 2 - 1
fs-service/src/main/java/com/fs/company/mapper/CompanyWxClientMapper.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.fs.company.domain.CompanyUser;
 import com.fs.company.domain.CompanyVoiceRoboticCallLogAddwx;
 import com.fs.company.domain.CompanyWxClient;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
@@ -72,7 +73,7 @@ public interface CompanyWxClientMapper extends BaseMapper<CompanyWxClient> {
 
     List<CompanyWxClient> getAddWxList(@Param("accountIdList") List<Long> accountIdList);
 
-    List<CompanyWxClient> getAddWxList4Workflow(@Param("accountIdList") List<Long> accountIdList, @Param("execStatus") Integer execStatus, @Param("execNodeType") Integer execNodeType);
+    List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(@Param("accountIdList") List<Long> accountIdList, @Param("execStatus") Integer execStatus, @Param("execNodeType") Integer execNodeType);
 
     CompanyWxClient selectWx(@Param("accountId") Long accountId, @Param("v3") String v3);
 

+ 2 - 1
fs-service/src/main/java/com/fs/company/service/ICompanyWxClientService.java

@@ -3,6 +3,7 @@ package com.fs.company.service;
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.fs.company.domain.CompanyWxClient;
 import com.fs.company.vo.AddWxResultVo;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 
 import java.util.List;
 
@@ -70,5 +71,5 @@ public interface ICompanyWxClientService extends IService<CompanyWxClient> {
 
     List<CompanyWxClient> getAddWxList(List<Long> accountIdList);
 
-    List<CompanyWxClient> getAddWxList4Workflow(List<Long> accountIdList);
+    List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(List<Long> accountIdList);
 }

+ 6 - 2
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -507,9 +507,13 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
                 log.error("当前流程已扭转节点不匹配 - 期望: {}, 实际: {}", lastNodeKey, execNode);
                 throw new CustomException("节点不匹配,期望: " + lastNodeKey + ", 实际: " + execNode);
             }
-
+            if(!inputData.containsKey("callSource")){
+                throw new CustomException("未声明调用来源: " + inputData + "::" + workflowInstanceId);
+            }
+            String cs = (String) inputData.get("callSource");
             // 上个节点阻塞校验
-            if (!Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())) {
+            if ( ("callTaskTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus()))||
+                    (("addWxTimer".equals(cs) && !Integer.valueOf(ExecutionStatusEnum.PAUSED.getValue()).equals(currentExec.getStatus())))){
                 throw new CustomException("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId);
             }
             // 反序列化执行上下文并合并新的输入数据

+ 7 - 1
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxClientServiceImpl.java

@@ -12,6 +12,7 @@ import com.fs.company.mapper.CompanyVoiceRoboticWxMapper;
 import com.fs.company.mapper.CompanyWxClientMapper;
 import com.fs.company.service.ICompanyWxClientService;
 import com.fs.company.vo.AddWxResultVo;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import com.fs.crm.domain.CrmCustomer;
 import com.fs.crm.service.impl.CrmCustomerServiceImpl;
 import com.fs.enums.ExecutionStatusEnum;
@@ -231,8 +232,13 @@ public class CompanyWxClientServiceImpl extends ServiceImpl<CompanyWxClientMappe
         return baseMapper.getAddWxList(accountIdList);
     }
 
+    /**
+     * 获取添加微信列表 工作流用
+     * @param accountIdList
+     * @return
+     */
     @Override
-    public  List<CompanyWxClient> getAddWxList4Workflow(List<Long> accountIdList){
+    public  List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(List<Long> accountIdList){
         return baseMapper.getAddWxList4Workflow(accountIdList, ExecutionStatusEnum.PAUSED.getValue(), NodeTypeEnum.AI_ADD_WX_TASK.getValue());
     }
 }

+ 120 - 72
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.constant.Constants;
 import com.fs.common.core.redis.RedisCacheT;
+import com.fs.common.utils.StringUtils;
 import com.fs.common.utils.spring.SpringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.CompanyWxClientMapper;
@@ -11,6 +12,7 @@ import com.fs.company.mapper.CompanyWorkflowNodeMapper;
 import com.fs.company.param.ExecutionContext;
 import com.fs.company.service.IWorkflowNode;
 import com.fs.company.vo.AiAddWxWorkflowConditionVo;
+import com.fs.company.vo.AiCallWorkflowConditionVo;
 import com.fs.company.vo.ExecutionResult;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
@@ -20,6 +22,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author MixLiu
@@ -65,31 +68,37 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
 
         // 判断加微是否成功 (isAdd: 0否 1是 2待添加 3作废)
         boolean addSuccess = wxClient != null && Integer.valueOf(1).equals(wxClient.getIsAdd());
-        
+        //回调加微成功
         if (addSuccess) {
+            List<CompanyWorkflowEdge> cList = edges.stream().filter(a ->
+                            StringUtils.isNotBlank(a.getConditionExpr()) && JSONObject.parseArray(a.getConditionExpr(), AiCallWorkflowConditionVo.class).get(0).isAdd())
+                    .collect(Collectors.toList());
+            if(null != cList && !cList.isEmpty()){
+                super.runNextNode(context, cList.get(0));
+            }
             // 加微成功,设置为等待状态,等待下一次回调
-            log.info("加微成功,设置工作流为等待状态 - workflowInstanceId: {}", context.getWorkflowInstanceId());
-            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), 
-                    context, ExecutionStatusEnum.WAITING);
-            return ExecutionResult.waiting().nextNodeKey("").build();
+//            log.info("加微成功,设置工作流为等待状态 - workflowInstanceId: {}", context.getWorkflowInstanceId());
+//            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(),
+//                    context, ExecutionStatusEnum.WAITING);
+//            return ExecutionResult.waiting().nextNodeKey("").build();
         } else {
+            List<CompanyWorkflowEdge> cList = edges.stream().filter(a ->
+                            StringUtils.isNotBlank(a.getConditionExpr()) && !JSONObject.parseArray(a.getConditionExpr(), AiCallWorkflowConditionVo.class).get(0).isAdd())
+                    .collect(Collectors.toList());
             // 加微失败,根据条件判断走哪条边
-            for (CompanyWorkflowEdge edge : edges) {
-                if (edge.getConditionExpr() == null || edge.getConditionExpr().isEmpty()) {
-                    continue; // 跳过无条件边
-                }
-                AiAddWxWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiAddWxWorkflowConditionVo.class);
+            CompanyWorkflowEdge edge = cList.get(0);
+                AiCallWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
                 // 匹配失败条件
-                if (!condition.isAddSuccess()) {
+                if (!condition.isAdd()) {
                     log.info("加微失败,执行失败分支 - workflowInstanceId: {}", context.getWorkflowInstanceId());
                     this.runNextNode(context, edge);
                     return null;
                 }
-            }
-            
+
             log.error("加微失败但未找到失败分支 - workflowInstanceId: {}", context.getWorkflowInstanceId());
             return null;
         }
+        return null;
     }
 
     /**
@@ -101,66 +110,70 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
     @Override
     protected ExecutionResult doExecute(ExecutionContext context) {
         if (!isAsync()) {
-            return ExecutionResult.failure().nextNodeKey("").build();
+            return ExecutionResult.failure().nextNodeKey(null).build();
         }
-
         try {
-            // 1. 获取业务数据
-            CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
-            if (business == null) {
-                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
-            }
-
-            // 2. 通过 wxClientId 获取加微客户记录
-            Long wxClientId = business.getWxClientId();
-            if (wxClientId == null) {
-                return ExecutionResult.failure().errorMessage("业务数据中缺少wxClientId").build();
-            }
-            CompanyWxClient wxClient = companyWxClientMapper.selectById(wxClientId);
-            if (wxClient == null) {
-                return ExecutionResult.failure().errorMessage("未找到加微客户记录: " + wxClientId).build();
-            }
-
-            // 3. 验证加微数据是否已准备好
-            Long accountId = wxClient.getAccountId();
-            if (accountId == null) {
-                return ExecutionResult.failure().errorMessage("加微客户记录中缺少accountId,请先分配微信账号").build();
-            }
-            Long dialogId = wxClient.getDialogId();
-            if (dialogId == null) {
-                return ExecutionResult.failure().errorMessage("加微客户记录中缺少dialogId,请先设置话术").build();
-            }
-
-            // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
-            if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
-                log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
-                if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
-                    return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
-                }
-            }
-
-            Long roboticId = business.getRoboticId();
-            log.info("准备加微任务数据 - workflowInstanceId: {}, roboticId: {}, wxClientId: {}, accountId: {}",
-                    context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
-
-            // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
-            String taskKey = Constants.TASK_ID + roboticId;
-            redisCache.setCacheObject(taskKey, Constants.ADD_WX);
-            log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
-
-            // 6. 设置加微超时检测时间到 Redis
-            int timeoutMinutes = getTimeoutFromProperties();
-            long timeoutTimestamp = System.currentTimeMillis() + timeoutMinutes * 60 * 1000L;
-            String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + context.getWorkflowInstanceId() + ":" + wxClientId;
-            redisCache.setCacheObject(timeoutKey, String.valueOf(timeoutTimestamp));
-            log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
-                    timeoutKey, timeoutMinutes, timeoutTimestamp);
-
-            // 7. 设置工作流为暂停状态,等待加微回调
             super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
+            return ExecutionResult.paused()
+                    .outputData(context.getVariables())
+                    .nextNodeKey("").build();
 
-            // 8. 返回 paused 状态,nextNodeKey 为空字符串(不自动流转)
-            return ExecutionResult.paused().nextNodeKey("").build();
+//            // 1. 获取业务数据
+//            CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
+//            if (business == null) {
+//                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
+//            }
+//
+//            // 2. 通过 wxClientId 获取加微客户记录
+//            Long wxClientId = business.getWxClientId();
+//            if (wxClientId == null) {
+//                return ExecutionResult.failure().errorMessage("业务数据中缺少wxClientId").build();
+//            }
+//            CompanyWxClient wxClient = companyWxClientMapper.selectById(wxClientId);
+//            if (wxClient == null) {
+//                return ExecutionResult.failure().errorMessage("未找到加微客户记录: " + wxClientId).build();
+//            }
+//
+//            // 3. 验证加微数据是否已准备好
+//            Long accountId = wxClient.getAccountId();
+//            if (accountId == null) {
+//                return ExecutionResult.failure().errorMessage("加微客户记录中缺少accountId,请先分配微信账号").build();
+//            }
+//            Long dialogId = wxClient.getDialogId();
+//            if (dialogId == null) {
+//                return ExecutionResult.failure().errorMessage("加微客户记录中缺少dialogId,请先设置话术").build();
+//            }
+//
+//            // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
+//            if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
+//                log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
+//                if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
+//                    return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
+//                }
+//            }
+//
+//            Long roboticId = business.getRoboticId();
+//            log.info("准备加微任务数据 - workflowInstanceId: {}, roboticId: {}, wxClientId: {}, accountId: {}",
+//                    context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
+//
+//            // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
+//            String taskKey = getDelayAddWxKeyPrefix() Constants.TASK_ID + roboticId;
+//            redisCache.setCacheObject(taskKey, Constants.ADD_WX);
+//            log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
+//
+//            // 6. 设置加微超时检测时间到 Redis
+//            int timeoutMinutes = getTimeoutFromProperties();
+//            long timeoutTimestamp = System.currentTimeMillis() + timeoutMinutes * 60 * 1000L;
+//            String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + context.getWorkflowInstanceId() + ":" + wxClientId;
+//            redisCache.setCacheObject(timeoutKey, String.valueOf(timeoutTimestamp));
+//            log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
+//                    timeoutKey, timeoutMinutes, timeoutTimestamp);
+//
+//            // 7. 设置工作流为暂停状态,等待加微回调
+//            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
+//
+//            // 8. 返回 paused 状态,nextNodeKey 为空字符串(不自动流转)
+//            return ExecutionResult.paused().nextNodeKey("").build();
 
         } catch (Exception e) {
             log.error("准备加微任务数据异常 - workflowInstanceId: {}", context.getWorkflowInstanceId(), e);
@@ -180,8 +193,9 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
 
     /**
      * 运行下一个节点
+     *
      * @param context 执行上下文
-     * @param edge 边
+     * @param edge    
      */
     @Override
     protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
@@ -224,7 +238,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      * 如果返回 false 表示已经被其他路径执行过了,不再执行
      *
      * @param workflowInstanceId 工作流实例ID
-     * @param wxClientId 加微客户ID
+     * @param wxClientId         加微客户ID
      * @return 是否可以执行
      */
     public static boolean tryMarkAsExecuted(String workflowInstanceId, Long wxClientId) {
@@ -244,7 +258,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      * 清除超时检测 Key
      *
      * @param workflowInstanceId 工作流实例ID
-     * @param wxClientId 加微客户ID
+     * @param wxClientId         加微客户ID
      */
     public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
         String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
@@ -265,4 +279,38 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
         }
         return String.format(DELAY_ADD_WX_KEY, nowDay.getHours(), nowDay.getMinutes());
     }
+
+    /**
+     * 完成加微动作
+     *
+     * @param workflowInstanceId
+     */
+    public void doneAddwx(String workflowInstanceId) {
+        ExecutionContext context = createExecutionContext(workflowInstanceId, nodeKey);
+        context.setVariable("lastNodeKey", nodeKey);
+        //启动定时节点倒计时
+        CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
+        if (!exec.getCurrentNodeKey().equals(nodeKey)) {
+            //当前节点已流转
+            log.error("当前节点已流转 ,目标:{},实际:{}", nodeKey, exec.getCurrentNodeKey());
+            return;
+        }
+        List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
+        edges.forEach(edge -> {
+            List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
+            if (null == conditions || conditions.isEmpty()) {
+                super.runNextNode(context, edge);
+            } else {
+                AiCallWorkflowConditionVo condition = conditions.get(0);
+                //节点包含延时条件
+                if (null != condition.getAddTime() && !condition.isAdd()) {
+                    long l = System.currentTimeMillis() + condition.getAddTime() * 60 * 1000;
+                    String redisKey = getDelayAddWxKeyPrefix(l) + workflowInstanceId;
+                    ExecutionContext nextContext = context.clone();
+                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
+                    super.redisCache.setCacheObject(redisKey, nextContext);
+                }
+            }
+        });
+    }
 }

+ 33 - 28
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -56,36 +56,41 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
         edges.forEach(edge -> {
             List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
 //            Boolean isValid = true;
-            //暂时考虑单条件
-            AiCallWorkflowConditionVo condition = conditions.get(0);
-            //拨通
-            if (condition.isCallConnected() && callRes.getCallTime() != null && callRes.getCallTime() > 0) {
-                //如果含有意向度过滤
-                if (null != condition.getIntention()) {
-                    if (condition.getIntention().equals(callRes.getIntention())) {
+            if (null == conditions || conditions.isEmpty()) {
+                super.runNextNode(context, edge);
+            } else {
+                //暂时考虑单条件
+                AiCallWorkflowConditionVo condition = conditions.get(0);
+                //拨通
+                if (condition.isCallConnected() && callRes.getCallTime() != null && callRes.getCallTime() > 0) {
+                    //如果含有意向度过滤
+                    if (null != condition.getIntention()) {
+                        if (condition.getIntention().equals(callRes.getIntention())) {
+                            super.runNextNode(context, edge);
+                        }
+                    } else {
                         super.runNextNode(context, edge);
                     }
-                } else {
-                    super.runNextNode(context, edge);
                 }
-            }
-            //未拨通
-            else if (!condition.isCallConnected() && (callRes.getCallTime() == null || Long.valueOf(0).equals(callRes.getCallTime()) || callRes.getCallAnswerTime() == null)) {
-                //延时操作
-                if (null != condition.getCallTime()) {
-                    //计算延时分片分钟
-                    long l = System.currentTimeMillis() + condition.getCallTime() * 60 * 1000;
-                    ExecutionContext nextContext = context.clone();
-                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
-                    //添加到延时扫描redis
-                    super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext,1, TimeUnit.DAYS);
-                    super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
-                }
-                //无时间驱动
-                else {
-                    super.runNextNode(context, edge);
+                //未拨通
+                else if (!condition.isCallConnected() && (callRes.getCallTime() == null || Long.valueOf(0).equals(callRes.getCallTime()) || callRes.getCallAnswerTime() == null)) {
+                    //延时操作
+                    if (null != condition.getCallTime()) {
+                        //计算延时分片分钟
+                        long l = System.currentTimeMillis() + condition.getCallTime() * 60 * 1000;
+                        ExecutionContext nextContext = context.clone();
+                        nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
+                        //添加到延时扫描redis
+                        super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
+                        super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
+                    }
+                    //无时间驱动
+                    else {
+                        super.runNextNode(context, edge);
+                    }
                 }
             }
+
         });
         return null;
     }
@@ -108,7 +113,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                     .nextNodeKey("").build();
         } else {
             return ExecutionResult.failure()
-                    .nextNodeKey("").build();
+                    .nextNodeKey(null).build();
         }
 
     }
@@ -138,11 +143,11 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
     }
 
     @Override
-    protected void postExecute(ExecutionContext context, ExecutionResult result){
+    protected void postExecute(ExecutionContext context, ExecutionResult result) {
         super.postExecute(context, result);
         String callRedisKey = context.getVariable("callRedisKey", String.class);
         //来源于定时调用doexec,调用后移除key
-        if(StringUtils.isNotBlank(callRedisKey)){
+        if (StringUtils.isNotBlank(callRedisKey)) {
             super.redisCache.deleteObject(callRedisKey);
         }
 

+ 85 - 0
fs-service/src/main/java/com/fs/company/vo/CompanyWxClient4WorkFlowVO.java

@@ -0,0 +1,85 @@
+package com.fs.company.vo;
+
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fs.common.annotation.Excel;
+import com.fs.common.core.domain.BaseEntityTow;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author MixLiu
+ * @date 2026/2/5 14:35
+ * @description
+ */
+@Data
+public class CompanyWxClient4WorkFlowVO extends BaseEntityTow {
+
+    private static final long serialVersionUID = 1L;
+
+    /** 任务ID */
+    @Excel(name = "任务ID")
+    private Long roboticId;
+    /** 分配企微账号ID */
+    @Excel(name = "分配企微账号ID")
+    private Long roboticWxId;
+    /** 客户ID */
+    @Excel(name = "客户ID")
+    private Long customerId;
+    /** 话术ID */
+    @Excel(name = "话术ID")
+    private Long dialogId;
+
+    /** 昵称 */
+    @Excel(name = "昵称")
+    private String nickName;
+
+    /** 头像 */
+    @Excel(name = "头像")
+    private String avatar;
+
+    /** 手机号 */
+    @Excel(name = "手机号")
+    private String phone;
+
+    /** 微信号 */
+    @Excel(name = "微信号")
+    private String wxNo;
+
+    /** 微信号 */
+    @Excel(name = "微信号")
+    private String wxName;
+
+    /** 客户意向 */
+    @Excel(name = "客户意向")
+    private String intention;
+
+    /** 是否添加;0否1是 */
+    @Excel(name = "是否添加;0否1是2待添加3作废")
+    private Integer isAdd;
+    private Long accountId;
+
+    /** 添加时间 */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @Excel(name = "添加时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime addTime;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @Excel(name = "添加完成时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime successAddTime;
+    private String wxV3;
+    private String wxV4;
+    private Long companyId;
+    private Long companyUserId;
+
+    private String wxNickName;
+    private String dialogName;
+    private String templateDetails;
+    private String template;
+    private String roboticName;
+    private String memo;
+    private String workflowInstanceId;
+    private String currentNodeKey;
+    private String currentNodeName;
+    private Integer currentNodeType;
+}

+ 2 - 2
fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml

@@ -160,9 +160,9 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         </if>
         group by account_id
     </select>
-    <select id="getAddWxList4Workflow" resultType="com.fs.company.domain.CompanyWxClient">
+    <select id="getAddWxList4Workflow" resultType="com.fs.company.vo.CompanyWxClient4WorkFlowVO">
 
-        SELECT t1.* FROM company_wx_client t1
+        SELECT t1.*,t3.workflow_instance_id,t3.current_node_key,t3.current_node_name,t3.current_node_type FROM company_wx_client t1
                              inner join company_voice_robotic_business t2 on t1.id = t2.wx_client_id and t1.robotic_id = t2.robotic_id
                              inner join company_ai_workflow_exec t3 on t3.business_key = t2.id
         where t1.is_add = 0 and t1.account_id is not null

+ 10 - 0
fs-wx-task/src/main/java/com/fs/app/controller/CommonController.java

@@ -51,6 +51,16 @@ public class CommonController {
         taskService.callNextTask();
     }
 
+    @GetMapping("addWx4Workflow")
+    public void addWx4Workflow(Long accountId) {
+        taskService.addWx4Workflow(Collections.singletonList(accountId));
+    }
+
+    @GetMapping("cidWorkflowAddWxRun")
+    public void cidWorkflowAddWxRun() {
+        taskService.cidWorkflowAddWxRun();
+    }
+
 
 
 

+ 53 - 56
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -8,20 +8,19 @@ import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.fs.common.constant.Constants;
 import com.fs.common.constant.FsConstants;
+import com.fs.common.core.redis.RedisCache;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.PubFun;
 import com.fs.common.utils.StringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.*;
 import com.fs.company.param.ExecutionContext;
-import com.fs.company.service.ICompanyVoiceRoboticService;
-import com.fs.company.service.ICompanyWxAccountService;
-import com.fs.company.service.ICompanyWxClientService;
-import com.fs.company.service.ICompanyWxDialogService;
+import com.fs.company.service.*;
 import com.fs.company.service.impl.*;
-import com.fs.company.service.CompanyWorkflowEngine;
 import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
 import com.fs.company.service.impl.call.node.AiCallTaskNode;
+import com.fs.company.service.impl.call.node.WorkflowNodeFactory;
+import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import com.fs.course.config.RedisKeyScanner;
 import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
@@ -37,6 +36,7 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
+import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
@@ -76,6 +76,8 @@ public class WxTaskService {
     private final CompanyAiWorkflowExecMapper companyAiWorkflowExecMapper;
     private final CompanyWorkflowEngine companyWorkflowEngine;
     private final CompanyVoiceRoboticBusinessMapper companyVoiceRoboticBusinessMapper;
+    private final WorkflowNodeFactory workflowNodeFactory;
+    private final RedisCache redisCache2;
     private final ExecutorService cidExcutor = new ThreadPoolExecutor(
             32,
             64,
@@ -228,11 +230,11 @@ public class WxTaskService {
         String json = sysConfigService.selectConfigByKey("wx.config");
         WxConfig config = JSONUtil.toBean(json, WxConfig.class);
         // 需要添加微信的列表
-        List<CompanyWxClient> list = companyWxClientService.getAddWxList4Workflow(accountIdList);
+        List<CompanyWxClient4WorkFlowVO> list = companyWxClientService.getAddWxList4Workflow(accountIdList);
         log.info("需要添加微信的数量:{}", list.size());
         if (list.isEmpty()) return;
         List<CompanyWxClient> addList = new ArrayList<>();
-        Map<Long, CompanyWxClient> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient::getAccountId);
+        Map<Long, CompanyWxClient4WorkFlowVO> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient4WorkFlowVO::getAccountId);
         List<CompanyWxAccount> accountList = new ArrayList<>(companyWxAccountService.listByIds(clientMap.keySet()));
         log.info("查询加微的账号数量:{}", list.size());
         List<CompanyWxAccount> addAccountList = accountList.stream().filter(e -> {
@@ -252,11 +254,8 @@ public class WxTaskService {
         }).collect(Collectors.toList());
         log.info("实际加微的账号数量:{}", addAccountList.size());
         addAccountList.forEach(e -> {
-            CompanyWxClient client = clientMap.get(e.getId());
+            CompanyWxClient4WorkFlowVO client = clientMap.get(e.getId());
             if (client != null) {
-                String task = redisCache.getCacheObject(Constants.TASK_ID + client.getRoboticId());
-                log.info("ROBOTIC-ID:{},CLIENT-ID:{},当前任务执行状态:{}", client.getRoboticId(), client.getId(), task);
-                if (StringUtils.isNotEmpty(task) && Constants.ADD_WX.equals(task)) {
                     CompanyWxDialog dialog = companyWxDialogService.getById(client.getDialogId());
                     CrmCustomer crmCustomer = crmCustomerService.selectCrmCustomerById(client.getCustomerId());
                     String newTxt = objectPlaceholderResolver.resolvePlaceholders(crmCustomer, dialog.getTemplateDetails());
@@ -276,7 +275,9 @@ public class WxTaskService {
                         client.setAddTime(LocalDateTime.now());
                         client.setWxV3(vo.getV3());
                         client.setWxV4(vo.getV4());
-                        addList.add(client);
+                        CompanyWxClient addItem = new CompanyWxClient();
+                        BeanUtils.copyProperties(client, addItem);
+                        addList.add(addItem);
                         addLog.setStatus(2);
                         addLog.setResult(JSON.toJSONString(vo));
                     } else {
@@ -285,62 +286,30 @@ public class WxTaskService {
                         addLog.setResult(JSON.toJSONString(vo));
                     }
                     asyncSaveCompanyVoiceRoboticCallLog(addLog);
-                } else {
-                    log.error("ROBOTIC-ID:{},当前任务没有执行加微任务", client.getRoboticId());
-                }
             } else {
                 log.error("当前账号暂无需要添加微信:{}-{}", e.getId(), e.getWxNickName());
             }
         });
         if (!addList.isEmpty()) {
             companyWxClientService.updateBatchById(addList);
-            //根据加微成功的用户,判定是否加入延时执行下一步任务
-            Set<Long> roboticIdSet = addList.stream().map(CompanyWxClient::getRoboticId).collect(Collectors.toSet());
-            Set<Long> userIdSet = addList.stream().map(CompanyWxClient::getCustomerId).collect(Collectors.toSet());
-
-            //找到任务
-            List<CompanyVoiceRobotic> companyVoiceRobotics = companyVoiceRoboticMapper.selectBatchIds(roboticIdSet);
-            Map<Long, CompanyVoiceRobotic> roboticsMp = companyVoiceRobotics.stream().collect(Collectors.toMap(CompanyVoiceRobotic::getId, Function.identity(), (existing, replacement) -> existing));
-            //找到callees数据
-            List<CompanyVoiceRoboticCallees> companyVoiceRoboticCallees = companyVoiceRoboticCalleesMapper.selectCalleesListByRoboticIdsAndUserIds(userIdSet, roboticIdSet);
-            Map<String, CompanyVoiceRoboticCallees> calleesMp = companyVoiceRoboticCallees.stream().collect(Collectors.toMap(e -> e.getUserId() + "-" + e.getRoboticId(), Function.identity(), (existing, replacement) -> existing));
-
             long l = System.currentTimeMillis();
-
             //根据加微成功
             for (CompanyWxClient client : addList) {
-                CompanyVoiceRobotic clientRobotic = roboticsMp.getOrDefault(client.getRoboticId(), null);
-                if (null == clientRobotic) {
-                    log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
-                    continue;
-                }
-                CompanyVoiceRoboticCallees callees = calleesMp.getOrDefault(client.getCustomerId() + "-" + client.getRoboticId(), null);
-                if (null == callees) {
-                    log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
-                    continue;
-                }
-                Integer addWxTime = clientRobotic.getAddWxTime();
-                if (null == addWxTime) {
-                    log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有设置加微后置等待时间", client.getRoboticId(), client.getId());
-                } else {
-                    long endT = System.currentTimeMillis() + addWxTime * 60 * 1000;
-                    StringBuilder sb = new StringBuilder(Constants.CID_NEXT_TASK_ID).append(callees.getRoboticId()).append(":").append(callees.getId());
-                    redisCache.setCacheObject(sb.toString(), String.valueOf(endT));
+                CompanyWxClient4WorkFlowVO vo = clientMap.get(client.getAccountId());
+                IWorkflowNode node = workflowNodeFactory.createNode(vo.getCurrentNodeKey(),
+                        NodeTypeEnum.fromValue(vo.getCurrentNodeType()),
+                        vo.getCurrentNodeName(), null);
+                if (node instanceof AiAddWxTaskNode) {
+                    CompletableFuture.runAsync(() -> {
+                        AiAddWxTaskNode addWxNode = (AiAddWxTaskNode) node;
+                        addWxNode.doneAddwx(vo.getWorkflowInstanceId());
+                    }, cidExcutor);
                 }
             }
-            companyVoiceRoboticCallees.forEach(robotic ->
-                    robotic.setRunTaskFlow(
-                            StringUtils.isBlank(robotic.getRunTaskFlow()) ?
-                                    Constants.ADD_WX : robotic.getRunTaskFlow() + "," + Constants.ADD_WX
-                    )
-            );
-            companyVoiceRoboticCalleesServiceImpl.updateBatchById(companyVoiceRoboticCallees);
-            companyVoiceRoboticServiceImpl.finishAddWxByCallees(roboticIdSet);
-        }
-        if (!addAccountList.isEmpty()) {
-            companyWxAccountService.updateBatchById(addAccountList);
+            if (!addAccountList.isEmpty()) {
+                companyWxAccountService.updateBatchById(addAccountList);
+            }
         }
-
     }
 
     public void initAccountNum() {
@@ -826,5 +795,33 @@ public class WxTaskService {
             }
         }
     }
+    /**
+     * 扫描工作流延时任务
+     */
+    public void cidWorkflowAddWxRun() {
+        log.info("===========工作流延时任务开始扫描===========");
+        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(null) + "*";
+        Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
+        log.info("共扫描到 {} 个待处理键", keys.size());
+        keys.parallelStream().forEach(key -> {
+            try {
+                //doExec
+                CompletableFuture.runAsync(()->{
+                    try {
+                        ExecutionContext context = redisCache2.getCacheObject(key);
+                        context.setVariable("callRedisKey",key);
+                        context.setVariable("callSource","addWxTimer");
+                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
+                    } catch (Exception e) {
+                        log.error("处理工作流延时任务异常 - key: {}", key, e);
+                    }
+                }, cidExcutor);
+
+            } catch (Exception ex) {
+                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+            }
+        });
+        log.info("===========工作流延时任务扫描结束===========");
+    }
 
 }