1
0

3 Revīzijas 9a393ba6e1 ... a52aa10818

Autors SHA1 Ziņojums Datums
  peicj a52aa10818 AI外呼调整企微加个微节点流程 1 nedēļu atpakaļ
  peicj 3442b0013a Merge branch 'master' of http://1.14.104.71:10880/root/ylrz_his_scrm_java 1 nedēļu atpakaļ
  peicj c4e9d696a3 销售端AI外呼增加企微加个微节点流程 1 nedēļu atpakaļ

+ 2 - 0
fs-service/src/main/java/com/fs/company/domain/CompanyVoiceRoboticCallees.java

@@ -62,4 +62,6 @@ public class CompanyVoiceRoboticCallees{
 
     @TableField(exist = false)
     private String idToString;
+
+    private Integer isWeCom;
 }

+ 1 - 1
fs-service/src/main/java/com/fs/company/mapper/CompanyVoiceRoboticCalleesMapper.java

@@ -84,6 +84,6 @@ public interface CompanyVoiceRoboticCalleesMapper extends BaseMapper<CompanyVoic
 
     List<SendMsgByTaskVO> getSendMsgTaskListByRoboticId(@Param("roboticId") Long roboticId);
 
-    List<CompanyVoiceRoboticCallees> selectExcludeList(@Param("list")List<CompanyWxClient> list);
+    List<CompanyVoiceRoboticCallees> selectExcludeList(@Param("list")List<CompanyWxClient> list,@Param("isWeCom") Integer isWeCom);
     List<Long> getNotFinishAddWxRobotic(@Param("roboticIds") Set<Long> roboticIds);
 }

+ 1 - 0
fs-service/src/main/java/com/fs/company/mapper/CompanyWxClientMapper.java

@@ -84,4 +84,5 @@ public interface CompanyWxClientMapper extends BaseMapper<CompanyWxClient> {
 
     List<CompanyWxClient> getQwAddWxList(@Param("accountIdList") List<Long> accountIdList, @Param("isWeCom") Integer isWeCom);
 
+    List<CompanyWxClient4WorkFlowVO> getQwAddWxList4Workflow(@Param("accountIdList") List<Long> accountIdList, @Param("execStatus") Integer execStatus, @Param("execNodeType") Integer execNodeType);
 }

+ 2 - 0
fs-service/src/main/java/com/fs/company/service/ICompanyWxClientService.java

@@ -74,4 +74,6 @@ public interface ICompanyWxClientService extends IService<CompanyWxClient> {
     List<CompanyWxClient4WorkFlowVO> getAddWxList4Workflow(List<Long> accountIdList);
 
     List<CompanyWxClient> getQwAddWxList(List<Long> accountIdList,Integer isWeCom);
+
+    List<CompanyWxClient4WorkFlowVO> getQwAddWxList4Workflow(List<Long> accountIdList);
 }

+ 1 - 0
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -185,6 +185,7 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
             callees.setResult(0);
             callees.setTaskFlow(companyVoiceRobotic.getTaskFlow());
             callees.setRunTaskFlow(companyVoiceRobotic.getRunTaskFlow());
+            callees.setIsWeCom(isWeCom);
             return callees;
         }).collect(Collectors.toList());
         companyVoiceRoboticCalleesService.saveBatch(callesList);

+ 10 - 0
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxClientServiceImpl.java

@@ -246,4 +246,14 @@ public class CompanyWxClientServiceImpl extends ServiceImpl<CompanyWxClientMappe
     public List<CompanyWxClient> getQwAddWxList(List<Long> accountIdList, Integer isWeCom) {
         return baseMapper.getQwAddWxList(accountIdList,isWeCom);
     }
+
+    /**
+     * 获取添加微信列表 工作流用
+     * @param accountIdList
+     * @return
+     */
+    @Override
+    public  List<CompanyWxClient4WorkFlowVO> getQwAddWxList4Workflow(List<Long> accountIdList){
+        return baseMapper.getQwAddWxList4Workflow(accountIdList, ExecutionStatusEnum.WAITING.getValue(), NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue());
+    }
 }

+ 46 - 40
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiQwAddWxTaskNode.java

@@ -1,5 +1,6 @@
 package com.fs.company.service.impl.call.node;
 
+import cn.hutool.core.collection.CollectionUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.constant.Constants;
 import com.fs.common.core.redis.RedisCacheT;
