lmx 3 týždňov pred
rodič
commit
5569d9fe8d

+ 2 - 1
fs-service/src/main/java/com/fs/company/service/ICompanyVoiceRoboticService.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
 import com.fs.aicall.domain.apiresult.PushIIntentionResult;
 import com.fs.aicall.domain.result.CalltaskcreateaiCustomizeResult;
 import com.fs.company.domain.CompanyVoiceRobotic;
+import com.fs.company.param.ExecutionContext;
 import com.fs.company.vo.AddWxClientVo;
 import com.fs.company.vo.AiCallConfigVO;
 import com.fs.company.vo.CompanyVoiceRoboticQwUserListVo;
@@ -44,7 +45,7 @@ public interface ICompanyVoiceRoboticService extends IService<CompanyVoiceRoboti
     public int insertCompanyVoiceRobotic(CompanyVoiceRobotic companyVoiceRobotic);
     CalltaskcreateaiCustomizeResult addTask(CompanyVoiceRobotic companyVoiceRobotic);
     CalltaskcreateaiCustomizeResult callPhoneOne(Long roboticId,Long callerId);
-    CalltaskcreateaiCustomizeResult workflowCallPhoneOne(Long roboticId,Long callerId,String workflowInstanceId, AiCallConfigVO callConfigVo);
+    CalltaskcreateaiCustomizeResult workflowCallPhoneOne(Long roboticId, Long callerId, ExecutionContext context, AiCallConfigVO callConfigVo);
     void sendMsgOne(Long roboticId,Long callerId);
 
     /**

+ 26 - 5
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -20,6 +20,7 @@ import com.fs.common.service.impl.SmsServiceImpl;
 import com.fs.common.utils.*;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.*;
+import com.fs.company.param.ExecutionContext;
 import com.fs.company.service.*;
 import com.fs.company.vo.*;
 import com.fs.crm.domain.CrmCustomer;
@@ -29,13 +30,17 @@ import com.fs.crm.service.impl.CrmCustomerServiceImpl;
 import com.fs.system.mapper.SysDictDataMapper;
 import com.fs.system.service.ISysConfigService;
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.Synchronized;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
 
@@ -47,7 +52,8 @@ import java.util.stream.Collectors;
  */
 @Slf4j
 @Service
