浏览代码

call-task wxtask 新增分组,优化cid执行流程

lmx 1 周之前
父节点
当前提交
9a393ba6e1

+ 8 - 2
fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java

@@ -8,8 +8,10 @@ import com.fs.company.service.*;
 import com.fs.company.service.impl.call.node.AiCallTaskNode;
 import com.fs.course.config.RedisKeyScanner;
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -17,9 +19,13 @@ import java.util.concurrent.*;
 
 @Slf4j
 @Service
-@AllArgsConstructor
+@RequiredArgsConstructor
 public class CallTaskService {
 
+
+    @Value("${cid-group-no}")
+    private Integer cidGroupNo;
+
     private final RedisCacheT<String> redisCache;
 
     private final RedisCache redisCache2;
@@ -40,7 +46,7 @@ public class CallTaskService {
      */
     public void cidWorkflowCallRun() {
         log.info("===========工作流延时任务开始扫描===========");
-        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
+        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(cidGroupNo,null) + "*";
         Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
         log.info("共扫描到 {} 个待处理键", keys.size());
         keys.parallelStream().forEach(key -> {

+ 1 - 0
fs-ai-call-task/src/main/resources/application.yml

@@ -14,3 +14,4 @@ spring:
 #    active: druid-sxjz
 #    active: druid-hdt
 #    active: druid-myhk-test
+cid-group-no: 1

+ 3 - 0
fs-company/src/main/java/com/fs/company/controller/crm/CrmCustomerController.java

@@ -98,6 +98,9 @@ public class CrmCustomerController extends BaseController
     @PreAuthorize("@ss.hasPermi('crm:customer:list')")
     @GetMapping("/listAll")
     public R listAll(CrmCustomerListQueryParam crmCustomer){
+
+        LoginUser loginUser = tokenService.getLoginUser(ServletUtils.getRequest());
+        crmCustomer.setCompanyId(loginUser.getCompany().getCompanyId());
         PageHelper.startPage(1, 1000);
         if(!StringUtils.isEmpty(crmCustomer.getReceiveTimeRange())){
             crmCustomer.setReceiveTimeList(crmCustomer.getReceiveTimeRange().split("--"));

+ 2 - 1
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -1093,7 +1093,8 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
 
     // 绑定销售
     private void bindCompany(CompanyWxClient client, List<CompanyVoiceRoboticWx> wxList) {
-        List<CompanyVoiceRoboticWx> wx = wxList.stream().filter(f -> f.getAccount() != null && f.getAccount().getAllocateNum() < f.getAccount().getAddNum()).collect(Collectors.toList());
+//         取消再分配时 最大加微限制判定 && f.getAccount().getAllocateNum() < f.getAccount().getAddNum()
+        List<CompanyVoiceRoboticWx> wx = wxList.stream().filter(f -> f.getAccount() != null).collect(Collectors.toList());
         // 绑定销售,添加值达到阈值后设置为空,等待下次绑定
         if (!wx.isEmpty()) {
             CompanyVoiceRoboticWx companyVoiceRoboticWx = wx.get(0);

+ 12 - 7
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxServiceImpl.java

@@ -8,10 +8,7 @@ import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.DateUtils;
 import com.fs.company.domain.*;
-import com.fs.company.mapper.CompanyAiWorkflowExecLogMapper;
-import com.fs.company.mapper.CompanyAiWorkflowExecMapper;
-import com.fs.company.mapper.CompanyWxAccountMapper;
-import com.fs.company.mapper.CompanyWxClientMapper;
+import com.fs.company.mapper.*;
 import com.fs.company.service.*;
 import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
 import com.fs.enums.ExecutionStatusEnum;
@@ -101,6 +98,8 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
     @Autowired
     private CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
 
+    @Autowired
+    CompanyAiWorkflowServerMapper companyAiWorkflowServerMapper;
 
 
     /**
@@ -256,7 +255,13 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
         if (addressId==null || addressId.isEmpty()){
             return R.error("请先绑定地址");
         }
-        Long serverId = cidIpadServerService.selectQwIpadServerByAddressId(addressId);
+        Long cidServerId = companyUser.getCidServerId();
+        if ( cidServerId==null ){
+            return R.error("请先绑定cid服务");
+        }
+        CompanyAiWorkflowServer cidServer = companyAiWorkflowServerMapper.selectCompanyAiWorkflowServerById(companyUser.getCidServerId());
+
+        Long serverId = cidIpadServerService.selectQwIpadServerByAddressId(addressId,cidServer.getGroupNo());
         if (serverId==null){
             return  R.error(501,"该地区服务器剩余数量不足");
         }
@@ -264,7 +269,7 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
         account.setServerStatus(1);
         updateById(account);
 
-        cidIpadServerService.subtractServer(serverId);
+//        cidIpadServerService.subtractServer(serverId);
         CidIpadServerUser qwIpadServerUser = new CidIpadServerUser();
         qwIpadServerUser.setCompanyUserId(companyUser.getUserId());
         qwIpadServerUser.setCompanyId(companyUser.getCompanyId());
@@ -608,7 +613,7 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
             }
 
             // 清除超时检测Key(回调成功了,不需要超时检测了)
-            AiAddWxTaskNode.clearTimeoutKey(workflowInstanceId, wxClientId);
+//            AiAddWxTaskNode.clearTimeoutKey(workflowInstanceId, wxClientId);
 
             // 触发工作流继续执行
             Map<String, Object> inputData = new HashMap<>();

+ 15 - 15
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java

@@ -35,7 +35,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
     private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     @SuppressWarnings("unchecked")
     private static final RedisCacheT<String> redisCache = SpringUtils.getBean(RedisCacheT.class);
-    public static final String DELAY_ADD_WX_KEY = "addWxTask:delay:%s:%s:";
+    public static final String DELAY_ADD_WX_KEY = "addWxTask:delay:%s:%s:%s:";
     /**
      * 默认加微超时时间(分钟)
      */
@@ -225,17 +225,17 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
         return true;
     }
 
-    /**
-     * 清除超时检测 Key
-     *
-     * @param workflowInstanceId 工作流实例ID
-     * @param wxClientId         加微客户ID
-     */
-    public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
-        String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
-        redisCache.deleteObject(timeoutKey);
-        log.info("清除加微超时检测 Key: {}", timeoutKey);
-    }
+//    /**
+//     * 清除超时检测 Key
+//     *
+//     * @param workflowInstanceId 工作流实例ID
+//     * @param wxClientId         加微客户ID
+//     */
+//    public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
+//        String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
+//        redisCache.deleteObject(timeoutKey);
+//        log.info("清除加微超时检测 Key: {}", timeoutKey);
+//    }
 
     /**
      * getRedisCacheKey
@@ -243,12 +243,12 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      * @param time
      * @return
      */
-    public static String getDelayAddWxKeyPrefix(Long time) {
+    public static String getDelayAddWxKeyPrefix(Integer cidGroupNo,Long time) {
         Date nowDay = new Date();
         if (null != time) {
             nowDay = new Date(time);
         }
-        return String.format(DELAY_ADD_WX_KEY, nowDay.getHours(), nowDay.getMinutes());
+        return String.format(DELAY_ADD_WX_KEY,cidGroupNo, nowDay.getHours(), nowDay.getMinutes());
     }
 
     /**
@@ -287,7 +287,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
                 //节点包含延时条件
                 if (null != condition.getAddTime() && !condition.isAdd()) {
                     long l = System.currentTimeMillis() + condition.getAddTime() * 60 * 1000;
-                    String redisKey = getDelayAddWxKeyPrefix(l) + workflowInstanceId;
+                    String redisKey = getDelayAddWxKeyPrefix(exec.getCidGroupNo(),l) + workflowInstanceId;
                     ExecutionContext nextContext = context.clone();
                     nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
                     super.redisCache.setCacheObject(redisKey, nextContext);

+ 5 - 5
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
 public class AiCallTaskNode extends AbstractWorkflowNode {
     private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     private static final ICompanyVoiceRoboticService companyVoiceRoboticService = SpringUtils.getBean(ICompanyVoiceRoboticService.class);
-    public static final String DELAY_CALL_KEY = "aiCallTask:delay:%s:%s:";
+    public static final String DELAY_CALL_KEY = "aiCallTask:delay:%s:%s:%s:";
     private final String CALL_FROM_CALLBACK = "callBack";
     private final String CALL_FROM_TIMER = "timer";
 
@@ -110,7 +110,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                         ExecutionContext nextContext = context.clone();
                         nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
                         //添加到延时扫描redis
-                        super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
+                        super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(exec.getCidGroupNo(),l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
                         super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
                         updateLogStatusIfExist(context, ExecutionStatusEnum.PAUSED, ExecutionStatusEnum.WAITING);
                         runnableCount++;
@@ -187,16 +187,16 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
 
     /**
      * getRedisCacheKey
-     *
+     * @param cidGroupNo
      * @param time
      * @return
      */
-    public static String getDelayCallKeyPrefix(Long time) {
+    public static String getDelayCallKeyPrefix(Integer cidGroupNo,Long time) {
         Date nowDay = new Date();
         if (null != time) {
             nowDay = new Date(time);
         }
-        return String.format(DELAY_CALL_KEY, nowDay.getHours(), nowDay.getMinutes());
+        return String.format(DELAY_CALL_KEY, cidGroupNo,nowDay.getHours(), nowDay.getMinutes());
     }
 
 //    @Override

+ 1 - 1
fs-service/src/main/java/com/fs/company/service/impl/call/node/WorkflowNodeFactory.java

@@ -3,6 +3,7 @@ package com.fs.company.service.impl.call.node;
 import com.fs.company.service.IWorkflowNode;
 import com.fs.company.service.IWorkflowNodeFactory;
 import com.fs.enums.NodeTypeEnum;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
@@ -15,7 +16,6 @@ import java.util.Map;
 @Component
 public class WorkflowNodeFactory implements IWorkflowNodeFactory {
 
-
     @Override
     public IWorkflowNode createNode(String nodeKey, NodeTypeEnum type, String nodeName,
                                     Map<String, Object> properties) {

+ 1 - 1
fs-service/src/main/java/com/fs/wxcid/service/ICidIpadServerService.java

@@ -60,7 +60,7 @@ public interface ICidIpadServerService extends IService<CidIpadServer>{
      */
     int deleteCidIpadServerById(Long id);
 
-    Long selectQwIpadServerByAddressId(String addressId);
+    Long selectQwIpadServerByAddressId(String addressId,Integer cidGroupNo);
 
     void subtractServer(Long serverId);
 }

+ 2 - 2
fs-service/src/main/java/com/fs/wxcid/service/impl/CidIpadServerServiceImpl.java

@@ -96,8 +96,8 @@ public class CidIpadServerServiceImpl extends ServiceImpl<CidIpadServerMapper, C
     }
 
     @Override
-    public Long selectQwIpadServerByAddressId(String addressId) {
-        CidIpadServer ipadServer = getOne(new QueryWrapper<CidIpadServer>().eq("address_id", addressId).last("limit 1"));
+    public Long selectQwIpadServerByAddressId(String addressId,Integer cidGroupNo) {
+        CidIpadServer ipadServer = getOne(new QueryWrapper<CidIpadServer>().eq("address_id", addressId).eq("group_no", cidGroupNo).last("limit 1"));
         if(ipadServer == null){
             throw new CustomException("地区PAD不足");
         }

+ 1 - 0
fs-service/src/main/resources/application-dev.yml

@@ -44,6 +44,7 @@ spring:
                 # 主库数据源
                 master:
                     url: jdbc:mysql://139.186.77.83:3306/ylrz_his_scrm?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
+#                    url: jdbc:mysql://139.186.77.83:3306/ylrz_his_scrm_hetai?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
                     username: Rtroot
                     password: Rtroot
                 # 主库数据源

+ 2 - 0
fs-service/src/main/resources/mapper/company/CompanyWxAccountMapper.xml

@@ -56,6 +56,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="phone != null">phone,</if>
             <if test="wxNo != null">wx_no,</if>
             <if test="companyUserId != null">company_user_id,</if>
+            <if test="companyId != null">company_id,</if>
             <if test="createTime != null">create_time,</if>
             <if test="createUser != null">create_user,</if>
          </trim>
@@ -64,6 +65,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="phone != null">#{phone},</if>
             <if test="wxNo != null">#{wxNo},</if>
             <if test="companyUserId != null">#{companyUserId},</if>
+            <if test="companyId != null">#{companyId},</if>
             <if test="createTime != null">#{createTime},</if>
             <if test="createUser != null">#{createUser},</if>
          </trim>

+ 2 - 1
fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml

@@ -56,6 +56,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         inner join company_voice_robotic robotic on a.robotic_id = robotic.id
         <where>
             <if test="companyId != null"> and b.company_id = #{companyId}</if>
+            <if test="isWeCom != null"> and a.is_we_com = #{isWeCom}</if>
             <if test="roboticId != null "> and a.robotic_id = #{roboticId}</if>
             <if test="roboticWxId != null "> and b.id = #{roboticWxId}</if>
             <if test="customerId != null "> and a.customer_id = #{customerId}</if>
@@ -180,7 +181,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         SELECT t1.*,t3.workflow_instance_id,t3.current_node_key,t3.current_node_name,t3.current_node_type FROM company_wx_client t1
                              inner join company_voice_robotic_business t2 on t1.id = t2.wx_client_id and t1.robotic_id = t2.robotic_id
                              inner join company_ai_workflow_exec t3 on t3.business_key = t2.id
-        where t1.is_add = 0 and t1.account_id is not null
+        where t1.is_add = 0 and t1.account_id is not null and t1.is_we_com = 1
         and t3.current_node_type = #{execNodeType} And t3.status = #{execStatus}
         <if test="accountIdList != null and !accountIdList.isEmpty()">
             and t1.account_id in <foreach collection="accountIdList" open="(" separator="," close=")" item="item">#{item}</foreach>

+ 9 - 3
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -44,11 +44,13 @@ import com.fs.wxwork.dto.WxSearchContactResp;
 import com.fs.wxwork.dto.WxWorkResponseDTO;
 import com.fs.wxwork.service.WxWorkService;
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -62,9 +64,13 @@ import java.util.stream.Collectors;
 
 @Slf4j
 @Service
-@AllArgsConstructor
+@RequiredArgsConstructor
 public class WxTaskService {
 
+
+    @Value("${cid-group-no:0}")
+    private Integer cidGroupNo;
+
     private final ICompanyWxAccountService companyWxAccountService;
     private final ISysConfigService sysConfigService;
     private final ICompanyWxClientService companyWxClientService;
@@ -295,7 +301,7 @@ public class WxTaskService {
                     if (vo.isSuccess()) {
                         e.setLastAddWxTime(LocalDateTime.now());
 //                        todo 删除还原 以下为测试所用
-//                        e.setLastAddWxTime(LocalDateTime.now().plus(-1, ChronoUnit.DAYS));
+                        e.setLastAddWxTime(LocalDateTime.now().plus(-1, ChronoUnit.DAYS));
                         e.setIsAddNum(e.getIsAddNum() + 1);
                         client.setIsAdd(2);
                         client.setAddTime(LocalDateTime.now());
@@ -826,7 +832,7 @@ public class WxTaskService {
      */
     public void cidWorkflowAddWxRun() {
         log.info("===========工作流延时任务开始扫描===========");
-        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(null) + "*";
+        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
         Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
         log.info("共扫描到 {} 个待处理键", keys.size());
         keys.parallelStream().forEach(key -> {

+ 8 - 8
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -22,10 +22,10 @@ public class WxTask {
     @Autowired
     private WxTaskService taskService;
 
-    @Scheduled(cron = "0 0/30 * * * ?")
-    public void addWx() {
-        taskService.addWx(null);
-    }
+//    @Scheduled(cron = "0 0/30 * * * ?")
+//    public void addWx() {
+//        taskService.addWx(null);
+//    }
     @Scheduled(cron = "0 0/1 * * * ?")
     public void addWx4Workflow() {
         taskService.addWx4Workflow(null);
@@ -53,10 +53,10 @@ public class WxTask {
      * 工作流加微超时检测
      * 每分钟执行一次,检查是否有加微超时的工作流需要继续执行
      */
-    @Scheduled(cron = "0 0/1 * * * ?")
-    public void checkWorkflowAddWxTimeout(){
-        taskService.checkWorkflowAddWxTimeout();
-    }
+//    @Scheduled(cron = "0 0/1 * * * ?")
+//    public void checkWorkflowAddWxTimeout(){
+//        taskService.checkWorkflowAddWxTimeout();
+//    }
 
     @Scheduled(cron = "0 0/1 * * * ?")
     public void cidWorkflowAddWxRun(){

+ 1 - 0
fs-wx-task/src/main/resources/application.yml

@@ -14,3 +14,4 @@ spring:
 #    active: druid-sxjz
 #    active: druid-hdt
 #    active: druid-myhk-test
+cid-group-no: 1