lmx преди 3 седмици
родител
ревизия
87feae2c6a

+ 1 - 1
fs-ai-call-task/src/main/java/com/fs/FsAiCallTaskApplication.java

@@ -13,7 +13,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
 @EnableTransactionManagement
 @EnableAsync
-@EnableScheduling
+//@EnableScheduling
 public class FsAiCallTaskApplication
 {
     public static void main(String[] args){

+ 9 - 0
fs-ai-call-task/src/main/java/com/fs/app/controller/CommonController.java

@@ -1,8 +1,10 @@
 package com.fs.app.controller;
 
 
+import com.fs.app.service.CallTaskService;
 import com.fs.common.core.domain.R;
 import io.swagger.annotations.Api;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -12,12 +14,19 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 @RequestMapping(value="/app/common")
 @Slf4j
+@AllArgsConstructor
 public class CommonController {
 
 
+    private final CallTaskService taskService;
+
     @GetMapping("/test")
     public R test(){
         return R.ok();
     }
 
+    @GetMapping("cidWorkflowCallRun")
+    public void cidWorkflowCallRun(){
+        taskService.cidWorkflowCallRun();
+    }
 }

+ 67 - 0
fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java

@@ -0,0 +1,67 @@
+package com.fs.app.service;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fs.common.core.redis.RedisCache;
+import com.fs.common.core.redis.RedisCacheT;
+import com.fs.company.param.ExecutionContext;
+import com.fs.company.service.*;
+import com.fs.company.service.impl.call.node.AiCallTaskNode;
+import com.fs.course.config.RedisKeyScanner;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+@Slf4j
+@Service
+@AllArgsConstructor
+public class CallTaskService {
+
+    private final RedisCacheT<String> redisCache;
+
+    private final RedisCache redisCache2;
+    private final CompanyWorkflowEngine companyWorkflowEngine;
+    private final ExecutorService cidExcutor = new ThreadPoolExecutor(
+            32,
+            64,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(1000),
+            r -> new Thread(r, "callPool-" + System.currentTimeMillis()),
+            new ThreadPoolExecutor.CallerRunsPolicy()
+    );
+    private final RedisKeyScanner redisKeyScanner;
+
+    /**
+     * 扫描工作流延时任务
+     */
+    public void cidWorkflowCallRun() {
+        log.info("===========工作流延时任务开始扫描===========");
+        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
+        Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
+        log.info("共扫描到 {} 个待处理键", keys.size());
+        HashMap commonMp = new HashMap();
+        commonMp.put("callSource","timer");
+        keys.parallelStream().forEach(key -> {
+            try {
+                //doExec
+                CompletableFuture.runAsync(()->{
+                    try {
+                        commonMp.put("callRedisKey",key);
+                        ExecutionContext context = redisCache2.getCacheObject(key);
+                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
+                    } catch (Exception e) {
+                        log.error("处理工作流延时任务异常 - key: {}", key, e);
+                    }
+                }, cidExcutor);
+
+            } catch (Exception ex) {
+                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+            }
+        });
+        log.info("===========工作流延时任务扫描结束===========");
+    }
+}

+ 8 - 0
fs-ai-call-task/src/main/java/com/fs/app/task/Task.java

@@ -1,6 +1,7 @@
 package com.fs.app.task;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fs.app.service.CallTaskService;
 import com.fs.company.domain.CompanyVoiceRobotic;
 import com.fs.company.mapper.CompanyVoiceRoboticMapper;
 import lombok.AllArgsConstructor;
@@ -24,6 +25,7 @@ public class Task {
 
     private final CompanyVoiceRoboticMapper roboticMapper;
 
+    private CallTaskService taskService;
     /**
      * 定时拉人进群
      */
@@ -34,4 +36,10 @@ public class Task {
 
         });
     }
+
+
+    @Scheduled(cron = "0 0/1 * * * ?")
+    public void cidWorkflowRun(){
+        taskService.cidWorkflowCallRun();
+    }
 }

+ 16 - 0
fs-ai-call-task/src/main/resources/application.yml

@@ -0,0 +1,16 @@
+# 开发环境配置
+server:
+  # 服务器的HTTP端口,默认为8080
+  port: 7005
+logging:
+  level:
+    org: INFO
+    com: DEBUG
+# Spring配置
+spring:
+  profiles:
+    active: dev
+#    active: druid-hcl
+#    active: druid-sxjz
+#    active: druid-hdt
+#    active: druid-myhk-test