-@AllArgsConstructor
+//@AllArgsConstructor
+@RequiredArgsConstructor
 public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRoboticMapper, CompanyVoiceRobotic> implements ICompanyVoiceRoboticService {
 
     private static final String NODE_RUN_KEY = "aicall:node:run:";
@@ -89,6 +95,10 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
 
     private final CompanyVoiceRoboticBusinessMapper companyVoiceRoboticBusinessMapper;
     private final CompanyWorkflowEngine companyWorkflowEngine;
+
+    @Autowired
+    @Qualifier("workFlowExecutor")
+    Executor workFlowExecutor;
     /**
      * 查询机器人外呼任务
      *
@@ -307,14 +317,13 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
      * @return
      */
     @Override
-    public CalltaskcreateaiCustomizeResult workflowCallPhoneOne(Long roboticId, Long callerId, String workflowInstanceId,AiCallConfigVO callConfigVo) {
+    public CalltaskcreateaiCustomizeResult workflowCallPhoneOne(Long roboticId, Long callerId, ExecutionContext context, AiCallConfigVO callConfigVo) {
         try {
             CompanyVoiceRobotic robotic = companyVoiceRoboticMapper.selectById(roboticId);
             CompanyVoiceRoboticCallees callees = companyVoiceRoboticCalleesMapper.selectById(callerId);
             CalleeDomain build = CalleeDomain.builder().number(callees.getPhone()).userData(callees.getId().toString()).build();
             List<CalleeDomain> mobileList = new ArrayList<>();
             mobileList.add(build);
-            String callBackUuid = UUID.randomUUID().toString();
             // 构建三方接口请求数据
             CalltaskcreateaiCustomizeDomain param = new CalltaskcreateaiCustomizeDomain();
             param.setRobot(callConfigVo.getRobot());
@@ -334,9 +343,11 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             if (StringUtils.isNotEmpty(robotic.getEndTime1())) {
                 param.setEndTime1(robotic.getEndTime1() + ":00");
             }
+            String callBackUuid = UUID.randomUUID().toString();
             JSONObject userDataJson=new JSONObject();
             userDataJson.put("callBackUuid",callBackUuid);
-            userDataJson.put("workflowInstanceId",workflowInstanceId);
+            userDataJson.put("nodeKey",context.getCurrentNodeKey());
+            userDataJson.put("workflowInstanceId",context.getWorkflowInstanceId());
             param.setUserData(userDataJson.toJSONString());
             JSONObject runParam = (JSONObject) JSON.toJSON(param);
             runParam.put("companyId", robotic.getCompanyId());
@@ -373,7 +384,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             return result;
 
         } catch (Exception ex) {
-            log.error("callPhoneOne异常:", ex);
+            log.error("workflowcallPhoneOne异常:", ex);
         }
         return null;
 
@@ -623,6 +634,16 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
         CompanyVoiceRoboticCallees callee = getResultCalleeInfo(notify);
         //更新调用日志
         companyVoiceRoboticCallLogCallphoneService.asyncHandleCalleeCallBackResult(result,callee);
+        if(StringUtils.isNotBlank(notify.getUserData())){
+            JSONObject userData = JSONObject.parseObject(notify.getUserData());
+            if(userData.containsKey("callBackUuid") && userData.containsKey("workflowInstanceId") && userData.containsKey("nodeKey")){
+                Map<String, Object> param = new HashMap<>();
+                param.put("callBackUuid",userData.getString("callBackUuid"));
+                CompletableFuture.runAsync(()->{
+                    companyWorkflowEngine.resumeFromBlockingNode(userData.getString("workflowInstanceId"),userData.getString("nodeKey"),param);
+                },workFlowExecutor);
+            }
+        }
         long count = companyVoiceRoboticCalleesMapper.countByRoboticIdNotUuid(callee.getRoboticId());
         if(count == 0){
 //            new Thread(() -> dispenseWx(callee.getRoboticId())).start();

+ 3 - 2
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -286,6 +286,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 //        currentExec.setCurrentNodeId(startNodeId);
             currentExec.setCurrentNodeKey(startNodeKey);
             currentExec.setCurrentNodeType(NodeTypeEnum.START.getValue());
+            currentExec.setCurrentNodeName(NodeTypeEnum.START.getDescription());
             currentExec.setStatus(ExecutionStatusEnum.RUNNING.getValue());
             currentExec.setStartTime(LocalDateTime.now());
             currentExec.setVariables(objectMapper.writeValueAsString(context.getVariables()));
@@ -387,7 +388,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
         context.setCurrentNodeKey(record.getCurrentNodeKey());
         context.setStartTime(record.getStartTime());
         context.setCurrentTime(LocalDateTime.now());
-
+        context.setBusinessId(record.getBusinessKey());
         try {
             if (StringUtils.isNotBlank(record.getVariables())) {
                 Map<String, Object> variables = objectMapper.readValue(
@@ -437,7 +438,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
             }
 
             // 检查当前工作流是否处于暂停状态
-            if (!Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(currentExec.getStatus())) {
+            if (!Integer.valueOf(ExecutionStatusEnum.PAUSED.getValue()).equals(currentExec.getStatus())) {
                 throw new CustomException("工作流未处于暂停状态,无法唤醒: " + workflowInstanceId);
             }
 

+ 9 - 0
fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java

@@ -79,6 +79,13 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             return doContinue(context);
         } catch (Exception e) {
             return handleExecutionError(e, context);
+        } finally {
+            //更新流程日志信息
+            CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
+            companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
+            companyAiWorkflowExecLog.setNodeKey(context.getCurrentNodeKey());
+            companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
+            companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog)
         }
     }
 
@@ -188,6 +195,8 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             CompanyAiWorkflowExecLog logRecord = new CompanyAiWorkflowExecLog();
             logRecord.setWorkflowInstanceId(logEntry.getWorkflowInstanceId());
             logRecord.setNodeId(logEntry.getNodeId());
+            logRecord.setNodeKey(logEntry.getNodeKey());
+            logRecord.setNodeName(logEntry.getNodeName());
             logRecord.setNodeType(logEntry.getNodeType());
             logRecord.setStatus(logEntry.getStatus());
             logRecord.setInputData(objectMapper.writeValueAsString(logEntry.getInputData()));

+ 23 - 23
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -86,7 +86,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
             AiCallConfigVO callConfigVo = JSONObject.parseObject(nodeConfig, AiCallConfigVO.class);
             //执行外呼逻辑 需要传入节点信息
             CompanyVoiceRoboticBusiness bus = super.getRoboticBusiness(context.getWorkflowInstanceId());
-            companyVoiceRoboticService.workflowCallPhoneOne(bus.getRoboticId(), bus.getCalleeId(), context.getWorkflowInstanceId(), callConfigVo);
+            companyVoiceRoboticService.workflowCallPhoneOne(bus.getRoboticId(), bus.getCalleeId(), context, callConfigVo);
             super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
             return ExecutionResult.paused()
                     .nextNodeKey("").build();
@@ -108,28 +108,28 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
     }
 
 
-    @Override
-    public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
-
-        CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
-
-        List<CompanyWorkflowEdge> companyWorkflowEdges =
-                companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(companyAiWorkflowExec.getWorkflowId(), nodeKey);
-        //多线路
-        if (null != companyWorkflowEdges && !companyWorkflowEdges.isEmpty()) {
-            if (companyWorkflowEdges.size() > 1) {
-                for (CompanyWorkflowEdge edge : companyWorkflowEdges) {
-                    AiCallWorkflowConditionVo aiCallWorkflowConditionVo = JSON.parseObject(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
-                    //todo
-
-                }
-            } else {
-                return companyWorkflowEdges.get(0).getTargetNodeKey();
-            }
-        }
-        //没有满足的节点
-        return null;
-    }
+//    @Override
+//    public String getNextNodeKey(String workflowInstanceId, String nodeKey) {
+//
+//        CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(workflowInstanceId);
+//
+//        List<CompanyWorkflowEdge> companyWorkflowEdges =
+//                companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(companyAiWorkflowExec.getWorkflowId(), nodeKey);
+//        //多线路
+//        if (null != companyWorkflowEdges && !companyWorkflowEdges.isEmpty()) {
+//            if (companyWorkflowEdges.size() > 1) {
+//                for (CompanyWorkflowEdge edge : companyWorkflowEdges) {
+//                    AiCallWorkflowConditionVo aiCallWorkflowConditionVo = JSON.parseObject(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
+//                    //todo
+//
+//                }
+//            } else {
+//                return companyWorkflowEdges.get(0).getTargetNodeKey();
+//            }
+//        }
+//        //没有满足的节点
+//        return null;
+//    }
 
     /**
      * 运行下一个节点

+ 27 - 0
fs-service/src/main/java/com/fs/wxcid/threadExecutor/workFlowExecutor.java

@@ -0,0 +1,27 @@
+package com.fs.wxcid.threadExecutor;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+@EnableAsync
+public class workFlowExecutor {
+
+    @Bean("workFlowExecutor")
+    public Executor workFlowExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(8);
+        executor.setMaxPoolSize(16);
+        executor.setQueueCapacity(2000);
+        executor.setThreadNamePrefix("WorkfLow-");
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        executor.setKeepAliveSeconds(60);
+        executor.initialize();
+        return executor;
+    }
+}