Ver código fonte

优化逻辑

吴树波 3 semanas atrás
pai
commit
6d5abca739

+ 2 - 2
fs-service/src/main/java/com/fs/company/service/CompanyWorkflowEngine.java

@@ -25,7 +25,7 @@ public interface CompanyWorkflowEngine {
      * @param nodeId 节点ID
      * @return 执行结果
      */
-    ExecutionResult executeNode(String workflowInstanceId, String nodeId);
+    void executeNode(String workflowInstanceId, String nodeId);
 
     /**
      * 暂停工作流
@@ -53,5 +53,5 @@ public interface CompanyWorkflowEngine {
      * @param inputData 输入数据
      * @return 恢复结果
      */
-    ExecutionResult resumeFromBlockingNode(String workflowInstanceId, String nodeKey, Map<String, Object> inputData);
+    void resumeFromBlockingNode(String workflowInstanceId, String nodeKey, Map<String, Object> inputData);
 }

+ 1 - 1
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -722,8 +722,8 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             map.put("businesse", e.getId());
             // 写入执行任务
             ExecutionResult initialize = companyWorkflowEngine.initialize(robotic.getCompanyAiWorkflowId(), map);
-            redisExcution.setCacheObject(NODE_RUN_KEY + robotic.getId() + ":" + e.getId(), initialize);
             companyWorkflowEngine.executeNode(initialize.getWorkflowInstanceId(), initialize.getNextNodeKey());
+
         });
 
     }

+ 40 - 63
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -3,6 +3,7 @@ package com.fs.company.service.impl;
 import com.alibaba.fastjson.JSON;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fs.common.exception.CustomException;
 import com.fs.common.utils.StringUtils;
 import com.fs.company.domain.CompanyAiWorkflowExec;
 import com.fs.company.domain.CompanyAiWorkflowExecLog;
@@ -94,15 +95,16 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
      */
     @Override
     @Transactional
-    public ExecutionResult executeNode(String workflowInstanceId, String nodeKey) {
+    public void executeNode(String workflowInstanceId, String nodeKey) {
         try {
             // 加载当前执行记录
             CompanyAiWorkflowExec currentExec =
                     currentExecutionMapper.selectById(workflowInstanceId);
 
             if (currentExec == null) {
-                return ExecutionResult.failure()
-                        .errorMessage("工作流实例不存在: " + workflowInstanceId).build();
+//                return ExecutionResult.failure()
+//                        .errorMessage("工作流实例不存在: " + workflowInstanceId).build();
+                throw new CustomException("工作流实例不存在: " + workflowInstanceId);
             }
 
             // 反序列化执行上下文
@@ -115,67 +117,47 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
             // 创建节点实例并执行
             IWorkflowNode node = createNode(definition, nodeKey);
             if (node == null) {
-                return ExecutionResult.failure()
-                        .errorMessage("节点不存在: " + nodeKey).build();
+                throw new CustomException("节点不存在: " + nodeKey);
             }
 
             // 更新当前节点到执行上下文
             context.setCurrentNodeKey(nodeKey);
+            ExecutionResult result = node.execute(context);
 
-            if (node.isAsync()) {
-                // 阻塞节点:执行节点后暂停流程,等待外部事件唤醒
-                ExecutionResult result = node.execute(context);
-
-//                // 记录执行日志
-//                logExecution(createLogEntry(workflowInstanceId, nodeKey, node.getType(), result));
+//            if (node.isAsync()) {
+//                // 阻塞节点:执行节点后暂停流程,等待外部事件唤醒
+//
+//                // 返回特殊结果表示流程被阻塞
+//                return ExecutionResult.paused()
+//                        .nextNodeKey(result.getNextNodeKey())
+//                        .outputData(result.getOutputData()).build();
+//            } else {
+//                // 执行节点
+//                ExecutionResult result = node.execute(context);
 //
-//                // 如果执行失败,更新状态为失败
-//                if (!result.isSuccess()) {
+//                // 如果执行成功,返回下一个节点ID
+//                if (result.isSuccess()) {
+//                    // 检查是否是结束节点
+//                    if (StringUtils.isBlank(result.getNextNodeKey())) {
+//                        // 结束工作流
+//                        completeWorkflow(workflowInstanceId);
+//                    } else {
+//                        // 更新当前节点为下一个节点
+//                        updateCurrentNode(workflowInstanceId, result.getNextNodeKey());
+//                    }
+//                } else {
+//                    // 执行失败时更新状态
 //                    updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
-//                    return result;
 //                }
-//
-//                // 阻塞节点执行成功后,将工作流状态设为暂停
-//                pauseWorkflowForBlockingNode(workflowInstanceId, nodeKey, context);
-
-                // 返回特殊结果表示流程被阻塞
-                return ExecutionResult.paused()
-                        .nextNodeKey(result.getNextNodeKey())
-                        .outputData(result.getOutputData()).build();
-            } else {
-                // 执行节点
-                ExecutionResult result = node.execute(context);
-
-                // 记录执行日志
-//                logExecution(createLogEntry(workflowInstanceId, nodeKey, node.getType(), result));
-//
-//                // 更新当前执行记录
-//                updateCurrentExecution(workflowInstanceId, nodeKey, context);
-
-                // 如果执行成功,返回下一个节点ID
-                if (result.isSuccess()) {
-                    // 检查是否是结束节点
-                    if (StringUtils.isBlank(result.getNextNodeKey())) {
-                        // 结束工作流
-                        completeWorkflow(workflowInstanceId);
-                    } else {
-                        // 更新当前节点为下一个节点
-                        updateCurrentNode(workflowInstanceId, result.getNextNodeKey());
-                    }
-                } else {
-                    // 执行失败时更新状态
-                    updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
-                }
-
-                return result;
-            }
-
 
+//                return result;
+//            }
         } catch (Exception e) {
             log.error("节点执行失败: {} -> {}", workflowInstanceId, nodeKey, e);
             updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
-            return ExecutionResult.failure()
-                    .errorMessage("Node execution failed: " + e.getMessage()).build();
+            throw new CustomException("节点执行失败");
+//            return ExecutionResult.failure()
+//                    .errorMessage("Node execution failed: " + e.getMessage()).build();
         }
     }
 
@@ -445,20 +427,18 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
     }
     @Override
     @Transactional
