|
|
@@ -2,7 +2,6 @@ package com.fs.company.service.impl.call.node;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
-import com.fs.common.core.domain.R;
|
|
|
import com.fs.common.core.redis.RedisCache;
|
|
|
import com.fs.common.exception.CustomException;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
|
@@ -17,12 +16,12 @@ import com.fs.enums.NodeTypeEnum;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.redisson.api.RLock;
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
|
|
|
-import javax.xml.soap.Node;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.LocalTime;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/**
|
|
|
@@ -117,6 +116,10 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
|
|
|
if (timeAvailable == null) {
|
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
+ if (exec == null) {
|
|
|
+ log.warn("continueExecute: 工作流执行实例不存在 - workflowInstanceId: {}", context.getWorkflowInstanceId());
|
|
|
+ return null;
|
|
|
+ }
|
|
|
Integer cidGroupNo = exec.getCidGroupNo();
|
|
|
LocalTime runtimeRangeStart = exec.getRuntimeRangeStart();
|
|
|
String continueTimerExecuteKey = getContinueTimerExecuteKey(cidGroupNo, runtimeRangeStart);
|
|
|
@@ -235,17 +238,10 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
long endTime = System.currentTimeMillis();
|
|
|
context.setVariable("node_end_time_" + nodeKey, endTime);
|
|
|
log.info("Completed execution of node: {} ({})", nodeKey, nodeName);
|
|
|
- //不在这里控制流程状态容易出问题bug,各节点自行管理状态
|
|
|
-// int logStatus;
|
|
|
-// if (ExecutionStatusEnum.SUCCESS.equals(result.getStatus())) {
|
|
|
-// logStatus = ExecutionStatusEnum.SUCCESS.getValue();
|
|
|
-// updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.SUCCESS);
|
|
|
-// } else if (ExecutionStatusEnum.FAILURE.equals(result.getStatus())) {
|
|
|
-// logStatus = ExecutionStatusEnum.FAILURE.getValue();
|
|
|
-// updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.FAILURE);
|
|
|
-// } else {
|
|
|
-//
|
|
|
-// }
|
|
|
+ if (result == null) {
|
|
|
+ log.warn("节点 {} 执行结果为 null,跳过记录执行日志", nodeKey);
|
|
|
+ return;
|
|
|
+ }
|
|
|
int logStatus = result.getStatus().getValue();
|
|
|
CompanyAiWorkflowExecLog logEntry = createLogEntry(context.getWorkflowInstanceId(), nodeKey, getType(), result, context);
|
|
|
logEntry.setStatus(logStatus);
|
|
|
@@ -280,7 +276,13 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
*/
|
|
|
public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
|
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
|
|
|
+ if (exec == null) {
|
|
|
+ throw new RuntimeException("工作流执行实例不存在: " + workflowInstanceId);
|
|
|
+ }
|
|
|
List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
|
|
|
+ if (edges == null || edges.isEmpty()) {
|
|
|
+ throw new RuntimeException("未找到节点 " + nodeKey + " 的出边");
|
|
|
+ }
|
|
|
return edges.get(0).getTargetNodeKey();
|
|
|
}
|
|
|
|
|
|
@@ -439,7 +441,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
update.setWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
update.setCurrentNodeKey(context.getCurrentNodeKey());
|
|
|
if (null != context.getVariables() && null != context.getVariable("nodeName", String.class)) {
|
|
|
- update.setCurrentNodeName(context.getVariable("nodeName0", String.class));
|
|
|
+ update.setCurrentNodeName(context.getVariable("nodeName", String.class));
|
|
|
}
|
|
|
update.setStatus(ExecutionStatusEnum.RUNNING.getValue());
|
|
|
update.setLastUpdateTime(LocalDateTime.now());
|
|
|
@@ -452,11 +454,6 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
}
|
|
|
|
|
|
protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
|
|
|
- try {
|
|
|
- Thread.sleep(2000L);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
if (StringUtils.isBlank(edge.getTargetNodeKey())) {
|
|
|
return;
|
|
|
}
|
|
|
@@ -464,10 +461,35 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
CompanyWorkflowNode nextNode = getNodeByKey(edge.getTargetNodeKey());
|
|
|
nextContext.setCurrentNodeKey(nextNode.getNodeKey());
|
|
|
nextContext.setVariable("nodeName", nextNode.getNodeName());
|
|
|
- log.info("开始执行下一个节点:{}", nextNode.getNodeName());
|
|
|
- execPointNextNode(nextContext);
|
|
|
IWorkflowNode node = workflowNodeFactory.createNode(nextNode.getNodeKey(), NodeTypeEnum.fromCode(nextNode.getNodeType()), nextNode.getNodeName(), null);
|
|
|
- node.execute(nextContext);
|
|
|
+
|
|
|
+ // 异步执行下一节点:当前节点不等待,下一节点在独立线程中执行(含 2 秒延迟)
|
|
|
+ Executor executor = getWorkflowNextNodeExecutor();
|
|
|
+ Runnable nextTask = () -> {
|
|
|
+ try {
|
|
|
+ Thread.sleep(2000L);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.warn("runNextNode 异步任务被中断", e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("开始执行下一个节点:{}", nextNode.getNodeName());
|
|
|
+ execPointNextNode(nextContext);
|
|
|
+ node.execute(nextContext);
|
|
|
+ };
|
|
|
+ if (executor != null) {
|
|
|
+ CompletableFuture.runAsync(nextTask, executor);
|
|
|
+ } else {
|
|
|
+ CompletableFuture.runAsync(nextTask);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Executor getWorkflowNextNodeExecutor() {
|
|
|
+ try {
|
|
|
+ return SpringUtils.getBean("cidWorkFlowExecutor");
|
|
|
+ } catch (Exception e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -477,6 +499,9 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
*/
|
|
|
protected Boolean runnable(ExecutionContext context) {
|
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
+ if (exec == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
return !exec.getCurrentNodeKey().equals(exec.getEndNodeKey());
|
|
|
}
|
|
|
|