@@ -65,10 +66,14 @@ public class AiQwAddWxTaskNode extends AbstractWorkflowNode {
         boolean addSuccess = wxClient != null && Integer.valueOf(1).equals(wxClient.getIsAdd());
         //回调加微成功
         if (addSuccess) {
-            List<CompanyWorkflowEdge> cList = edges.stream().filter(a ->
-                            StringUtils.isNotBlank(a.getConditionExpr()) && JSONObject.parseArray(a.getConditionExpr(), AiCallWorkflowConditionVo.class).get(0).isAdd())
-                    .collect(Collectors.toList());
-            if(!cList.isEmpty() && nodeKey.equals(exec.getCurrentNodeKey())){
+            List<CompanyWorkflowEdge> cList = edges.stream().filter(a -> {
+                if (StringUtils.isBlank(a.getConditionExpr())) {
+                    return false;
+                }
+                List<AiCallWorkflowConditionVo> list = JSONObject.parseArray(a.getConditionExpr(), AiCallWorkflowConditionVo.class);
+                return list != null && !list.isEmpty() && list.get(0).isAdd();
+            }).collect(Collectors.toList());
+            if (!cList.isEmpty() && nodeKey.equals(exec.getCurrentNodeKey())) {
                 super.runNextNode(context, cList.get(0));
             }
         }
@@ -78,7 +83,8 @@ public class AiQwAddWxTaskNode extends AbstractWorkflowNode {
                     .collect(Collectors.toList());
             // 加微失败,根据条件判断走哪条边
             CompanyWorkflowEdge edge = cList.get(0);
-                AiCallWorkflowConditionVo condition = JSONObject.parseObject(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
+            List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
+             AiCallWorkflowConditionVo condition = conditions.get(0);
                 // 匹配失败条件
                 if (!condition.isAdd()) {
                     log.info("加微失败,执行失败分支 - workflowInstanceId: {}", context.getWorkflowInstanceId());
@@ -104,8 +110,8 @@ public class AiQwAddWxTaskNode extends AbstractWorkflowNode {
             return ExecutionResult.failure().nextNodeKey(null).build();
         }
         try {
-            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.PAUSED);
-            return ExecutionResult.paused()
+            super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
+            return ExecutionResult.waiting()
                     .outputData(context.getVariables())
                     .nextNodeKey("").build();
         } catch (Exception e) {
@@ -184,49 +190,49 @@ public class AiQwAddWxTaskNode extends AbstractWorkflowNode {
      * getRedisCacheKey
      *
      */
-    public static String getDelayAddWxKeyPrefix(Long time) {
+    public static String getDelayAddWxKeyPrefix(Integer cidGroupNo,Long time) {
         Date nowDay;
         if (null != time) {
             nowDay = new Date(time);
         }else{
             nowDay = new Date();
         }
-        return String.format(DELAY_QW_ADD_WX_KEY, nowDay.getHours(), nowDay.getMinutes());
+        return String.format(DELAY_QW_ADD_WX_KEY,cidGroupNo, nowDay.getHours(), nowDay.getMinutes());
     }
 
     /**
      * 完成加微动作
      *
      */
-//    public void doneQwAddWx(String workflowInstanceId) {
-//        ExecutionContext context = createExecutionContext(workflowInstanceId, nodeKey);
-//        context.setVariable("lastNodeKey", nodeKey);
-//        //启动定时节点倒计时
-//        CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
-//        if (!exec.getCurrentNodeKey().equals(nodeKey)) {
-//            //当前节点已流转
-//            log.error("当前节点已流转 ,目标:{},实际:{}", nodeKey, exec.getCurrentNodeKey());
-//            return;
-//        }
-//        //更新加微日志执行状态
-//        super.updateLogStatusIfExist(context, ExecutionStatusEnum.PAUSED, ExecutionStatusEnum.WAITING);
-//        super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), nodeKey, context, ExecutionStatusEnum.WAITING);
-//        List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
-//        edges.forEach(edge -> {
-//            List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
-//            if (null == conditions || conditions.isEmpty()) {
-//                super.runNextNode(context, edge);
-//            } else {
-//                AiCallWorkflowConditionVo condition = conditions.get(0);
-//                //节点包含延时条件
-//                if (null != condition.getAddTime() && !condition.isAdd()) {
-//                    long l = System.currentTimeMillis() + condition.getAddTime() * 60 * 1000;
-//                    String redisKey = getDelayAddWxKeyPrefix(l) + workflowInstanceId;
-//                    ExecutionContext nextContext = context.clone();
-//                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
-//                    super.redisCache.setCacheObject(redisKey, nextContext);
-//                }
-//            }
-//        });
-//    }
+    public void doneQwAddWx(String workflowInstanceId) {
+        ExecutionContext context = createExecutionContext(workflowInstanceId, nodeKey);
+        context.setVariable("lastNodeKey", nodeKey);
+        //启动定时节点倒计时
+        CompanyAiWorkflowExec exec = companyAiWorkflowExecMapper.selectByWorkflowInstanceId(context.getWorkflowInstanceId());
+        if (!exec.getCurrentNodeKey().equals(nodeKey)) {
+            //当前节点已流转
+            log.error("当前节点已流转 ,目标:{},实际:{}", nodeKey, exec.getCurrentNodeKey());
+            return;
+        }
+        //更新加微日志执行状态
+        super.updateLogStatusIfExist(context, ExecutionStatusEnum.WAITING, ExecutionStatusEnum.WAITING);
+        super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), nodeKey, context, ExecutionStatusEnum.WAITING);
+        List<CompanyWorkflowEdge> edges = companyWorkflowEdgeMapper.selectListByWorkflowIdAndNodeKey(exec.getWorkflowId(), nodeKey);
+        edges.forEach(edge -> {
+            List<AiCallWorkflowConditionVo> conditions = JSONObject.parseArray(edge.getConditionExpr(), AiCallWorkflowConditionVo.class);
+            if (CollectionUtil.isEmpty(conditions)) {
+                super.runNextNode(context, edge);
+            } else {
+                AiCallWorkflowConditionVo condition = conditions.get(0);
+                //节点包含延时条件
+                if (null != condition.getAddTime() && !condition.isAdd()) {
+                    long l = System.currentTimeMillis() + condition.getAddTime() * 60 * 1000;
+                    String redisKey = getDelayAddWxKeyPrefix(exec.getCidGroupNo(),l) + workflowInstanceId;
+                    ExecutionContext nextContext = context.clone();
+                    nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
+                    super.redisCache.setCacheObject(redisKey, nextContext);
+                }
+            }
+        });
+    }
 }

+ 1 - 1
fs-service/src/main/resources/mapper/company/CompanyVoiceRoboticCallLogAddwxMapper.xml

@@ -66,7 +66,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="createTime != null">#{createTime},</if>
             <if test="companyId != null">#{companyId},</if>
             <if test="wxAccountId != null">#{wxAccountId},</if>
-            <if test="isWeCom != null">#{is_we_com},</if>
+            <if test="isWeCom != null">#{isWeCom},</if>
          </trim>
     </insert>
 

+ 3 - 0
fs-service/src/main/resources/mapper/company/CompanyVoiceRoboticCalleesMapper.xml

@@ -175,6 +175,9 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 
     <select id="selectExcludeList" resultType="com.fs.company.domain.CompanyVoiceRoboticCallees" >
         SELECT * FROM  company_voice_robotic_callees where 1=1
+        <if test="isWeCom != null and isWeCom != ''">
+            and is_we_com = #{isWeCom}
+        </if>
         <if test="list != null">
             and
             <foreach item="item" collection="list" separator=" or " open="(" close=")">

+ 12 - 0
fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml

@@ -188,6 +188,18 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         </if>
         group by t1.account_id
     </select>
+    <select id="getQwAddWxList4Workflow" resultType="com.fs.company.vo.CompanyWxClient4WorkFlowVO">
+
+        SELECT t1.*,t3.workflow_instance_id,t3.current_node_key,t3.current_node_name,t3.current_node_type FROM company_wx_client t1
+        inner join company_voice_robotic_business t2 on t1.id = t2.wx_client_id and t1.robotic_id = t2.robotic_id
+        inner join company_ai_workflow_exec t3 on t3.business_key = t2.id
+        where t1.is_add = 0 and t1.account_id is not null and t1.is_we_com = 2
+        and t3.current_node_type = #{execNodeType} And t3.status = #{execStatus}
+        <if test="accountIdList != null and !accountIdList.isEmpty()">
+            and t1.account_id in <foreach collection="accountIdList" open="(" separator="," close=")" item="item">#{item}</foreach>
+        </if>
+        group by t1.account_id
+    </select>
     <select id="selectWx" resultType="com.fs.company.domain.CompanyWxClient">
         select * from company_wx_client where account_id = #{accountId} and wx_v3 = #{v3}
     </select>

+ 36 - 10
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -1,14 +1,11 @@
 package com.fs.app.service;
 
-import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.RandomUtil;
 import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.fs.common.constant.Constants;
-import com.fs.common.constant.FsConstants;
 import com.fs.common.core.redis.RedisCache;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.PubFun;
@@ -19,7 +16,7 @@ import com.fs.company.param.ExecutionContext;
 import com.fs.company.service.*;
 import com.fs.company.service.impl.*;
 import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
-import com.fs.company.service.impl.call.node.AiCallTaskNode;
+import com.fs.company.service.impl.call.node.AiQwAddWxTaskNode;
 import com.fs.company.service.impl.call.node.WorkflowNodeFactory;
 import com.fs.company.vo.CompanyWxClient4WorkFlowVO;
 import com.fs.course.config.RedisKeyScanner;
@@ -39,8 +36,6 @@ import com.fs.wxcid.dto.friend.AddContactParam;
 import com.fs.wxcid.service.FriendService;
 import com.fs.wxcid.vo.AddContactVo;
 import com.fs.wxwork.dto.WxAddSearchDTO;
-import com.fs.wxwork.dto.WxSearchContactDTO;
-import com.fs.wxwork.dto.WxSearchContactResp;
 import com.fs.wxwork.dto.WxWorkResponseDTO;
 import com.fs.wxwork.service.WxWorkService;
 import lombok.AllArgsConstructor;
@@ -53,7 +48,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
@@ -108,6 +102,7 @@ public class WxTaskService {
     private final QwUserMapper qwUserMapper;
     private final WxWorkService wxWorkService;
     private final QwExternalContactMapper qwExternalContactMapper;
+    private final CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
 
     public void addWx(List<Long> accountIdList) {
         log.info("==========执行加微信任务开始==========");
@@ -116,7 +111,7 @@ public class WxTaskService {
         // 需要添加微信的列表
         List<CompanyWxClient> list = companyWxClientService.getAddWxList(accountIdList,1);
         //排除掉没到达加微步骤的人
-        List<CompanyVoiceRoboticCallees> exList = companyVoiceRoboticCalleesMapper.selectExcludeList(list);
+        List<CompanyVoiceRoboticCallees> exList = companyVoiceRoboticCalleesMapper.selectExcludeList(list, 1);
         List<CompanyVoiceRoboticCallees> collect =
                 exList.stream().filter(e -> !Constants.ADD_WX.equals(getNextTaskOptimized(e.getTaskFlow(), e.getRunTaskFlow())))
                         .collect(Collectors.toList());
@@ -930,7 +925,38 @@ public class WxTaskService {
         log.info("==========执行企微申请加个微结果查询任务结束==========");
     }
 
-    
+
+    /**
+     * 扫描企微加微工作流延时任务
+     */
+    public void cidWorkflowQwAddWxRun() {
+        log.info("===========企微加微工作流延时任务开始扫描===========");
+        String delayAddWxKeyPrefix = AiQwAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
+        Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
+        log.info("企微加微共扫描到 {} 个待处理键", keys.size());
+        keys.parallelStream().forEach(key -> {
+            try {
+                //doExec
+                CompletableFuture.runAsync(()->{
+                    try {
+                        ExecutionContext context = redisCache2.getCacheObject(key);
+                        context.setVariable("callRedisKey",key);
+                        context.setVariable("callSource","qwAddWxTimer");
+                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
+                    } catch (Exception e) {
+                        log.error("处理工作流延时任务异常 - key: {}", key, e);
+                    }
+                }, cidExcutor).thenRun(()->{
+                    redisCache2.deleteObject(key);
+                });
+
+            } catch (Exception ex) {
+                log.error("处理工作流延时任务异常 - key: {}", key, ex);
+            }
+        });
+        log.info("===========工作流延时任务扫描结束===========");
+    }
+
     /**
      * 获取过滤后的企微客户列表
      */
@@ -938,7 +964,7 @@ public class WxTaskService {
         List<CompanyWxClient> list = companyWxClientService.getAddWxList(accountIdList, 2);
         
         // 排除掉没到达加微步骤的人
-        List<CompanyVoiceRoboticCallees> excludeList = companyVoiceRoboticCalleesMapper.selectExcludeList(list);
+        List<CompanyVoiceRoboticCallees> excludeList = companyVoiceRoboticCalleesMapper.selectExcludeList(list,2);
         Set<String> excludeKeys = excludeList.stream()
                 .filter(e -> !Constants.QW_ADD_WX.equals(getNextTaskOptimized(e.getTaskFlow(), e.getRunTaskFlow())))
                 .map(callee -> callee.getRoboticId() + "_" + callee.getUserId())

+ 8 - 2
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -6,8 +6,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
-
 /**
  * 企业微信SOP定时任务管理类
  * 负责处理各种定时任务,包括SOP规则检查、消息发送、数据清理等
@@ -78,4 +76,12 @@ public class WxTask {
     public void qwAddWxResult() {
         taskService.qwAddWxResult(null);
     }
+    /**
+     * 企微加微工作流超时检测
+     * 每分钟执行一次,检查是否有加微超时的工作流需要继续执行
+     */
+    @Scheduled(cron = "0 0/1 * * * ?")
+    public void cidWorkflowQwAddWxRun(){
+        taskService.cidWorkflowQwAddWxRun();
+    }
 }