|
@@ -2,6 +2,7 @@ package com.fs.company.service.impl.call.node;
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
|
+import com.fs.common.core.domain.R;
|
|
|
import com.fs.common.core.redis.RedisCache;
|
|
import com.fs.common.core.redis.RedisCache;
|
|
|
import com.fs.common.exception.CustomException;
|
|
import com.fs.common.exception.CustomException;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
import com.fs.common.utils.StringUtils;
|
|
@@ -14,9 +15,12 @@ import com.fs.company.vo.ExecutionResult;
|
|
|
import com.fs.enums.ExecutionStatusEnum;
|
|
import com.fs.enums.ExecutionStatusEnum;
|
|
|
import com.fs.enums.NodeTypeEnum;
|
|
import com.fs.enums.NodeTypeEnum;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.redisson.api.RLock;
|
|
|
|
|
+import org.redisson.api.RedissonClient;
|
|
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
import java.time.LocalDateTime;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @author MixLiu
|
|
* @author MixLiu
|
|
@@ -36,6 +40,8 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
public static final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
public static final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
public static final WorkflowNodeFactory workflowNodeFactory = SpringUtils.getBean(WorkflowNodeFactory.class);
|
|
public static final WorkflowNodeFactory workflowNodeFactory = SpringUtils.getBean(WorkflowNodeFactory.class);
|
|
|
public static final ObjectMapper objectMapper = new ObjectMapper();
|
|
public static final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
|
|
+ public static final RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
|
|
|
|
|
+ protected static final String NODE_EXEC_LOCK_PREFIX = "node_exec_lock_";
|
|
|
|
|
|
|
|
protected String nodeKey;
|
|
protected String nodeKey;
|
|
|
protected String nodeName;
|
|
protected String nodeName;
|
|
@@ -53,13 +59,20 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
// 记录执行开始时间
|
|
// 记录执行开始时间
|
|
|
long startTime = System.currentTimeMillis();
|
|
long startTime = System.currentTimeMillis();
|
|
|
ExecutionResult result = null;
|
|
ExecutionResult result = null;
|
|
|
|
|
+ RLock lock = null;
|
|
|
try {
|
|
try {
|
|
|
|
|
+ String lockKey = NODE_EXEC_LOCK_PREFIX + context.getWorkflowInstanceId();
|
|
|
|
|
+ lock = redissonClient.getLock(lockKey);
|
|
|
|
|
+ boolean isLocked;
|
|
|
|
|
+ isLocked = lock.tryLock(3, 30, TimeUnit.SECONDS);
|
|
|
|
|
+ if (!isLocked) {
|
|
|
|
|
+ log.info("实例节点正在执行,上锁失败 - workflowInstanceId: {}", context.getWorkflowInstanceId());
|
|
|
|
|
+ return handleExecutionError(new CustomException("实例节点正在执行"), context);
|
|
|
|
|
+ }
|
|
|
// 执行前的通用处理
|
|
// 执行前的通用处理
|
|
|
preExecute(context);
|
|
preExecute(context);
|
|
|
-
|
|
|
|
|
// 执行具体的业务逻辑
|
|
// 执行具体的业务逻辑
|
|
|
result = doExecute(context);
|
|
result = doExecute(context);
|
|
|
-
|
|
|
|
|
// 记录执行时间
|
|
// 记录执行时间
|
|
|
long executionTime = System.currentTimeMillis() - startTime;
|
|
long executionTime = System.currentTimeMillis() - startTime;
|
|
|
context.setVariable("execution_time_" + nodeKey, executionTime);
|
|
context.setVariable("execution_time_" + nodeKey, executionTime);
|
|
@@ -69,13 +82,27 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
return handleExecutionError(e, context);
|
|
return handleExecutionError(e, context);
|
|
|
} finally {
|
|
} finally {
|
|
|
postExecute(context, result);
|
|
postExecute(context, result);
|
|
|
|
|
+ if (lock != null && lock.isHeldByCurrentThread()) {
|
|
|
|
|
+ lock.unlock();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public ExecutionResult continueExecute(ExecutionContext context) {
|
|
public ExecutionResult continueExecute(ExecutionContext context) {
|
|
|
|
|
+ RLock lock = null;
|
|
|
try {
|
|
try {
|
|
|
|
|
+ String lockKey = NODE_EXEC_LOCK_PREFIX + context.getWorkflowInstanceId();
|
|
|
|
|
+ lock = redissonClient.getLock(lockKey);
|
|
|
|
|
+ boolean isLocked;
|
|
|
|
|
+ isLocked = lock.tryLock(3, 30, TimeUnit.SECONDS);
|
|
|
|
|
+ if (!isLocked) {
|
|
|
|
|
+ return handleExecutionError(new CustomException("实例节点正在执行"), context);
|
|
|
|
|
+ }
|
|
|
CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+ if (!context.getCurrentNodeKey().equals(companyAiWorkflowExec.getCurrentNodeKey())) {
|
|
|
|
|
+ return handleExecutionError(new CustomException("节点不符,当前节点: " + companyAiWorkflowExec.getCurrentNodeKey()), context);
|
|
|
|
|
+ }
|
|
|
log.info("收到继续执行请求 - workflowInstanceId: {}, nodeKey: {}, 当前状态: {}",
|
|
log.info("收到继续执行请求 - workflowInstanceId: {}, nodeKey: {}, 当前状态: {}",
|
|
|
context.getWorkflowInstanceId(), nodeKey, companyAiWorkflowExec.getStatus());
|
|
context.getWorkflowInstanceId(), nodeKey, companyAiWorkflowExec.getStatus());
|
|
|
CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByNodeKey(nodeKey);
|
|
CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByNodeKey(nodeKey);
|
|
@@ -92,19 +119,22 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
return handleExecutionError(e, context);
|
|
return handleExecutionError(e, context);
|
|
|
} finally {
|
|
} finally {
|
|
|
- //更新流程日志信息
|
|
|
|
|
- CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
|
|
|
|
|
- companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
- companyAiWorkflowExecLog.setNodeKey(nodeKey);
|
|
|
|
|
- companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
|
|
|
|
|
- List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
|
|
|
|
|
- if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
|
|
|
|
|
- CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
|
|
|
|
|
- fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
|
|
|
|
|
- fExecLog.setEndTime(new Date());
|
|
|
|
|
- long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
|
|
|
|
|
- fExecLog.setDuration(durationInMillis);
|
|
|
|
|
- companyAiWorkflowExecLogMapper.updateById(fExecLog);
|
|
|
|
|
|
|
+ if (lock != null && lock.isHeldByCurrentThread()) {
|
|
|
|
|
+ //更新流程日志信息
|
|
|
|
|
+ CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
|
|
|
|
|
+ companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+ companyAiWorkflowExecLog.setNodeKey(nodeKey);
|
|
|
|
|
+ companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
|
|
|
|
|
+ List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
|
|
|
|
|
+ if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
|
|
|
|
|
+ CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
|
|
|
|
|
+ fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
|
|
|
|
|
+ fExecLog.setEndTime(new Date());
|
|
|
|
|
+ long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
|
|
|
|
|
+ fExecLog.setDuration(durationInMillis);
|
|
|
|
|
+ companyAiWorkflowExecLogMapper.updateById(fExecLog);
|
|
|
|
|
+ }
|
|
|
|
|
+ lock.unlock();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|