+ 2 - 2
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -474,7 +474,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 //            }
         } catch (Exception e) {
             log.error("唤醒阻塞节点失败: {} -> {}", workflowInstanceId, nodeKey, e);
-            updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
+//            updateWorkflowStatus(workflowInstanceId, ExecutionStatusEnum.FAILURE);
             throw new CustomException("Resume from blocking node failed: " + e.getMessage());
         }
     }
@@ -502,7 +502,7 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
             }
             String lastNodeKey =(String) inputData.get("lastNodeKey");
 
-            // 验证当前节点是否匹配
+            // 验证节点是否匹配
             if (!lastNodeKey.equals(execNode)) {
                 log.error("当前流程已扭转节点不匹配 - 期望: {}, 实际: {}", lastNodeKey, execNode);
                 throw new CustomException("节点不匹配,期望: " + lastNodeKey + ", 实际: " + execNode);

+ 34 - 13
fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java

@@ -49,6 +49,7 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
 
     @Override
     public ExecutionResult execute(ExecutionContext context) {
+        log.info("开始执行节点:" + nodeName + " - " + nodeKey);
         // 记录执行开始时间
         long startTime = System.currentTimeMillis();
         ExecutionResult result = null;
@@ -76,8 +77,9 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
         try {
             CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
             log.info("收到继续执行请求 - workflowInstanceId: {}, nodeKey: {}, 当前状态: {}",
-                    context.getWorkflowInstanceId(), context.getCurrentNodeKey(), companyAiWorkflowExec.getStatus());
-            
+                    context.getWorkflowInstanceId(), nodeKey, companyAiWorkflowExec.getStatus());
+            CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByNodeKey(nodeKey);
+            context.setVariable("currentNode", node);
             // 允许 PAUSED 或 WAITING 状态继续执行
             if (!Integer.valueOf(ExecutionStatusEnum.PAUSED.getValue()).equals(companyAiWorkflowExec.getStatus())
                     && !Integer.valueOf(ExecutionStatusEnum.WAITING.getValue()).equals(companyAiWorkflowExec.getStatus())) {
@@ -93,10 +95,10 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             //更新流程日志信息
             CompanyAiWorkflowExecLog companyAiWorkflowExecLog = new CompanyAiWorkflowExecLog();
             companyAiWorkflowExecLog.setWorkflowInstanceId(context.getWorkflowInstanceId());
-            companyAiWorkflowExecLog.setNodeKey(context.getCurrentNodeKey());
+            companyAiWorkflowExecLog.setNodeKey(nodeKey);
             companyAiWorkflowExecLog.setStatus(ExecutionStatusEnum.PAUSED.getValue());
             List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(companyAiWorkflowExecLog);
-            if(null != companyAiWorkflowExecLogs){
+            if (null != companyAiWorkflowExecLogs && !companyAiWorkflowExecLogs.isEmpty()) {
                 CompanyAiWorkflowExecLog fExecLog = companyAiWorkflowExecLogs.get(0);
                 fExecLog.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
                 fExecLog.setEndTime(new Date());
@@ -116,8 +118,22 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
      */
     protected void preExecute(ExecutionContext context) {
         context.setVariable("node_start_time_" + nodeKey, System.currentTimeMillis());
-        context.setVariable("lastNodeKey",nodeKey);
+        context.setVariable("lastNodeKey", nodeKey);
         log.info("Starting execution of node: {} ({})", nodeKey, nodeName);
+        context.setCurrentNodeKey(nodeKey);
+        CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByNodeKey(nodeKey);
+        context.setVariable("currentNode", node);
+        CompanyAiWorkflowExec companyAiWorkflowExec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
+        if (!companyAiWorkflowExec.getCurrentNodeKey().equals(nodeKey)) {
+            CompanyWorkflowNode cNode = companyWorkflowNodeMapper.selectNodeByNodeKey(nodeKey);
+            CompanyAiWorkflowExec update = new CompanyAiWorkflowExec();
+            update.setCurrentNodeKey(nodeKey);
+            update.setCurrentNodeName(nodeName);
+            update.setCurrentNodeType(NodeTypeEnum.fromCode(cNode.getNodeType()).getValue());
+            update.setLastUpdateTime(LocalDateTime.now());
+            companyAiWorkflowExecMapper.updateCompanyAiWorkflowExec(update);
+        }
+
     }
 
     /**
@@ -129,10 +145,10 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
         log.info("Completed execution of node: {} ({})", nodeKey, nodeName);
         //todo 写入执行日志等后置操作
         int logStatus;
-        if (ExecutionStatusEnum.SUCCESS.equals(result.getStatus()) ) {
+        if (ExecutionStatusEnum.SUCCESS.equals(result.getStatus())) {
             logStatus = ExecutionStatusEnum.SUCCESS.getValue();
             updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.SUCCESS);
-        } else if(ExecutionStatusEnum.FAILURE.equals(result.getStatus())) {
+        } else if (ExecutionStatusEnum.FAILURE.equals(result.getStatus())) {
             logStatus = ExecutionStatusEnum.FAILURE.getValue();
             updateWorkflowStatus(context.getWorkflowInstanceId(), ExecutionStatusEnum.FAILURE);
         } else {
@@ -281,6 +297,10 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             update.setStatus(status.getValue());
             update.setLastUpdateTime(LocalDateTime.now());
             update.setVariables(objectMapper.writeValueAsString(context.getVariables()));
+            update.setCurrentNodeKey(nodeKey);
+            CompanyWorkflowNode currentNode = context.getVariable("currentNode", CompanyWorkflowNode.class);
+            update.setCurrentNodeType(NodeTypeEnum.fromCode(currentNode.getNodeType()).getValue());
+            update.setCurrentNodeName(currentNode.getNodeName());
             companyAiWorkflowExecMapper.updateByWorkflowInstanceId(update);
             log.info("工作流已阻塞在节点: {} -> {}", workflowInstanceId, nodeKey);
         } catch (Exception e) {
@@ -322,8 +342,8 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             CompanyAiWorkflowExec update = new CompanyAiWorkflowExec();
             update.setWorkflowInstanceId(context.getWorkflowInstanceId());
             update.setCurrentNodeKey(context.getCurrentNodeKey());
-            if(null  != context.getVariables() && null != context.getVariable("nodeName",String.class)){
-                update.setCurrentNodeName(context.getVariable("nodeName",String.class));
+            if (null != context.getVariables() && null != context.getVariable("nodeName", String.class)) {
+                update.setCurrentNodeName(context.getVariable("nodeName", String.class));
             }
             update.setStatus(ExecutionStatusEnum.RUNNING.getValue());
             update.setLastUpdateTime(LocalDateTime.now());
@@ -335,14 +355,15 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
 
     }
 
-    protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge){
-        if(StringUtils.isBlank(edge.getTargetNodeKey())){
+    protected void runNextNode(ExecutionContext context, CompanyWorkflowEdge edge) {
+        if (StringUtils.isBlank(edge.getTargetNodeKey())) {
             return;
         }
         ExecutionContext nextContext = context.clone();
-        nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
-        execPointNextNode(nextContext);
         CompanyWorkflowNode nextNode = companyWorkflowNodeMapper.selectNodeByNodeKey(edge.getTargetNodeKey());
+        nextContext.setCurrentNodeKey(nextNode.getNodeKey());
+        nextContext.setVariable("nodeName", nextNode.getNodeName());
+        execPointNextNode(nextContext);
         IWorkflowNode node = workflowNodeFactory.createNode(nextNode.getNodeKey(), NodeTypeEnum.fromCode(nextNode.getNodeType()), nextNode.getNodeName(), null);
         node.execute(nextContext);
     }

+ 19 - 4
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -3,6 +3,7 @@ package com.fs.company.service.impl.call.node;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.aicall.domain.result.CalltaskcreateaiCustomizeResult;
+import com.fs.common.utils.StringUtils;
 import com.fs.common.utils.spring.SpringUtils;
 import com.fs.company.domain.*;
 import com.fs.company.mapper.CompanyWorkflowNodeMapper;
@@ -21,6 +22,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author MixLiu
@@ -46,6 +48,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
      */
     @Override
     protected ExecutionResult doContinue(ExecutionContext context) {
+        //TODO 多次回调 是否过滤处理
         CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
         List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
         //获取外呼回调结果日志
@@ -75,7 +78,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                     ExecutionContext nextContext = context.clone();
                     nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
                     //添加到延时扫描redis
-                    super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext);
+                    super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext,1, TimeUnit.DAYS);
                     super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
                 }
                 //无时间驱动
@@ -83,7 +86,6 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                     super.runNextNode(context, edge);
                 }
             }
-
         });
         return null;
     }
