|
@@ -1,44 +1,170 @@
|
|
|
package com.fs.company.service.impl.call.node;
|
|
package com.fs.company.service.impl.call.node;
|
|
|
|
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.fs.common.constant.Constants;
|
|
|
|
|
+import com.fs.common.core.redis.RedisCacheT;
|
|
|
import com.fs.common.utils.spring.SpringUtils;
|
|
import com.fs.common.utils.spring.SpringUtils;
|
|
|
-import com.fs.company.domain.CompanyAiWorkflowExec;
|
|
|
|
|
-import com.fs.company.domain.CompanyWorkflowEdge;
|
|
|
|
|
-import com.fs.company.domain.CompanyWorkflowNode;
|
|
|
|
|
-import com.fs.company.mapper.CompanyAiWorkflowExecMapper;
|
|
|
|
|
-import com.fs.company.mapper.CompanyWorkflowEdgeMapper;
|
|
|
|
|
-import com.fs.company.mapper.CompanyWorkflowNodeMapper;
|
|
|
|
|
|
|
+import com.fs.company.domain.*;
|
|
|
|
|
+import com.fs.company.mapper.CompanyWxClientMapper;
|
|
|
import com.fs.company.param.ExecutionContext;
|
|
import com.fs.company.param.ExecutionContext;
|
|
|
|
|
+import com.fs.company.vo.AiAddWxWorkflowConditionVo;
|
|
|
import com.fs.company.vo.ExecutionResult;
|
|
import com.fs.company.vo.ExecutionResult;
|
|
|
import com.fs.enums.NodeTypeEnum;
|
|
import com.fs.enums.NodeTypeEnum;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @author MixLiu
|
|
* @author MixLiu
|
|
|
* @date 2026/1/28 13:39
|
|
* @date 2026/1/28 13:39
|
|
|
- * @description AI外呼电话任务节点
|
|
|
|
|
|
|
+ * @description AI添加微信任务节点
|
|
|
*/
|
|
*/
|
|
|
|
|
+@Slf4j
|
|
|
public class AiAddWxTaskNode extends AbstractWorkflowNode {
|
|
public class AiAddWxTaskNode extends AbstractWorkflowNode {
|
|
|
|
|
|
|
|
- private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
|
|
|
|
|
- private static final CompanyAiWorkflowExecMapper workflowExecMapper = SpringUtils.getBean(CompanyAiWorkflowExecMapper.class);
|
|
|
|
|
- private static final CompanyWorkflowEdgeMapper companyWorkflowEdgeMapper = SpringUtils.getBean(CompanyWorkflowEdgeMapper.class);
|
|
|
|
|
|
|
+ private static final CompanyWxClientMapper companyWxClientMapper = SpringUtils.getBean(CompanyWxClientMapper.class);
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
|
+ private static final RedisCacheT<String> redisCache = SpringUtils.getBean(RedisCacheT.class);
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 默认加微超时时间(分钟)
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final int DEFAULT_ADD_WX_TIMEOUT_MINUTES = 30;
|
|
|
|
|
|
|
|
public AiAddWxTaskNode(String nodeKey, String nodeName, Map<String, Object> properties) {
|
|
public AiAddWxTaskNode(String nodeKey, String nodeName, Map<String, Object> properties) {
|
|
|
super(nodeKey, nodeName, properties);
|
|
super(nodeKey, nodeName, properties);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 收到加微回调后,继续判定和执行下一步动作
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param context 执行上下文
|
|
|
|
|
+ * @return 执行结果
|
|
|
|
|
+ */
|
|
|
@Override
|
|
@Override
|
|
|
protected ExecutionResult doContinue(ExecutionContext context) {
|
|
protected ExecutionResult doContinue(ExecutionContext context) {
|
|
|
- return null;
|
|
|
|
|
|
|
+ CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
|
|
|
|
|
+
|
|
|
|
|
+ List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
|
|
|
|
|
+
|
|
|
|
|
+ // 获取业务数据
|
|
|
|
|
+ CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
|
|
|
|
|
+ // 获取加微记录
|
|
|
|
|
+ CompanyWxClient wxClient = companyWxClientMapper.selectById(business.getWxClientId());
|
|
|
|
|
+
|
|
|
|
|
+ // 根据加微结果判断走哪条边
|
|
|
|
|
+ String nextNodeKey = null;
|
|
|
|
|
+ for (CompanyWorkflowEdge edge : edges) {
|
|
|
|
|
+ if (edge.getConditionExpr() == null || edge.getConditionExpr().isEmpty()) {
|
|
|
|
|
+ // 无条件边,直接使用
|
|
|
|
|
+ nextNodeKey = edge.getTargetNodeKey();
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ AiAddWxWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiAddWxWorkflowConditionVo.class);
|
|
|
|
|
+ // 判断加微是否成功 (isAdd: 0否 1是 2待添加 3作废)
|
|
|
|
|
+ boolean addSuccess = wxClient != null && Integer.valueOf(1).equals(wxClient.getIsAdd());
|
|
|
|
|
+ if (condition.isAddSuccess() == addSuccess) {
|
|
|
|
|
+ nextNodeKey = edge.getTargetNodeKey();
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (nextNodeKey != null) {
|
|
|
|
|
+ return ExecutionResult.success().nextNodeKey(nextNodeKey).build();
|
|
|
|
|
+ }
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("未找到满足条件的下一节点").build();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 执行加微节点逻辑(只准备数据,实际加微由定时任务执行)
|
|
|
|
|
+ * 1. 通过 workflowInstanceId 找到 CompanyVoiceRoboticBusiness 业务数据
|
|
|
|
|
+ * 2. 验证 CompanyWxClient 数据是否已准备好(accountId、dialogId、isAdd=0)
|
|
|
|
|
+ * 3. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
|
|
|
|
|
+ * 4. 设置超时时间到 Redis
|
|
|
|
|
+ * 5. 返回等待状态,等待定时任务执行加微后的回调
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param context 执行上下文
|
|
|
|
|
+ * @return 执行结果
|
|
|
|
|
+ */
|
|
|
@Override
|
|
@Override
|
|
|
protected ExecutionResult doExecute(ExecutionContext context) {
|
|
protected ExecutionResult doExecute(ExecutionContext context) {
|
|
|
- CompanyWorkflowNode node = companyWorkflowNodeMapper.selectById(context.getCurrentNodeId());
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 获取业务数据
|
|
|
|
|
+ CompanyVoiceRoboticBusiness business = super.getRoboticBusiness(context.getWorkflowInstanceId());
|
|
|
|
|
+ if (business == null) {
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("未找到业务数据").build();
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- return null;
|
|
|
|
|
|
|
+ // 2. 通过 wxClientId 获取加微客户记录
|
|
|
|
|
+ Long wxClientId = business.getWxClientId();
|
|
|
|
|
+ if (wxClientId == null) {
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("业务数据中缺少wxClientId").build();
|
|
|
|
|
+ }
|
|
|
|
|
+ CompanyWxClient wxClient = companyWxClientMapper.selectById(wxClientId);
|
|
|
|
|
+ if (wxClient == null) {
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("未找到加微客户记录: " + wxClientId).build();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 验证加微数据是否已准备好
|
|
|
|
|
+ Long accountId = wxClient.getAccountId();
|
|
|
|
|
+ if (accountId == null) {
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("加微客户记录中缺少accountId,请先分配微信账号").build();
|
|
|
|
|
+ }
|
|
|
|
|
+ Long dialogId = wxClient.getDialogId();
|
|
|
|
|
+ if (dialogId == null) {
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("加微客户记录中缺少dialogId,请先设置话术").build();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 确保 isAdd = 0(未添加状态),这样定时任务才会处理
|
|
|
|
|
+ if (!Integer.valueOf(0).equals(wxClient.getIsAdd())) {
|
|
|
|
|
+ log.warn("加微客户记录状态不是未添加(0),当前状态: {} - wxClientId: {}", wxClient.getIsAdd(), wxClientId);
|
|
|
|
|
+ // 如果已经是待添加(2)或已添加(1),说明已经在处理中或已完成
|
|
|
|
|
+ if (Integer.valueOf(1).equals(wxClient.getIsAdd())) {
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("该客户已加微成功,无需重复添加").build();
|
|
|
|
|
+ }
|
|
|
|
|
+ if (Integer.valueOf(2).equals(wxClient.getIsAdd())) {
|
|
|
|
|
+ // 已经是待添加状态,继续等待
|
|
|
|
|
+ log.info("加微客户记录已是待添加状态,继续等待 - wxClientId: {}", wxClientId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Long roboticId = business.getRoboticId();
|
|
|
|
|
+ log.info("准备加微任务数据 - workflowInstanceId: {}, roboticId: {}, wxClientId: {}, accountId: {}",
|
|
|
|
|
+ context.getWorkflowInstanceId(), roboticId, wxClientId, accountId);
|
|
|
|
|
+
|
|
|
|
|
+ // 5. 设置 Redis 任务状态为 ADD_WX,让定时任务执行加微
|
|
|
|
|
+ // 定时任务 WxTaskService.addWx() 会检查这个状态
|
|
|
|
|
+ String taskKey = Constants.TASK_ID + roboticId;
|
|
|
|
|
+ redisCache.setCacheObject(taskKey, Constants.ADD_WX);
|
|
|
|
|
+ log.info("设置任务状态为加微 - key: {}, value: {}", taskKey, Constants.ADD_WX);
|
|
|
|
|
+
|
|
|
|
|
+ // 6. 设置加微超时检测时间到 Redis
|
|
|
|
|
+ int timeoutMinutes = getTimeoutFromProperties();
|
|
|
|
|
+ long timeoutTimestamp = System.currentTimeMillis() + timeoutMinutes * 60 * 1000L;
|
|
|
|
|
+ String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + context.getWorkflowInstanceId() + ":" + wxClientId;
|
|
|
|
|
+ redisCache.setCacheObject(timeoutKey, String.valueOf(timeoutTimestamp));
|
|
|
|
|
+ log.info("设置加微超时检测 - key: {}, timeout: {}分钟, 超时时间戳: {}",
|
|
|
|
|
+ timeoutKey, timeoutMinutes, timeoutTimestamp);
|
|
|
|
|
+
|
|
|
|
|
+ // 7. 保存执行结果到输出数据
|
|
|
|
|
+ Map<String, Object> outputData = new HashMap<>();
|
|
|
|
|
+ outputData.put("wxClientId", wxClientId);
|
|
|
|
|
+ outputData.put("accountId", accountId);
|
|
|
|
|
+ outputData.put("roboticId", roboticId);
|
|
|
|
|
+ outputData.put("preparedForAddWx", true);
|
|
|
|
|
+
|
|
|
|
|
+ // 8. 异步节点返回等待状态,等待定时任务执行加微后的回调
|
|
|
|
|
+ return ExecutionResult.paused()
|
|
|
|
|
+ .outputData(outputData)
|
|
|
|
|
+ .nextNodeKey(getNextNodeKey(context.getWorkflowInstanceId(), nodeKey))
|
|
|
|
|
+ .build();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("准备加微任务数据异常 - workflowInstanceId: {}", context.getWorkflowInstanceId(), e);
|
|
|
|
|
+ return ExecutionResult.failure().errorMessage("准备加微任务数据异常: " + e.getMessage()).build();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -53,20 +179,94 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
|
|
public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
|
|
|
-
|
|
|
|
|
- CompanyAiWorkflowExec companyAiWorkflowExec = workflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
|
|
|
|
|
|
|
+ CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
|
|
|
|
|
|
|
|
List<CompanyWorkflowEdge> companyWorkflowEdges =
|
|
List<CompanyWorkflowEdge> companyWorkflowEdges =
|
|
|
companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(companyAiWorkflowExec.getWorkflowId(), nodeKey);
|
|
companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(companyAiWorkflowExec.getWorkflowId(), nodeKey);
|
|
|
- CompanyWorkflowEdge result = null;
|
|
|
|
|
- for (CompanyWorkflowEdge companyWorkflowEdge : companyWorkflowEdges) {
|
|
|
|
|
|
|
|
|
|
|
|
+ // 多条边时需要根据条件判断
|
|
|
|
|
+ if (companyWorkflowEdges != null && !companyWorkflowEdges.isEmpty()) {
|
|
|
|
|
+ if (companyWorkflowEdges.size() > 1) {
|
|
|
|
|
+ // 存在多条件时,找默认边或第一条边
|
|
|
|
|
+ for (CompanyWorkflowEdge edge : companyWorkflowEdges) {
|
|
|
|
|
+ if (edge.getConditionExpr() == null || edge.getConditionExpr().isEmpty()) {
|
|
|
|
|
+ return edge.getTargetNodeKey();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 如果没有无条件边,返回第一条
|
|
|
|
|
+ return companyWorkflowEdges.get(0).getTargetNodeKey();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return companyWorkflowEdges.get(0).getTargetNodeKey();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- return result == null ? null : result.getTargetNodeKey();
|
|
|
|
|
|
|
+ return null;
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
@Override
|
|
@Override
|
|
|
- public Boolean edgeConditionValidate(String condition){
|
|
|
|
|
|
|
+ public Boolean edgeConditionValidate(String condition) {
|
|
|
|
|
+ if (condition == null || condition.isEmpty()) {
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ AiAddWxWorkflowConditionVo conditionVo = JSON.parseObject(condition, AiAddWxWorkflowConditionVo.class);
|
|
|
|
|
+ return conditionVo != null;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("解析加微条件失败: {}", condition, e);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从节点配置获取超时时间(分钟)
|
|
|
|
|
+ */
|
|
|
|
|
+ private int getTimeoutFromProperties() {
|
|
|
|
|
+ if (properties != null && properties.containsKey("timeout")) {
|
|
|
|
|
+ Object timeout = properties.get("timeout");
|
|
|
|
|
+ if (timeout instanceof Number) {
|
|
|
|
|
+ return ((Number) timeout).intValue();
|
|
|
|
|
+ }
|
|
|
|
|
+ if (timeout instanceof String) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ return Integer.parseInt((String) timeout);
|
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
|
+ log.warn("解析超时时间失败: {}, 使用默认值: {}", timeout, DEFAULT_ADD_WX_TIMEOUT_MINUTES);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return DEFAULT_ADD_WX_TIMEOUT_MINUTES;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 检查并标记已执行(用于互斥逻辑)
|
|
|
|
|
+ * 如果返回 true 表示当前是第一个执行的,可以继续
|
|
|
|
|
+ * 如果返回 false 表示已经被其他路径执行过了,不再执行
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param workflowInstanceId 工作流实例ID
|
|
|
|
|
+ * @param wxClientId 加微客户ID
|
|
|
|
|
+ * @return 是否可以执行
|
|
|
|
|
+ */
|
|
|
|
|
+ public static boolean tryMarkAsExecuted(String workflowInstanceId, Long wxClientId) {
|
|
|
|
|
+ String executedKey = Constants.WORKFLOW_ADD_WX_EXECUTED + workflowInstanceId + ":" + wxClientId;
|
|
|
|
|
+ String existingValue = redisCache.getCacheObject(executedKey);
|
|
|
|
|
+ if (existingValue != null) {
|
|
|
|
|
+ // 已经被执行过了
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 标记为已执行,设置1小时过期
|
|
|
|
|
+ redisCache.setCacheObject(executedKey, "1");
|
|
|
|
|
+ redisCache.expire(executedKey, 3600);
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- return false;
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 清除超时检测 Key
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param workflowInstanceId 工作流实例ID
|
|
|
|
|
+ * @param wxClientId 加微客户ID
|
|
|
|
|
+ */
|
|
|
|
|
+ public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
|
|
|
|
|
+ String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
|
|
|
|
|
+ redisCache.deleteObject(timeoutKey);
|
|
|
|
|
+ log.info("清除加微超时检测 Key: {}", timeoutKey);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|