|
|
@@ -23,6 +23,7 @@ import com.fs.common.core.redis.RedisCacheT;
|
|
|
import com.fs.common.exception.CustomException;
|
|
|
import com.fs.common.exception.base.BaseException;
|
|
|
import com.fs.common.service.impl.SmsServiceImpl;
|
|
|
+import com.fs.company.service.impl.call.EasyCallTaskControlService;
|
|
|
import com.fs.common.utils.*;
|
|
|
import com.fs.common.utils.spring.SpringUtils;
|
|
|
import com.fs.company.domain.*;
|
|
|
@@ -140,6 +141,9 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
|
|
|
private final ICrmCustomerAnalyzeService crmCustomerAnalyzeService;
|
|
|
|
|
|
+ private final EasyCallTaskControlService easyCallTaskControlService;
|
|
|
+ private final CompanySiptaskInfoMapper companySiptaskInfoMapper;
|
|
|
+
|
|
|
/** EasyCall intent 意向度重试队列 Redis key 前缀,value 为已重试次数 */
|
|
|
private static final String EASYCALL_INTENT_RETRY_KEY = "easycall:intent:retry:";
|
|
|
/** intent 意向度等待重试最大次数(每次间隔约30秒,最多等待 5*30=150秒) */
|
|
|
@@ -1960,17 +1964,267 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
public R pauseRoboticActive(PauseRoboticActiveParam param) {
|
|
|
//暂停任务
|
|
|
if (ACTIVE_TYPE_PAUSE.equals(param.getActiveType())) {
|
|
|
+ CompanyVoiceRobotic robotic = selectCompanyVoiceRoboticById(param.getTaskId());
|
|
|
+ if (robotic == null || robotic.getTaskStatus() != 1) {
|
|
|
+ return R.error("任务不在执行中状态,无法暂停");
|
|
|
+ }
|
|
|
+ robotic.setTaskStatus(2);
|
|
|
+ updateCompanyVoiceRobotic(robotic);
|
|
|
+ // 更新Redis缓存
|
|
|
+ redisCache2.setCacheObject("task:status:" + param.getTaskId(), 2);
|
|
|
+ // 调用EasyCall暂停
|
|
|
+ pauseEasyCallTask(robotic);
|
|
|
+ return R.ok("暂停成功");
|
|
|
+ }
|
|
|
+ //恢复任务继续进入可运行
|
|
|
+ else if (ACTIVE_TYPE_CONTINUE.equals(param.getActiveType())) {
|
|
|
+ CompanyVoiceRobotic robotic = selectCompanyVoiceRoboticById(param.getTaskId());
|
|
|
+ if (robotic == null || robotic.getTaskStatus() != 2) {
|
|
|
+ return R.error("任务不在中断状态,无法继续");
|
|
|
+ }
|
|
|
+ robotic.setTaskStatus(1);
|
|
|
+ updateCompanyVoiceRobotic(robotic);
|
|
|
+ // 更新Redis缓存
|
|
|
+ redisCache2.setCacheObject("task:status:" + param.getTaskId(), 1);
|
|
|
+ // 调用EasyCall恢复
|
|
|
+ resumeEasyCallTask(robotic);
|
|
|
+ // 异步执行恢复扫描(分批处理,避免阻塞接口)
|
|
|
+ // 通过SpringUtils获取代理对象调用,确保@Async生效
|
|
|
+ SpringUtils.getBean(CompanyVoiceRoboticServiceImpl.class).resumePausedInstances(param.getTaskId());
|
|
|
+ return R.ok("继续成功");
|
|
|
+ }
|
|
|
|
|
|
- // 暂停任务更新
|
|
|
+ return R.error("操作类型无效");
|
|
|
+ }
|
|
|
|
|
|
- // 暂停任务创建的三方外呼任务
|
|
|
+ /**
|
|
|
+ * 暂停该任务关联的所有EasyCall外呼任务
|
|
|
+ */
|
|
|
+ private void pauseEasyCallTask(CompanyVoiceRobotic robotic) {
|
|
|
+ try {
|
|
|
+ CompanySiptaskInfo query = new CompanySiptaskInfo();
|
|
|
+ query.setTaskId(robotic.getId());
|
|
|
+ List<CompanySiptaskInfo> sipTasks = companySiptaskInfoMapper.selectCompanySiptaskInfoList(query);
|
|
|
+ if (sipTasks != null && !sipTasks.isEmpty()) {
|
|
|
+ for (CompanySiptaskInfo sipTask : sipTasks) {
|
|
|
+ if (sipTask.getBatchId() != null) {
|
|
|
+ easyCallTaskControlService.pauseTask(sipTask.getBatchId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("暂停EasyCall任务异常, roboticId={}", robotic.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * 恢复该任务关联的所有EasyCall外呼任务
|
|
|
+ */
|
|
|
+ private void resumeEasyCallTask(CompanyVoiceRobotic robotic) {
|
|
|
+ try {
|
|
|
+ CompanySiptaskInfo query = new CompanySiptaskInfo();
|
|
|
+ query.setTaskId(robotic.getId());
|
|
|
+ List<CompanySiptaskInfo> sipTasks = companySiptaskInfoMapper.selectCompanySiptaskInfoList(query);
|
|
|
+ if (sipTasks != null && !sipTasks.isEmpty()) {
|
|
|
+ for (CompanySiptaskInfo sipTask : sipTasks) {
|
|
|
+ if (sipTask.getBatchId() != null) {
|
|
|
+ easyCallTaskControlService.resumeTask(sipTask.getBatchId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("恢复EasyCall任务异常, roboticId={}", robotic.getId(), e);
|
|
|
}
|
|
|
- //恢复任务继续进入可运行
|
|
|
- else if (ACTIVE_TYPE_CONTINUE.equals(param.getActiveType())) {
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步恢复暂停期间被阻塞的工作流实例
|
|
|
+ * 查询该任务下所有PAUSED(4)/WAITING(5)状态的工作流实例,分批处理
|
|
|
+ *
|
|
|
+ * @param taskId 任务ID
|
|
|
+ */
|
|
|
+ @Async("cidWorkFlowExecutor")
|
|
|
+ public void resumePausedInstances(Long taskId) {
|
|
|
+ try {
|
|
|
+ log.info("开始恢复暂停实例, taskId={}", taskId);
|
|
|
+
|
|
|
+ // 先查询该任务下所有业务记录的ID(businessKey存的是CompanyVoiceRoboticBusiness.id,而非taskId)
|
|
|
+ LambdaQueryWrapper<CompanyVoiceRoboticBusiness> bizWrapper = new LambdaQueryWrapper<>();
|
|
|
+ bizWrapper.eq(CompanyVoiceRoboticBusiness::getRoboticId, taskId)
|
|
|
+ .select(CompanyVoiceRoboticBusiness::getId);
|
|
|
+ List<CompanyVoiceRoboticBusiness> bizList = companyVoiceRoboticBusinessMapper.selectList(bizWrapper);
|
|
|
+ if (bizList == null || bizList.isEmpty()) {
|
|
|
+ log.info("未找到业务记录, taskId={}", taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ List<Long> businessIds = bizList.stream().map(CompanyVoiceRoboticBusiness::getId).collect(java.util.stream.Collectors.toList());
|
|
|
+
|
|
|
+ // 查询该任务下所有PAUSED和WAITING状态的工作流实例
|
|
|
+ LambdaQueryWrapper<CompanyAiWorkflowExec> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.in(CompanyAiWorkflowExec::getBusinessKey, businessIds)
|
|
|
+ .in(CompanyAiWorkflowExec::getStatus,
|
|
|
+ ExecutionStatusEnum.PAUSED.getValue(),
|
|
|
+ ExecutionStatusEnum.WAITING.getValue(),
|
|
|
+ ExecutionStatusEnum.PENDING.getValue());
|
|
|
+ List<CompanyAiWorkflowExec> execList = companyAiWorkflowExecMapper.selectList(queryWrapper);
|
|
|
+
|
|
|
+ if (execList == null || execList.isEmpty()) {
|
|
|
+ log.info("无需恢复的暂停实例, taskId={}", taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("找到待恢复实例 {} 个, taskId={}", execList.size(), taskId);
|
|
|
|
|
|
+ int batchSize = 50;
|
|
|
+ for (int i = 0; i < execList.size(); i++) {
|
|
|
+ CompanyAiWorkflowExec exec = execList.get(i);
|
|
|
+ try {
|
|
|
+ processResumeInstance(exec);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("恢复实例异常, instanceId={}", exec.getWorkflowInstanceId(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 每处理50个暂停2秒,避免压力过大
|
|
|
+ if ((i + 1) % batchSize == 0 && i + 1 < execList.size()) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(2000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.warn("恢复暂停实例被中断, taskId={}", taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("恢复暂停实例完成, taskId={}, count={}", taskId, execList.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("恢复暂停实例整体异常, taskId={}", taskId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理单个待恢复的工作流实例
|
|
|
+ */
|
|
|
+ private void processResumeInstance(CompanyAiWorkflowExec exec) {
|
|
|
+ Integer status = exec.getStatus();
|
|
|
+ Integer nodeType = exec.getCurrentNodeType();
|
|
|
+ String instanceId = exec.getWorkflowInstanceId();
|
|
|
+ String nodeKey = exec.getCurrentNodeKey();
|
|
|
+
|
|
|
+ // exec.getVariables() 存储的是 variables Map 的 JSON(不是完整 ExecutionContext)
|
|
|
+ Map<String, Object> inputData = new HashMap<>();
|
|
|
+ if (StringUtils.isNotEmpty(exec.getVariables())) {
|
|
|
+ try {
|
|
|
+ Map<String, Object> parsed = JSON.parseObject(exec.getVariables(), Map.class);
|
|
|
+ if (parsed != null) {
|
|
|
+ inputData = parsed;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("反序列化variables失败, instanceId={}", instanceId, e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- return R.ok("操作成功");
|
|
|
+ // PAUSED状态处理
|
|
|
+ if (ExecutionStatusEnum.PAUSED.getValue() == status) {
|
|
|
+ // AI_CALL_TASK + PAUSED:检查是否有回调
|
|
|
+ if (NodeTypeEnum.AI_CALL_TASK.getValue().equals(nodeType)) {
|
|
|
+ if (hasCallbackReceived(inputData)) {
|
|
|
+ log.info("恢复AI外呼PAUSED实例, instanceId={}", instanceId);
|
|
|
+ companyWorkflowEngine.resumeFromBlockingNode(instanceId, nodeKey, inputData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // AI_ADD_WX_TASK / AI_QW_ADD_WX_TASK + PAUSED:检查是否有回调
|
|
|
+ else if (NodeTypeEnum.AI_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue().equals(nodeType)) {
|
|
|
+ if (hasCallbackReceived(inputData)) {
|
|
|
+ log.info("恢复加微PAUSED实例, instanceId={}, nodeType={}", instanceId, nodeType);
|
|
|
+ companyWorkflowEngine.resumeFromBlockingNode(instanceId, nodeKey, inputData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // WAITING状态处理
|
|
|
+ else if (ExecutionStatusEnum.WAITING.getValue() == status) {
|
|
|
+ // 优先从 variables 中获取延时目标节点key(暂停时同步存入的)
|
|
|
+ String targetNodeKey = inputData.containsKey("_delayTargetNodeKey")
|
|
|
+ ? (String) inputData.get("_delayTargetNodeKey") : nodeKey;
|
|
|
+ // AI_CALL_TASK + WAITING:延时已过期,触发timeDoExecute
|
|
|
+ if (NodeTypeEnum.AI_CALL_TASK.getValue().equals(nodeType)) {
|
|
|
+ log.info("恢复AI外呼WAITING实例, instanceId={}, targetNodeKey={}", instanceId, targetNodeKey);
|
|
|
+ companyWorkflowEngine.timeDoExecute(instanceId, targetNodeKey, inputData);
|
|
|
+ }
|
|
|
+ // AI_ADD_WX_TASK / AI_QW_ADD_WX_TASK / AI_ADD_WX_TASK_NEW + WAITING
|
|
|
+ else if (NodeTypeEnum.AI_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue().equals(nodeType)) {
|
|
|
+ if (hasCallbackReceived(inputData)) {
|
|
|
+ log.info("恢复加微WAITING实例, instanceId={}, nodeType={}", instanceId, nodeType);
|
|
|
+ companyWorkflowEngine.resumeFromBlockingNode(instanceId, nodeKey, inputData);
|
|
|
+ } else {
|
|
|
+ // 无回调但有延时目标节点(加微延时场景)
|
|
|
+ if (inputData.containsKey("_delayTargetNodeKey")) {
|
|
|
+ log.info("恢复加微WAITING延时实例, instanceId={}, targetNodeKey={}", instanceId, targetNodeKey);
|
|
|
+ companyWorkflowEngine.timeDoExecute(instanceId, targetNodeKey, inputData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // PENDING状态处理:重建Redis CONTINUE:TIMER:EXECUTE key
|
|
|
+ else if (ExecutionStatusEnum.PENDING.getValue() == status) {
|
|
|
+ if (!inputData.isEmpty()) {
|
|
|
+ ExecutionContext resumeContext = new ExecutionContext();
|
|
|
+ resumeContext.setWorkflowInstanceId(instanceId);
|
|
|
+ resumeContext.setCurrentNodeKey(nodeKey);
|
|
|
+ resumeContext.setVariables(inputData);
|
|
|
+ resumeContext.setBusinessId(exec.getBusinessKey());
|
|
|
+ log.info("重建PENDING实例Redis key, instanceId={}", instanceId);
|
|
|
+ rebuildContinueTimerKey(exec, resumeContext);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查variables中是否有回调标记
|
|
|
+ */
|
|
|
+ private boolean hasCallbackReceived(Map<String, Object> variables) {
|
|
|
+ if (variables == null) return false;
|
|
|
+ Object flag = variables.get("pause_callback_received");
|
|
|
+ return Boolean.TRUE.equals(flag) || "true".equals(String.valueOf(flag));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 重建CONTINUE:TIMER:EXECUTE Redis key
|
|
|
+ */
|
|
|
+ private void rebuildContinueTimerKey(CompanyAiWorkflowExec exec, ExecutionContext context) {
|
|
|
+ try {
|
|
|
+ Integer groupNo = exec.getCidGroupNo();
|
|
|
+ Date now = new Date();
|
|
|
+ int hour = now.getHours();
|
|
|
+ int minute = now.getMinutes();
|
|
|
+ String redisKey = "CONTINUE:TIMER:EXECUTE:" + groupNo + ":" + hour + ":" + minute;
|
|
|
+ String contextJson = JSON.toJSONString(context);
|
|
|
+ redisCache2.setCacheObject(redisKey + ":" + exec.getWorkflowInstanceId(), contextJson);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("重建CONTINUE:TIMER:EXECUTE Redis key异常, instanceId={}", exec.getWorkflowInstanceId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isTaskPaused(Long taskId) {
|
|
|
+ if (taskId == null) return false;
|
|
|
+ // 优先从Redis读取
|
|
|
+ Integer status = redisCache2.getCacheObject("task:status:" + taskId);
|
|
|
+ if (status != null) {
|
|
|
+ return status == 2;
|
|
|
+ }
|
|
|
+ // Redis无数据则查DB
|
|
|
+ CompanyVoiceRobotic robotic = companyVoiceRoboticMapper.selectCompanyVoiceRoboticById(taskId);
|
|
|
+ if (robotic != null) {
|
|
|
+ // 回填Redis
|
|
|
+ redisCache2.setCacheObject("task:status:" + taskId, robotic.getTaskStatus());
|
|
|
+ return robotic.getTaskStatus() != null && robotic.getTaskStatus() == 2;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|