-    public ExecutionResult resumeFromBlockingNode(String workflowInstanceId, String nodeKey, Map<String, Object> inputData) {
+    public void resumeFromBlockingNode(String workflowInstanceId, String nodeKey, Map<String, Object> inputData) {
         try {
             // 加载当前执行记录
             CompanyAiWorkflowExec currentExec = currentExecutionMapper.selectByWorkflowInstanceIdAndCurrentNode(workflowInstanceId,nodeKey);
 
             if (currentExec == null) {
-                return ExecutionResult.failure()
-                        .errorMessage("工作流实例不存在: " + workflowInstanceId).build();
+                throw new CustomException("工作流实例不存在: " + workflowInstanceId);
             }
 
             // 检查当前工作流是否处于暂停状态
             if (!Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())) {
-                return ExecutionResult.failure()
-                        .errorMessage("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId).build();
+                throw new CustomException("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId);
             }
 
             // 反序列化执行上下文并合并新的输入数据
@@ -473,8 +453,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
             // 创建节点实例
             IWorkflowNode node = createNode(definition, nodeKey);
             if (node == null) {
-                return ExecutionResult.failure()
-                        .errorMessage("节点不存在: " + nodeKey).build();
+                throw new CustomException("节点不存在: " + nodeKey);
             }
 
             // 继续执行节点逻辑
@@ -496,18 +475,16 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
                     completeWorkflow(workflowInstanceId);
                 } else {
                     // 执行下一个节点
-                    return executeNode(workflowInstanceId, result.getNextNodeKey());
+                    executeNode(workflowInstanceId, result.getNextNodeKey());
                 }
             } else {
                 // 执行失败,更新状态
                 updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
             }
-            return result;
         } catch (Exception e) {
             log.error("唤醒阻塞节点失败: {} -> {}", workflowInstanceId, nodeKey, e);
             updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
-            return ExecutionResult.failure()
-                    .errorMessage("Resume from blocking node failed: " + e.getMessage()).build();
+            throw new CustomException("Resume from blocking node failed: " + e.getMessage());
         }
     }
 }