|
@@ -2,7 +2,6 @@ package com.fs.company.service.impl.call.node;
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
-import com.fs.common.core.domain.R;
|
|
|
|
|
import com.fs.common.core.redis.RedisCache;
|
|
import com.fs.common.core.redis.RedisCache;
|
|
|
import com.fs.common.exception.CustomException;
|
|
import com.fs.common.exception.CustomException;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
import com.fs.common.utils.StringUtils;
|
|
@@ -17,11 +16,12 @@ import com.fs.enums.NodeTypeEnum;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.redisson.api.RLock;
|
|
import org.redisson.api.RLock;
|
|
|
import org.redisson.api.RedissonClient;
|
|
import org.redisson.api.RedissonClient;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
|
|
|
|
|
|
|
-import javax.xml.soap.Node;
|
|
|
|
|
import java.time.LocalDateTime;
|
|
import java.time.LocalDateTime;
|
|
|
|
|
+import java.time.LocalTime;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
|
|
+import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -45,6 +45,8 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
public static final ObjectMapper objectMapper = new ObjectMapper();
|
|
public static final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
public static final RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
|
|
public static final RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
|
|
|
protected static final String NODE_EXEC_LOCK_PREFIX = "node_exec_lock_";
|
|
protected static final String NODE_EXEC_LOCK_PREFIX = "node_exec_lock_";
|
|
|
|
|
+ protected static final String CONTINUE_TIMER_EXECUTE_KEY = "CONTINUE:TIMER:EXECUTE:%s:%s:%s:";
|
|
|
|
|
+ public static final String CONTINUE_TIMER_EXECUTE_KEY_PREFIX = "CONTINUE:TIMER:EXECUTE:%s:*";
|
|
|
|
|
|
|
|
protected String nodeKey;
|
|
protected String nodeKey;
|
|
|
protected String nodeName;
|
|
protected String nodeName;
|
|
@@ -62,6 +64,13 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
log.info("当前流程已到达结束节点,节点执行失败:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
|
|
log.info("当前流程已到达结束节点,节点执行失败:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
|
|
+ // 校验节点执行是否符合时间设置
|
|
|
|
|
+ CompanyAiWorkflowExec timeAvailable = companyAiWorkflowExecMapper.selectExecWithTimeAvailableByInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+ if (timeAvailable == null) {
|
|
|
|
|
+ updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.PENDING);
|
|
|
|
|
+ log.info("当前流程不在可执行时间范围内 已被设置为等待状态,节点执行等待:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
log.info("开始执行节点:" + nodeName + " - " + nodeKey);
|
|
log.info("开始执行节点:" + nodeName + " - " + nodeKey);
|
|
|
// 记录执行开始时间
|
|
// 记录执行开始时间
|
|
|
long startTime = System.currentTimeMillis();
|
|
long startTime = System.currentTimeMillis();
|
|
@@ -78,6 +87,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
}
|
|
}
|
|
|
// 执行前的通用处理
|
|
// 执行前的通用处理
|
|
|
preExecute(context);
|
|
preExecute(context);
|
|
|
|
|
+
|
|
|
// 执行具体的业务逻辑
|
|
// 执行具体的业务逻辑
|
|
|
result = doExecute(context);
|
|
result = doExecute(context);
|
|
|
// 记录执行时间
|
|
// 记录执行时间
|
|
@@ -101,6 +111,26 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
log.info("当前流程已到达结束节点,节点继续执行失败:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
|
|
log.info("当前流程已到达结束节点,节点继续执行失败:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
|
|
+ // 校验节点执行是否符合时间设置
|
|
|
|
|
+ CompanyAiWorkflowExec timeAvailable = companyAiWorkflowExecMapper.selectExecWithTimeAvailableByInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+
|
|
|
|
|
+ if (timeAvailable == null) {
|
|
|
|
|
+ CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+ if (exec == null) {
|
|
|
|
|
+ log.warn("continueExecute: 工作流执行实例不存在 - workflowInstanceId: {}", context.getWorkflowInstanceId());
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ Integer cidGroupNo = exec.getCidGroupNo();
|
|
|
|
|
+ LocalTime runtimeRangeStart = exec.getRuntimeRangeStart();
|
|
|
|
|
+ String continueTimerExecuteKey = getContinueTimerExecuteKey(cidGroupNo, runtimeRangeStart);
|
|
|
|
|
+ context.setVariable("continue_timer_execute_nodekey", nodeKey);
|
|
|
|
|
+ context.setVariable("continue_timer_execute_nodetype", getType());
|
|
|
|
|
+ context.setVariable("continue_timer_execute_nodename", nodeName);
|
|
|
|
|
+ redisCache.setCacheObject(continueTimerExecuteKey , context, 1, TimeUnit.DAYS);
|
|
|
|
|
+ log.info("当前流程不在可执行时间范围内 continue已被阻塞等待唤醒执行,节点执行等待:- {},- {} -,{}" , nodeName, nodeKey, context.getWorkflowInstanceId());
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
RLock lock = null;
|
|
RLock lock = null;
|
|
|
try {
|
|
try {
|
|
|
String lockKey = NODE_EXEC_LOCK_PREFIX + context.getWorkflowInstanceId();
|
|
String lockKey = NODE_EXEC_LOCK_PREFIX + context.getWorkflowInstanceId();
|
|
@@ -187,16 +217,17 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
context.setCurrentNodeKey(nodeKey);
|
|
context.setCurrentNodeKey(nodeKey);
|
|
|
CompanyWorkflowNode node = getNodeByKey(nodeKey);
|
|
CompanyWorkflowNode node = getNodeByKey(nodeKey);
|
|
|
context.setVariable("currentNode", node);
|
|
context.setVariable("currentNode", node);
|
|
|
- CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
- if (!companyAiWorkflowExec.getCurrentNodeKey().equals(nodeKey)) {
|
|
|
|
|
- CompanyAiWorkflowExec update = new CompanyAiWorkflowExec();
|
|
|
|
|
- update.setId(companyAiWorkflowExec.getId());
|
|
|
|
|
- update.setCurrentNodeKey(nodeKey);
|
|
|
|
|
- update.setCurrentNodeName(nodeName);
|
|
|
|
|
- update.setCurrentNodeType(NodeTypeEnum.fromCode(node.getNodeType()).getValue());
|
|
|
|
|
- update.setLastUpdateTime(LocalDateTime.now());
|
|
|
|
|
- companyAiWorkflowExecMapper.updateCompanyAiWorkflowExec(update);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.RUNNING);
|
|
|
|
|
+// CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+// if (!companyAiWorkflowExec.getCurrentNodeKey().equals(nodeKey)) {
|
|
|
|
|
+// CompanyAiWorkflowExec update = new CompanyAiWorkflowExec();
|
|
|
|
|
+// update.setId(companyAiWorkflowExec.getId());
|
|
|
|
|
+// update.setCurrentNodeKey(nodeKey);
|
|
|
|
|
+// update.setCurrentNodeName(nodeName);
|
|
|
|
|
+// update.setCurrentNodeType(NodeTypeEnum.fromCode(node.getNodeType()).getValue());
|
|
|
|
|
+// update.setLastUpdateTime(LocalDateTime.now());
|
|
|
|
|
+// companyAiWorkflowExecMapper.updateCompanyAiWorkflowExec(update);
|
|
|
|
|
+// }
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -207,17 +238,10 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
long endTime = System.currentTimeMillis();
|
|
long endTime = System.currentTimeMillis();
|
|
|
context.setVariable("node_end_time_" + nodeKey, endTime);
|
|
context.setVariable("node_end_time_" + nodeKey, endTime);
|
|
|
log.info("Completed execution of node: {} ({})", nodeKey, nodeName);
|
|
log.info("Completed execution of node: {} ({})", nodeKey, nodeName);
|
|
|
- //不在这里控制流程状态容易出问题bug,各节点自行管理状态
|
|
|
|
|
-// int logStatus;
|
|
|
|
|
-// if (ExecutionStatusEnum.SUCCESS.equals(result.getStatus())) {
|
|
|
|
|
-// logStatus = ExecutionStatusEnum.SUCCESS.getValue();
|
|
|
|
|
-// updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.SUCCESS);
|
|
|
|
|
-// } else if (ExecutionStatusEnum.FAILURE.equals(result.getStatus())) {
|
|
|
|
|
-// logStatus = ExecutionStatusEnum.FAILURE.getValue();
|
|
|
|
|
-// updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.FAILURE);
|
|
|
|
|
-// } else {
|
|
|
|
|
-//
|
|
|
|
|
-// }
|
|
|
|
|
|
|
+ if (result == null) {
|
|
|
|
|
+ log.warn("节点 {} 执行结果为 null,跳过记录执行日志", nodeKey);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
int logStatus = result.getStatus().getValue();
|
|
int logStatus = result.getStatus().getValue();
|
|
|
CompanyAiWorkflowExecLog logEntry = createLogEntry(context.getWorkflowInstanceId(), nodeKey, getType(), result, context);
|
|
CompanyAiWorkflowExecLog logEntry = createLogEntry(context.getWorkflowInstanceId(), nodeKey, getType(), result, context);
|
|
|
logEntry.setStatus(logStatus);
|
|
logEntry.setStatus(logStatus);
|
|
@@ -252,7 +276,13 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
*/
|
|
*/
|
|
|
public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
|
|
public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
|
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
|
|
|
|
|
+ if (exec == null) {
|
|
|
|
|
+ throw new RuntimeException("工作流执行实例不存在: " + workflowInstanceId);
|
|
|
|
|
+ }
|
|
|
List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
|
|
List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
|
|
|
|
|
+ if (edges == null || edges.isEmpty()) {
|
|
|
|
|
+ throw new RuntimeException("未找到节点 " + nodeKey + " 的出边");
|
|
|
|
|
+ }
|
|
|
return edges.get(0).getTargetNodeKey();
|
|
return edges.get(0).getTargetNodeKey();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -411,7 +441,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
update.setWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
update.setWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
update.setCurrentNodeKey(context.getCurrentNodeKey());
|
|
update.setCurrentNodeKey(context.getCurrentNodeKey());
|
|
|
if (null != context.getVariables() && null != context.getVariable("nodeName", String.class)) {
|
|
if (null != context.getVariables() && null != context.getVariable("nodeName", String.class)) {
|
|
|
- update.setCurrentNodeName(context.getVariable("nodeName0", String.class));
|
|
|
|
|
|
|
+ update.setCurrentNodeName(context.getVariable("nodeName", String.class));
|
|
|
}
|
|
}
|
|
|
update.setStatus(ExecutionStatusEnum.RUNNING.getValue());
|
|
update.setStatus(ExecutionStatusEnum.RUNNING.getValue());
|
|
|
update.setLastUpdateTime(LocalDateTime.now());
|
|
update.setLastUpdateTime(LocalDateTime.now());
|
|
@@ -424,11 +454,6 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
|
|
protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
|
|
|
- try {
|
|
|
|
|
- Thread.sleep(2000L);
|
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
|
- }
|
|
|
|
|
if (StringUtils.isBlank(edge.getTargetNodeKey())) {
|
|
if (StringUtils.isBlank(edge.getTargetNodeKey())) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
@@ -436,10 +461,35 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
CompanyWorkflowNode nextNode = getNodeByKey(edge.getTargetNodeKey());
|
|
CompanyWorkflowNode nextNode = getNodeByKey(edge.getTargetNodeKey());
|
|
|
nextContext.setCurrentNodeKey(nextNode.getNodeKey());
|
|
nextContext.setCurrentNodeKey(nextNode.getNodeKey());
|
|
|
nextContext.setVariable("nodeName", nextNode.getNodeName());
|
|
nextContext.setVariable("nodeName", nextNode.getNodeName());
|
|
|
- log.info("开始执行下一个节点:{}", nextNode.getNodeName());
|
|
|
|
|
- execPointNextNode(nextContext);
|
|
|
|
|
IWorkflowNode node = workflowNodeFactory.createNode(nextNode.getNodeKey(), NodeTypeEnum.fromCode(nextNode.getNodeType()), nextNode.getNodeName(), null);
|
|
IWorkflowNode node = workflowNodeFactory.createNode(nextNode.getNodeKey(), NodeTypeEnum.fromCode(nextNode.getNodeType()), nextNode.getNodeName(), null);
|
|
|
- node.execute(nextContext);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 异步执行下一节点:当前节点不等待,下一节点在独立线程中执行(含 2 秒延迟)
|
|
|
|
|
+ Executor executor = getWorkflowNextNodeExecutor();
|
|
|
|
|
+ Runnable nextTask = () -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Thread.sleep(2000L);
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.warn("runNextNode 异步任务被中断", e);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("开始执行下一个节点:{}", nextNode.getNodeName());
|
|
|
|
|
+ execPointNextNode(nextContext);
|
|
|
|
|
+ node.execute(nextContext);
|
|
|
|
|
+ };
|
|
|
|
|
+ if (executor != null) {
|
|
|
|
|
+ CompletableFuture.runAsync(nextTask, executor);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ CompletableFuture.runAsync(nextTask);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static Executor getWorkflowNextNodeExecutor() {
|
|
|
|
|
+ try {
|
|
|
|
|
+ return SpringUtils.getBean("cidWorkFlowExecutor");
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -449,7 +499,13 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
*/
|
|
*/
|
|
|
protected Boolean runnable(ExecutionContext context) {
|
|
protected Boolean runnable(ExecutionContext context) {
|
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+ if (exec == null) {
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
return !exec.getCurrentNodeKey().equals(exec.getEndNodeKey());
|
|
return !exec.getCurrentNodeKey().equals(exec.getEndNodeKey());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ public static String getContinueTimerExecuteKey(Integer groupNo, LocalTime time) {
|
|
|
|
|
+ return String.format(CONTINUE_TIMER_EXECUTE_KEY, groupNo,time.getHour(), time.getMinute());
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|