|
|
@@ -30,6 +30,7 @@ import com.fs.company.mapper.*;
|
|
|
import com.fs.company.param.ExecutionContext;
|
|
|
import com.fs.company.param.PauseRoboticActiveParam;
|
|
|
import com.fs.company.service.*;
|
|
|
+import com.fs.company.service.impl.call.EasyCallTaskControlService;
|
|
|
import com.fs.company.vo.*;
|
|
|
import com.fs.company.vo.easycall.EasyCallCallPhoneVO;
|
|
|
import com.fs.core.config.TenantConfigContext;
|
|
|
@@ -41,6 +42,7 @@ import com.fs.crm.service.impl.CrmCustomerServiceImpl;
|
|
|
import com.fs.enums.ExecutionStatusEnum;
|
|
|
import com.fs.enums.NodeTypeEnum;
|
|
|
import com.fs.enums.TaskTypeEnum;
|
|
|
+import com.fs.his.utils.PhoneUtil;
|
|
|
import com.fs.qw.domain.QwUser;
|
|
|
import com.fs.qw.mapper.QwUserMapper;
|
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
|
@@ -140,6 +142,12 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
|
|
|
private final ICrmCustomerAnalyzeService crmCustomerAnalyzeService;
|
|
|
|
|
|
+ private final CompanyWorkflowNodeMapper companyWorkflowNodeMapper;
|
|
|
+
|
|
|
+ private final CompanySiptaskInfoMapper companySiptaskInfoMapper;
|
|
|
+
|
|
|
+ private final EasyCallTaskControlService easyCallTaskControlService;
|
|
|
+
|
|
|
/** EasyCall intent 意向度重试队列 Redis key 前缀,value 为已重试次数 */
|
|
|
private static final String EASYCALL_INTENT_RETRY_KEY = "easycall:intent:retry:";
|
|
|
/** intent 意向度等待重试最大次数(每次间隔约30秒,最多等待 5*30=150秒) */
|
|
|
@@ -1068,7 +1076,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
cacheString = cacheObj == null ? null : JSONObject.toJSONString(cacheObj);
|
|
|
}
|
|
|
if (StringUtils.isBlank(cacheString)) {
|
|
|
- log.error("easyCall外呼回调缓存信息缺失, uuid={}", callPhoneRes.getUuid());
|
|
|
+ log.error("easyCall外呼回调缓存信息缺失, uuid={},callBackUuid:{}", callPhoneRes.getUuid(),bizJson.getString("callBackUuid"));
|
|
|
return;
|
|
|
}
|
|
|
JSONObject cacheInfo = JSONObject.parseObject(cacheString);
|
|
|
@@ -1408,6 +1416,10 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
final Long workflowId = robotic.getCompanyAiWorkflowId();
|
|
|
final Long roboticId = robotic.getId();
|
|
|
|
|
|
+
|
|
|
+ companyWorkflowEngine.createSipTask(roboticId, workflowId);
|
|
|
+
|
|
|
+
|
|
|
// 先初始化所有工作流实例
|
|
|
List<ExecutionResult> initResults = new ArrayList<>();
|
|
|
for (CompanyVoiceRoboticBusiness business : roboticBusinesseList) {
|
|
|
@@ -1739,11 +1751,16 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
Integer pageSize,
|
|
|
String customerName,
|
|
|
String customerPhone,
|
|
|
- Boolean onlyCallNode) {
|
|
|
+ Boolean onlyCallNode,
|
|
|
+ String encryptPhone) {
|
|
|
//分页查询主数据
|
|
|
PageHelper.startPage(pageNum, pageSize);
|
|
|
+ if(StringUtils.isNotBlank(encryptPhone)){
|
|
|
+ encryptPhone = PhoneUtil.encryptPhone(encryptPhone);
|
|
|
+ }
|
|
|
+ List<WorkflowExecRecordVo> records = companyAiWorkflowExecMapper.selectExecRecordsByRoboticId(roboticId, customerName, customerPhone, onlyCallNode,encryptPhone);
|
|
|
|
|
|
- List<WorkflowExecRecordVo> records = companyAiWorkflowExecMapper.selectExecRecordsByRoboticId(roboticId, customerName, customerPhone, onlyCallNode);
|
|
|
+// List<WorkflowExecRecordVo> records = companyAiWorkflowExecMapper.selectExecRecordsByRoboticId(roboticId, customerName, customerPhone, onlyCallNode);
|
|
|
|
|
|
PageInfo<WorkflowExecRecordVo> pageInfo = new PageInfo<>(records);
|
|
|
|
|
|
@@ -1884,7 +1901,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
return new ArrayList<>();
|
|
|
}
|
|
|
List<CompanyAiWorkflowExecLog> callLogs = logs.stream().filter(a -> "外呼".equals(a.getNodeName())).collect(Collectors.toList());
|
|
|
- HashMap<Long,String> callContentMap;
|
|
|
+ HashMap<Long,CallContentVO> callContentMap;
|
|
|
if (null != callLogs && !callLogs.isEmpty()) {
|
|
|
callContentMap = selectCallContentByCallLogs(callLogs);
|
|
|
} else {
|
|
|
@@ -1904,7 +1921,11 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
vo.setDuration(log.getDuration());
|
|
|
vo.setErrorMessage(log.getErrorMessage());
|
|
|
vo.setOutputData(log.getOutputData());
|
|
|
- vo.setNodeContentList(callContentMap.get(log.getId()));
|
|
|
+ CallContentVO callContentVO = callContentMap.get(log.getId());
|
|
|
+ if (callContentVO != null) {
|
|
|
+ vo.setNodeContentList(callContentVO.getCallContent());
|
|
|
+ vo.setNodeRecordPath(callContentVO.getRecordPath());
|
|
|
+ }
|
|
|
return vo;
|
|
|
}).collect(Collectors.toList());
|
|
|
}
|
|
|
@@ -1914,15 +1935,15 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
* @param callLogs
|
|
|
* @return
|
|
|
*/
|
|
|
- public HashMap<Long,String> selectCallContentByCallLogs(List<CompanyAiWorkflowExecLog> callLogs){
|
|
|
+ public HashMap<Long,CallContentVO> selectCallContentByCallLogs(List<CompanyAiWorkflowExecLog> callLogs){
|
|
|
List<Long> ids = callLogs.stream().map(a -> a.getId()).collect(Collectors.toList());
|
|
|
if(null == ids || ids.isEmpty()){
|
|
|
return new HashMap<>();
|
|
|
}
|
|
|
List<CallContentVO> callContentVOS = companyAiWorkflowExecLogMapper.selectCallContent(ids);
|
|
|
if(null != callContentVOS && !callContentVOS.isEmpty()){
|
|
|
- HashMap<Long,String> map = new HashMap<>();
|
|
|
- callContentVOS.forEach(a -> map.put(a.getLogId(),a.getCallContent()));
|
|
|
+ HashMap<Long,CallContentVO> map = new HashMap<>();
|
|
|
+ callContentVOS.forEach(a -> map.put(a.getLogId(), a));
|
|
|
return map;
|
|
|
}
|
|
|
else{
|
|
|
@@ -1945,18 +1966,32 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
public R pauseRoboticActive(PauseRoboticActiveParam param) {
|
|
|
//暂停任务
|
|
|
if (ACTIVE_TYPE_PAUSE.equals(param.getActiveType())) {
|
|
|
-
|
|
|
- // 暂停任务更新
|
|
|
-
|
|
|
- // 暂停任务创建的三方外呼任务
|
|
|
-
|
|
|
+ CompanyVoiceRobotic robotic = selectCompanyVoiceRoboticById(param.getTaskId());
|
|
|
+ if (robotic == null || robotic.getTaskStatus() != 1) {
|
|
|
+ return R.error("任务不在执行中状态,无法暂停");
|
|
|
+ }
|
|
|
+ robotic.setTaskStatus(2);
|
|
|
+ robotic.setPauseSource("manual");
|
|
|
+ updateCompanyVoiceRobotic(robotic);
|
|
|
+ redisCache2.setCacheObject("task:status:" + param.getTaskId(), 2);
|
|
|
+ pauseEasyCallTask(robotic);
|
|
|
+ return R.ok("暂停成功");
|
|
|
}
|
|
|
- //恢复任务继续进入可运行
|
|
|
else if (ACTIVE_TYPE_CONTINUE.equals(param.getActiveType())) {
|
|
|
-
|
|
|
+ CompanyVoiceRobotic robotic = selectCompanyVoiceRoboticById(param.getTaskId());
|
|
|
+ if (robotic == null || robotic.getTaskStatus() != 2) {
|
|
|
+ return R.error("任务不在中断状态,无法继续");
|
|
|
+ }
|
|
|
+ robotic.setTaskStatus(1);
|
|
|
+ robotic.setPauseSource(null);
|
|
|
+ updateCompanyVoiceRobotic(robotic);
|
|
|
+ redisCache2.setCacheObject("task:status:" + param.getTaskId(), 1);
|
|
|
+ resumeEasyCallTask(robotic);
|
|
|
+ resumePausedInstances(param.getTaskId());
|
|
|
+ return R.ok("继续成功");
|
|
|
}
|
|
|
|
|
|
- return R.ok("操作成功");
|
|
|
+ return R.error("操作类型无效");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -1984,4 +2019,401 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public R appendCustomersToRunningTask(Long taskId, List<Long> customerIds) {
|
|
|
+ // 1. 校验参数
|
|
|
+ if (taskId == null || customerIds == null || customerIds.isEmpty()) {
|
|
|
+ return R.error("参数不能为空");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 校验任务
|
|
|
+ CompanyVoiceRobotic robotic = companyVoiceRoboticMapper.selectCompanyVoiceRoboticById(taskId);
|
|
|
+ if (robotic == null) {
|
|
|
+ return R.error("任务不存在: " + taskId);
|
|
|
+ }
|
|
|
+ if (!robotic.getTaskType().equals(TaskTypeEnum.ORDINARY.getValue())) {
|
|
|
+ return R.error("仅普通任务支持追加客户");
|
|
|
+ }
|
|
|
+ if (!Integer.valueOf(1).equals(robotic.getTaskStatus())) {
|
|
|
+ return R.error("任务不在执行中状态,无法追加客户");
|
|
|
+ }
|
|
|
+ if (robotic.getCompanyAiWorkflowId() == null) {
|
|
|
+ return R.error("任务未配置工作流");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3. 查询当前任务已有的callees的userId集合,过滤重复
|
|
|
+ List<CompanyVoiceRoboticCallees> existingCallees = companyVoiceRoboticCalleesMapper.selectByRoboticId(taskId);
|
|
|
+ Set<Long> existingUserIds = existingCallees.stream()
|
|
|
+ .map(CompanyVoiceRoboticCallees::getUserId)
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+
|
|
|
+ // 分离重复和非重复客户
|
|
|
+ List<Long> duplicateIds = customerIds.stream()
|
|
|
+ .filter(existingUserIds::contains)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ List<Long> newCustomerIds = customerIds.stream()
|
|
|
+ .filter(id -> !existingUserIds.contains(id))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (newCustomerIds.isEmpty()) {
|
|
|
+ String dupNames = getDuplicateCustomerNames(duplicateIds);
|
|
|
+ return R.error("所选客户已存在于任务中" + (dupNames.isEmpty() ? "" : ":" + dupNames));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4. 批量查询CRM客户信息
|
|
|
+ List<CrmCustomer> crmCustomers = crmCustomerService.selectCrmCustomerListByIds(
|
|
|
+ newCustomerIds.stream().map(String::valueOf).collect(Collectors.joining(",")));
|
|
|
+ Map<Long, CrmCustomer> customerMap = crmCustomers.stream()
|
|
|
+ .collect(Collectors.toMap(CrmCustomer::getCustomerId, c -> c, (a, b) -> a));
|
|
|
+
|
|
|
+ // 5. 判断是否需要加微分配
|
|
|
+ boolean hasAddWxNode = workflowHasAddWxNode(robotic.getCompanyAiWorkflowId());
|
|
|
+
|
|
|
+ int successCount = 0;
|
|
|
+ List<String> errorMessages = new ArrayList<>();
|
|
|
+
|
|
|
+ for (Long customerId : newCustomerIds) {
|
|
|
+ CrmCustomer crmCustomer = customerMap.get(customerId);
|
|
|
+ if (crmCustomer == null) {
|
|
|
+ errorMessages.add("客户不存在: " + customerId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ // 5.1 创建CompanyWxClient记录
|
|
|
+ CompanyWxClient client = new CompanyWxClient();
|
|
|
+ client.setRoboticId(taskId);
|
|
|
+ client.setCustomerId(customerId);
|
|
|
+ client.setIsWeCom(robotic.getIsWeCom());
|
|
|
+ companyWxClientServiceImpl.insertCompanyWxClient(client);
|
|
|
+
|
|
|
+ // 5.2 创建CompanyVoiceRoboticCallees记录
|
|
|
+ CompanyVoiceRoboticCallees callee = new CompanyVoiceRoboticCallees();
|
|
|
+ callee.setUserId(crmCustomer.getCustomerId());
|
|
|
+ callee.setUserName(crmCustomer.getCustomerName());
|
|
|
+ callee.setPhone(crmCustomer.getMobile());
|
|
|
+ callee.setRoboticId(robotic.getId());
|
|
|
+ callee.setResult(0);
|
|
|
+ callee.setTaskFlow(robotic.getTaskFlow());
|
|
|
+ callee.setRunTaskFlow(robotic.getRunTaskFlow());
|
|
|
+ callee.setIsWeCom(robotic.getIsWeCom());
|
|
|
+ companyVoiceRoboticCalleesService.save(callee);
|
|
|
+
|
|
|
+ // 5.3 加微分配
|
|
|
+ if (hasAddWxNode && Integer.valueOf(0).equals(robotic.getAddType())) {
|
|
|
+ allocateWx4SceneTask(robotic, client.getId());
|
|
|
+ } else if (hasAddWxNode && Integer.valueOf(1).equals(robotic.getAddType())) {
|
|
|
+ String intention = crmCustomer.getIntention();
|
|
|
+ String queryIntention = intention;
|
|
|
+ if (!isPositiveInteger(intention)) {
|
|
|
+ List<SysDictData> customerIntentionLevel = sysDictTypeService.selectDictDataByType("customer_intention_level");
|
|
|
+ Optional<SysDictData> firstDict = customerIntentionLevel.stream()
|
|
|
+ .filter(e -> e.getDictLabel().equals(intention)).findFirst();
|
|
|
+ if (firstDict.isPresent()) {
|
|
|
+ queryIntention = firstDict.get().getDictValue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ List<CompanyVoiceRoboticWx> roboticWxList = companyVoiceRoboticWxMapper.selectByRoboticId(taskId, queryIntention);
|
|
|
+ List<CompanyWxAccount> accountList = new ArrayList<>(companyWxAccountService.listByIds(PubFun.listToNewList(roboticWxList, CompanyVoiceRoboticWx::getAccountId)));
|
|
|
+ Map<Long, CompanyWxAccount> accountMap = PubFun.listToMapByGroupObject(accountList, CompanyWxAccount::getId);
|
|
|
+ roboticWxList.forEach(e -> e.setAccount(accountMap.get(e.getAccountId())));
|
|
|
+ CompanyWxClient companyWxClient = companyWxClientServiceImpl.getOne(new QueryWrapper<CompanyWxClient>().eq("robotic_id", callee.getRoboticId()).eq("customer_id", callee.getUserId()));
|
|
|
+ if (companyWxClient == null) {
|
|
|
+ companyWxClient = new CompanyWxClient();
|
|
|
+ }
|
|
|
+ companyWxClient.setRoboticId(callee.getRoboticId());
|
|
|
+ companyWxClient.setNickName(callee.getUserName());
|
|
|
+ companyWxClient.setPhone(callee.getPhone());
|
|
|
+ companyWxClient.setCustomerId(callee.getUserId());
|
|
|
+ companyWxClient.setIntention(intention);
|
|
|
+ bindCompany(companyWxClient, roboticWxList);
|
|
|
+ companyWxClientServiceImpl.saveOrUpdate(companyWxClient);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 5.4 创建CompanyVoiceRoboticBusiness记录
|
|
|
+ CompanyVoiceRoboticBusiness business = buildTaskBussiness4SceneTask(robotic, callee);
|
|
|
+
|
|
|
+ // 5.5 初始化工作流实例
|
|
|
+ Map<String, Object> inputVariables = new HashMap<>();
|
|
|
+ inputVariables.put("roboticId", robotic.getId());
|
|
|
+ inputVariables.put("businessId", business.getId());
|
|
|
+ inputVariables.put("cidGroupNo", robotic.getCidGroupNo());
|
|
|
+ inputVariables.put("runtimeRangeStart", robotic.getRuntimeRangeStart());
|
|
|
+ inputVariables.put("runtimeRangeEnd", robotic.getRuntimeRangeEnd());
|
|
|
+ ExecutionResult initResult = companyWorkflowEngine.initialize(robotic.getCompanyAiWorkflowId(), inputVariables);
|
|
|
+ if (!initResult.isSuccess()) {
|
|
|
+ errorMessages.add("客户" + crmCustomer.getCustomerName() + "工作流初始化失败: " + initResult.getErrorMessage());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ successCount++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("追加客户失败 - taskId: {}, customerId: {}", taskId, customerId, e);
|
|
|
+ errorMessages.add("客户" + crmCustomer.getCustomerName() + "追加失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 6. 为新增的callees生成AI标签信息
|
|
|
+ if (successCount > 0) {
|
|
|
+ try {
|
|
|
+ List<CompanyVoiceRoboticCallees> allCallees = companyVoiceRoboticCalleesMapper.selectByRoboticId(taskId);
|
|
|
+ List<CompanyWxClient> companyWxClients = companyWxClientMapper.selectListByRoboticId(taskId);
|
|
|
+ Map<String, CompanyWxClient> clientMp = companyWxClients.stream()
|
|
|
+ .collect(Collectors.toMap(e -> e.getRoboticId() + "-" + e.getCustomerId(), e -> e, (a, b) -> a));
|
|
|
+ asyncCalleeProcessorService.generateCustomerInfo(allCallees, clientMp, robotic);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("追加客户后生成AI标签信息异常 - taskId: {}", taskId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 7. 构建返回结果
|
|
|
+ Map<String, Object> resultData = new HashMap<>();
|
|
|
+ resultData.put("successCount", successCount);
|
|
|
+ if (!duplicateIds.isEmpty()) {
|
|
|
+ resultData.put("duplicateCustomerNames", getDuplicateCustomerNames(duplicateIds));
|
|
|
+ }
|
|
|
+ if (!errorMessages.isEmpty()) {
|
|
|
+ resultData.put("errorMessages", errorMessages);
|
|
|
+ }
|
|
|
+ return R.ok(resultData);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据客户ID列表获取重复客户名称
|
|
|
+ */
|
|
|
+ private String getDuplicateCustomerNames(List<Long> customerIds) {
|
|
|
+ if (customerIds == null || customerIds.isEmpty()) {
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ List<CrmCustomer> customers = crmCustomerService.selectCrmCustomerListByIds(
|
|
|
+ customerIds.stream().map(String::valueOf).collect(Collectors.joining(",")));
|
|
|
+ return customers.stream().map(CrmCustomer::getCustomerName).filter(Objects::nonNull).collect(Collectors.joining("、"));
|
|
|
+ } catch (Exception e) {
|
|
|
+ return customerIds.stream().map(String::valueOf).collect(Collectors.joining("、"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean workflowHasAddWxNode(Long workflowId) {
|
|
|
+ if (workflowId == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ List<CompanyWorkflowNode> nodes = companyWorkflowNodeMapper.selectCompanyWorkflowNodeByWorkflowId(workflowId);
|
|
|
+ if (nodes == null || nodes.isEmpty()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return nodes.stream().anyMatch(n ->
|
|
|
+ NodeTypeEnum.AI_ADD_WX_TASK_NEW.getCode().equals(n.getNodeType())
|
|
|
+ || NodeTypeEnum.AI_QW_ADD_WX_TASK.getCode().equals(n.getNodeType())
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private void pauseEasyCallTask(CompanyVoiceRobotic robotic) {
|
|
|
+ try {
|
|
|
+ CompanySiptaskInfo query = new CompanySiptaskInfo();
|
|
|
+ query.setTaskId(robotic.getId());
|
|
|
+ List<CompanySiptaskInfo> sipTasks = companySiptaskInfoMapper.selectCompanySiptaskInfoList(query);
|
|
|
+ if (sipTasks != null && !sipTasks.isEmpty()) {
|
|
|
+ for (CompanySiptaskInfo sipTask : sipTasks) {
|
|
|
+ if (sipTask.getBatchId() != null) {
|
|
|
+ easyCallTaskControlService.pauseTask(sipTask.getBatchId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("暂停EasyCall任务异常, roboticId={}", robotic.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 恢复该任务关联的所有EasyCall外呼任务
|
|
|
+ */
|
|
|
+ private void resumeEasyCallTask(CompanyVoiceRobotic robotic) {
|
|
|
+ try {
|
|
|
+ CompanySiptaskInfo query = new CompanySiptaskInfo();
|
|
|
+ query.setTaskId(robotic.getId());
|
|
|
+ List<CompanySiptaskInfo> sipTasks = companySiptaskInfoMapper.selectCompanySiptaskInfoList(query);
|
|
|
+ if (sipTasks != null && !sipTasks.isEmpty()) {
|
|
|
+ for (CompanySiptaskInfo sipTask : sipTasks) {
|
|
|
+ if (sipTask.getBatchId() != null) {
|
|
|
+ easyCallTaskControlService.resumeTask(sipTask.getBatchId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("恢复EasyCall任务异常, roboticId={}", robotic.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步恢复暂停期间被阻塞的工作流实例
|
|
|
+ * 查询该任务下所有PAUSED(4)/WAITING(5)状态的工作流实例,分批处理
|
|
|
+ *
|
|
|
+ * @param taskId 任务ID
|
|
|
+ */
|
|
|
+ @Async("cidWorkFlowExecutor")
|
|
|
+ public void resumePausedInstances(Long taskId) {
|
|
|
+ try {
|
|
|
+ log.info("开始恢复暂停实例, taskId={}", taskId);
|
|
|
+
|
|
|
+ // 先查询该任务下所有业务记录的ID(businessKey存的是CompanyVoiceRoboticBusiness.id,而非taskId)
|
|
|
+ LambdaQueryWrapper<CompanyVoiceRoboticBusiness> bizWrapper = new LambdaQueryWrapper<>();
|
|
|
+ bizWrapper.eq(CompanyVoiceRoboticBusiness::getRoboticId, taskId)
|
|
|
+ .select(CompanyVoiceRoboticBusiness::getId);
|
|
|
+ List<CompanyVoiceRoboticBusiness> bizList = companyVoiceRoboticBusinessMapper.selectList(bizWrapper);
|
|
|
+ if (bizList == null || bizList.isEmpty()) {
|
|
|
+ log.info("未找到业务记录, taskId={}", taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ List<Long> businessIds = bizList.stream().map(CompanyVoiceRoboticBusiness::getId).collect(java.util.stream.Collectors.toList());
|
|
|
+
|
|
|
+ // 查询该任务下所有PAUSED和WAITING状态的工作流实例
|
|
|
+ LambdaQueryWrapper<CompanyAiWorkflowExec> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.in(CompanyAiWorkflowExec::getBusinessKey, businessIds)
|
|
|
+ .in(CompanyAiWorkflowExec::getStatus,
|
|
|
+ ExecutionStatusEnum.PAUSED.getValue(),
|
|
|
+ ExecutionStatusEnum.WAITING.getValue(),
|
|
|
+ ExecutionStatusEnum.PENDING.getValue());
|
|
|
+ List<CompanyAiWorkflowExec> execList = companyAiWorkflowExecMapper.selectList(queryWrapper);
|
|
|
+
|
|
|
+ if (execList == null || execList.isEmpty()) {
|
|
|
+ log.info("无需恢复的暂停实例, taskId={}", taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("找到待恢复实例 {} 个, taskId={}", execList.size(), taskId);
|
|
|
+
|
|
|
+ int batchSize = 50;
|
|
|
+ for (int i = 0; i < execList.size(); i++) {
|
|
|
+ CompanyAiWorkflowExec exec = execList.get(i);
|
|
|
+ try {
|
|
|
+ processResumeInstance(exec);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("恢复实例异常, instanceId={}", exec.getWorkflowInstanceId(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 每处理50个暂停2秒,避免压力过大
|
|
|
+ if ((i + 1) % batchSize == 0 && i + 1 < execList.size()) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(2000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.warn("恢复暂停实例被中断, taskId={}", taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("恢复暂停实例完成, taskId={}, count={}", taskId, execList.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("恢复暂停实例整体异常, taskId={}", taskId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 处理单个待恢复的工作流实例
|
|
|
+ */
|
|
|
+ private void processResumeInstance(CompanyAiWorkflowExec exec) {
|
|
|
+ Integer status = exec.getStatus();
|
|
|
+ Integer nodeType = exec.getCurrentNodeType();
|
|
|
+ String instanceId = exec.getWorkflowInstanceId();
|
|
|
+ String nodeKey = exec.getCurrentNodeKey();
|
|
|
+
|
|
|
+ // exec.getVariables() 存储的是 variables Map 的 JSON(不是完整 ExecutionContext)
|
|
|
+ Map<String, Object> inputData = new HashMap<>();
|
|
|
+ if (StringUtils.isNotEmpty(exec.getVariables())) {
|
|
|
+ try {
|
|
|
+ Map<String, Object> parsed = JSON.parseObject(exec.getVariables(), Map.class);
|
|
|
+ if (parsed != null) {
|
|
|
+ inputData = parsed;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("反序列化variables失败, instanceId={}", instanceId, e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // PAUSED状态处理
|
|
|
+ if (ExecutionStatusEnum.PAUSED.getValue() == status) {
|
|
|
+ // AI_CALL_TASK + PAUSED:检查是否有回调
|
|
|
+ if (NodeTypeEnum.AI_CALL_TASK.getValue().equals(nodeType)) {
|
|
|
+ if (hasCallbackReceived(inputData)) {
|
|
|
+ log.info("恢复AI外呼PAUSED实例, instanceId={}", instanceId);
|
|
|
+ companyWorkflowEngine.resumeFromBlockingNode(instanceId, nodeKey, inputData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // AI_ADD_WX_TASK / AI_QW_ADD_WX_TASK + PAUSED:检查是否有回调
|
|
|
+ else if (NodeTypeEnum.AI_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue().equals(nodeType)) {
|
|
|
+ if (hasCallbackReceived(inputData)) {
|
|
|
+ log.info("恢复加微PAUSED实例, instanceId={}, nodeType={}", instanceId, nodeType);
|
|
|
+ companyWorkflowEngine.resumeFromBlockingNode(instanceId, nodeKey, inputData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // WAITING状态处理
|
|
|
+ else if (ExecutionStatusEnum.WAITING.getValue() == status) {
|
|
|
+ // 优先从 variables 中获取延时目标节点key(暂停时同步存入的)
|
|
|
+ String targetNodeKey = inputData.containsKey("_delayTargetNodeKey")
|
|
|
+ ? (String) inputData.get("_delayTargetNodeKey") : nodeKey;
|
|
|
+ // AI_CALL_TASK + WAITING:延时已过期,触发timeDoExecute
|
|
|
+ if (NodeTypeEnum.AI_CALL_TASK.getValue().equals(nodeType)) {
|
|
|
+ log.info("恢复AI外呼WAITING实例, instanceId={}, targetNodeKey={}", instanceId, targetNodeKey);
|
|
|
+ companyWorkflowEngine.timeDoExecute(instanceId, targetNodeKey, inputData);
|
|
|
+ }
|
|
|
+ // AI_ADD_WX_TASK / AI_QW_ADD_WX_TASK / AI_ADD_WX_TASK_NEW + WAITING
|
|
|
+ else if (NodeTypeEnum.AI_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue().equals(nodeType)
|
|
|
+ || NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue().equals(nodeType)) {
|
|
|
+ if (hasCallbackReceived(inputData)) {
|
|
|
+ log.info("恢复加微WAITING实例, instanceId={}, nodeType={}", instanceId, nodeType);
|
|
|
+ companyWorkflowEngine.resumeFromBlockingNode(instanceId, nodeKey, inputData);
|
|
|
+ } else {
|
|
|
+ // 无回调但有延时目标节点(加微延时场景)
|
|
|
+ if (inputData.containsKey("_delayTargetNodeKey")) {
|
|
|
+ log.info("恢复加微WAITING延时实例, instanceId={}, targetNodeKey={}", instanceId, targetNodeKey);
|
|
|
+ companyWorkflowEngine.timeDoExecute(instanceId, targetNodeKey, inputData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // PENDING状态处理:重建Redis CONTINUE:TIMER:EXECUTE key
|
|
|
+ else if (ExecutionStatusEnum.PENDING.getValue() == status) {
|
|
|
+ if (!inputData.isEmpty()) {
|
|
|
+ ExecutionContext resumeContext = new ExecutionContext();
|
|
|
+ resumeContext.setWorkflowInstanceId(instanceId);
|
|
|
+ resumeContext.setCurrentNodeKey(nodeKey);
|
|
|
+ resumeContext.setVariables(inputData);
|
|
|
+ resumeContext.setBusinessId(exec.getBusinessKey());
|
|
|
+ log.info("重建PENDING实例Redis key, instanceId={}", instanceId);
|
|
|
+ rebuildContinueTimerKey(exec, resumeContext);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查variables中是否有回调标记
|
|
|
+ */
|
|
|
+ private boolean hasCallbackReceived(Map<String, Object> variables) {
|
|
|
+ if (variables == null) return false;
|
|
|
+ Object flag = variables.get("pause_callback_received");
|
|
|
+ return Boolean.TRUE.equals(flag) || "true".equals(String.valueOf(flag));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 重建CONTINUE:TIMER:EXECUTE Redis key
|
|
|
+ */
|
|
|
+ private void rebuildContinueTimerKey(CompanyAiWorkflowExec exec, ExecutionContext context) {
|
|
|
+ try {
|
|
|
+ Integer groupNo = exec.getCidGroupNo();
|
|
|
+ Date now = new Date();
|
|
|
+ int hour = now.getHours();
|
|
|
+ int minute = now.getMinutes();
|
|
|
+ String redisKey = "CONTINUE:TIMER:EXECUTE:" + groupNo + ":" + hour + ":" + minute;
|
|
|
+ String contextJson = JSON.toJSONString(context);
|
|
|
+ redisCache2.setCacheObject(redisKey + ":" + exec.getWorkflowInstanceId(), contextJson);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("重建CONTINUE:TIMER:EXECUTE Redis key异常, instanceId={}", exec.getWorkflowInstanceId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|