吴树波 3 settimane fa
parent
commit
f66b428b1b

+ 37 - 70
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java

@@ -7,9 +7,12 @@ import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.spring.SpringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.CompanyWxClientMapper;
+import com.fs.company.mapper.CompanyWorkflowNodeMapper;
 import com.fs.company.param.ExecutionContext;
+import com.fs.company.service.IWorkflowNode;
 import com.fs.company.vo.AiAddWxWorkflowConditionVo;
 import com.fs.company.vo.ExecutionResult;
+import com.fs.enums.ExecutionStatusEnum;
 import com.fs.enums.NodeTypeEnum;
 import lombok.extern.slf4j.Slf4j;
 
@@ -26,6 +29,7 @@ import java.util.Map;
 public class AiAddWxTaskNode extends AbstractWorkflowNode {
 
     private static final CompanyWxClientMapper companyWxClientMapper = SpringUtils.getBean(CompanyWxClientMapper.class);
+    private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     @SuppressWarnings("unchecked")
     private static final RedisCacheT<String> redisCache = SpringUtils.getBean(RedisCacheT.class);
 
@@ -47,7 +51,6 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
     @Override
     protected ExecutionResult doContinue(ExecutionContext context) {
         CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
-
         List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
 
         // 获取业务数据
@@ -56,41 +59,37 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
         CompanyWxClient wxClient = companyWxClientMapper.selectById(business.getWxClientId());
 
         // 根据加微结果判断走哪条边
-        String nextNodeKey = null;
         for (CompanyWorkflowEdge edge : edges) {
             if (edge.getConditionExpr() == null || edge.getConditionExpr().isEmpty()) {
-                // 无条件边,直接使用
-                nextNodeKey = edge.getTargetNodeKey();
-                break;
+                // 无条件边,直接执行
+                this.runNextNode(context, edge);
+                return null;
             }
             AiAddWxWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiAddWxWorkflowConditionVo.class);
             // 判断加微是否成功 (isAdd: 0否 1是 2待添加 3作废)
             boolean addSuccess = wxClient != null && Integer.valueOf(1).equals(wxClient.getIsAdd());
             if (condition.isAddSuccess() == addSuccess) {
-                nextNodeKey = edge.getTargetNodeKey();
-                break;
+                this.runNextNode(context, edge);
+                return null;
             }
         }
 
-        if (nextNodeKey != null) {
-            return ExecutionResult.success().nextNodeKey(nextNodeKey).build();
-        }
-        return ExecutionResult.failure().errorMessage("未找到满足条件的下一节点").build();
+        log.error("未找到满足条件的下一节点 - workflowInstanceId: {}", context.getWorkflowInstanceId());
+        return null;
     }
 
     /**
      * 执行加微节点逻辑(只准备数据,实际加微由定时任务执行)
-     * 1. 通过 workflowInstanceId 找到 CompanyVoiceRoboticBusiness 业务数据
-     * 2. 验证 CompanyWxClient 数据是否已准备好(accountId、dialogId、isAdd=0)
-     * 3. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
-     * 4. 设置超时时间到 Redis
-     * 5. 返回等待状态,等待定时任务执行加微后的回调
      *
      * @param context 执行上下文
      * @return 执行结果
      */
     @Override
     protected ExecutionResult doExecute(ExecutionContext context) {
+        if (!isAsync()) {
+            return ExecutionResult.failure().nextNodeKey("").build();
+        }
+
         try {
             // 1. 获取业务数据
             CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
@@ -121,14 +120,9 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
             // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
             if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
                 log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
-                // 如果已经是待添加(2)或已添加(1),说明已经在处理中或已完成
                 if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
                     return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
                 }
-                if (Integer.valueOf(2).equals(wxClient.getIsAdd())) {
-                    // 已经是待添加状态,继续等待
-                    log.info("加微客户记录已是待添加状态,继续等待 - wxClientId: {}", wxClientId);
-                }
             }
 
             Long roboticId = business.getRoboticId();
@@ -136,7 +130,6 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
                     context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
 
             // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
-            // 定时任务 WxTaskService.addWx() 会检查这个状态
             String taskKey = Constants.TASK_ID + roboticId;
             redisCache.setCacheObject(taskKey, Constants.ADD_WX);
             log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
@@ -149,18 +142,12 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
             log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
                     timeoutKey, timeoutMinutes, timeoutTimestamp);
 
-            // 7. 保存执行结果到输出数据
-            Map<String, Object> outputData = new HashMap<>();
-            outputData.put("wxClientId", wxClientId);
-            outputData.put("accountId", accountId);
-            outputData.put("roboticId", roboticId);
-            outputData.put("preparedForAddWx", true);
+            // 7. 设置工作流为暂停状态,等待加微回调
+            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
+
+            // 8. 返回 paused 状态,nextNodeKey 为空字符串(不自动流转)
+            return ExecutionResult.paused().nextNodeKey("").build();
 
-            // 8. 异步节点返回等待状态,等待定时任务执行加微后的回调
-            return ExecutionResult.paused()
-                    .outputData(outputData)
-                    .nextNodeKey(getNextNodeKey(context.getWorkflowInstanceId(), nodeKey))
-                    .build();
         } catch (Exception e) {
             log.error("准备加微任务数据异常 - workflowInstanceId: {}", context.getWorkflowInstanceId(), e);
             return ExecutionResult.failure().errorMessage("准备加微任务数据异常: " + e.getMessage()).build();
@@ -177,43 +164,23 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
         return true;
     }
 
-    @Override
-    public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
-        CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
-
-        List<CompanyWorkflowEdge> companyWorkflowEdges =
-                companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(companyAiWorkflowExec.getWorkflowId(), nodeKey);
-
-        // 多条边时需要根据条件判断
-        if (companyWorkflowEdges != null && !companyWorkflowEdges.isEmpty()) {
-            if (companyWorkflowEdges.size() > 1) {
-                // 存在多条件时,找默认边或第一条边
-                for (CompanyWorkflowEdge edge : companyWorkflowEdges) {
-                    if (edge.getConditionExpr() == null || edge.getConditionExpr().isEmpty()) {
-                        return edge.getTargetNodeKey();
-                    }
-                }
-                // 如果没有无条件边,返回第一条
-                return companyWorkflowEdges.get(0).getTargetNodeKey();
-            } else {
-                return companyWorkflowEdges.get(0).getTargetNodeKey();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public Boolean edgeConditionValidate(String condition) {
-        if (condition == null || condition.isEmpty()) {
-            return true;
-        }
-        try {
-            AiAddWxWorkflowConditionVo conditionVo = JSON.parseObject(condition, AiAddWxWorkflowConditionVo.class);
-            return conditionVo != null;
-        } catch (Exception e) {
-            log.error("解析加微条件失败: {}", condition, e);
-            return false;
-        }
+    /**
+     * 运行下一个节点
+     * @param context 执行上下文
+     * @param edge 边
+     */
+    private void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
+        ExecutionContext nextContext = context.clone();
+        nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
+        super.execPointNextNode(nextContext);
+        CompanyWorkflowNode nextNode = companyWorkflowNodeMapper.selectNodeByNodeKey(edge.getTargetNodeKey());
+        IWorkflowNode node = super.workflowNodeFactory.createNode(
+                nextNode.getNodeKey(),
+                NodeTypeEnum.fromCode(nextNode.getNodeType()),
+                nextNode.getNodeName(),
+                null
+        );
+        node.execute(nextContext);
     }
 
     /**