@@ -91,12 +93,14 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
     @Override
     protected ExecutionResult doExecute(ExecutionContext context) {
         if (isAsync()) {
-            //当前节点信息
-            CompanyWorkflowNode node = companyWorkflowNodeMapper.selectNodeByNodeKey(context.getCurrentNodeKey());
+            CompanyWorkflowNode node = context.getVariable("currentNode", CompanyWorkflowNode.class);
             String nodeConfig = node.getNodeConfig();
             AiCallConfigVO callConfigVo = JSONObject.parseObject(nodeConfig, AiCallConfigVO.class);
             //执行外呼逻辑 需要传入节点信息
             CompanyVoiceRoboticBusiness bus = super.getRoboticBusiness(context.getWorkflowInstanceId());
+            if (bus == null) {
+                return ExecutionResult.failure().errorMessage("未找到业务数据").build();
+            }
             companyVoiceRoboticService.workflowCallPhoneOne(bus.getRoboticId(), bus.getCalleeId(), context, callConfigVo);
             super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
             return ExecutionResult.paused()
@@ -133,6 +137,17 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
         return String.format(DELAY_CALL_KEY, nowDay.getHours(), nowDay.getMinutes());
     }
 
+    @Override
+    protected void postExecute(ExecutionContext context, ExecutionResult result){
+        super.postExecute(context, result);
+        String callRedisKey = context.getVariable("callRedisKey", String.class);
+        //来源于定时调用doexec,调用后移除key
+        if(StringUtils.isNotBlank(callRedisKey)){
+            super.redisCache.deleteObject(callRedisKey);
+        }
+
+    }
+
 
 //    @Override
 //    public String getNextNodeKey(String workflowInstanceId, String nodeKey) {

+ 3 - 3
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiSendMsgTaskNode.java

@@ -80,10 +80,10 @@ public class AiSendMsgTaskNode extends AbstractWorkflowNode {
             
             if (edges != null && !edges.isEmpty()) {
                 // 直接执行下一个节点
-                this.runNextNode(context, edges.get(0));
-                return ExecutionResult.success().nextNodeKey("").build();
+                super.runNextNode(context, edges.get(0));
+                return ExecutionResult.success().nextNodeKey(null).build();
             } else {
-                log.warn("没有找到下一个节点 - workflowInstanceId: {}", context.getWorkflowInstanceId());
+                log.error("没有找到下一个节点 - workflowInstanceId: {}", context.getWorkflowInstanceId());
                 return ExecutionResult.success().nextNodeKey("").build();
             }
             

+ 1 - 4
fs-wx-task/src/main/java/com/fs/app/controller/CommonController.java

@@ -50,10 +50,7 @@ public class CommonController {
     public void callNextTask(){
         taskService.callNextTask();
     }
-    @GetMapping("cidWorkflowCallRun")
-    public void cidWorkflowCallRun(){
-        taskService.cidWorkflowCallRun();
-    }
+
 
 
 

+ 0 - 25
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -706,29 +706,4 @@ public class WxTaskService {
         }
     }
 
-    /**
-     * 扫描工作流延时任务
-     */
-    public void cidWorkflowCallRun() {
-        log.info("===========工作流延时任务开始扫描===========");
-        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
-        Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
-        log.info("共扫描到 {} 个待处理键", keys.size());
-        HashMap commonMp = new HashMap();
-        commonMp.put("callSource","timer");
-        keys.parallelStream().forEach(key -> {
-            try {
-                //doExec
-                CompletableFuture.runAsync(()->{
-                    String cacheObject = redisCache.getCacheObject(key);
-                    ExecutionContext context = JSONObject.parseObject(cacheObject, ExecutionContext.class);
-                    companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
-                }, cidExcutor);
-
-            } catch (Exception ex) {
-                log.error("处理工作流延时任务异常 - key: {}", key, ex);
-            }
-        });
-        log.info("===========工作流延时任务扫描结束===========");
-    }
 }

+ 0 - 4
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -51,8 +51,4 @@ public class WxTask {
         taskService.checkWorkflowAddWxTimeout();
     }
 
-    @Scheduled(cron = "0 0/1 * * * ?")
-    public void cidWorkflowRun(){
-        taskService.cidWorkflowCallRun();
-    }
 }