Преглед на файлове

Merge branch 'master' of http://1.14.104.71:10880/root/ylrz_his_scrm_java

# Conflicts:
#	fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml
#	fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java
peicj преди 1 седмица
родител
ревизия
3442b0013a

+ 8 - 2
fs-ai-call-task/src/main/java/com/fs/app/service/CallTaskService.java

@@ -8,8 +8,10 @@ import com.fs.company.service.*;
 import com.fs.company.service.impl.call.node.AiCallTaskNode;
 import com.fs.course.config.RedisKeyScanner;
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -17,9 +19,13 @@ import java.util.concurrent.*;
 
 @Slf4j
 @Service
-@AllArgsConstructor
+@RequiredArgsConstructor
 public class CallTaskService {
 
+
+    @Value("${cid-group-no}")
+    private Integer cidGroupNo;
+
     private final RedisCacheT<String> redisCache;
 
     private final RedisCache redisCache2;
@@ -40,7 +46,7 @@ public class CallTaskService {
      */
     public void cidWorkflowCallRun() {
         log.info("===========工作流延时任务开始扫描===========");
-        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
+        String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(cidGroupNo,null) + "*";
         Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
         log.info("共扫描到 {} 个待处理键", keys.size());
         keys.parallelStream().forEach(key -> {

+ 1 - 0
fs-ai-call-task/src/main/resources/application.yml

@@ -14,3 +14,4 @@ spring:
 #    active: druid-sxjz
 #    active: druid-hdt
 #    active: druid-myhk-test
+cid-group-no: 1

+ 3 - 0
fs-company/src/main/java/com/fs/company/controller/crm/CrmCustomerController.java

@@ -98,6 +98,9 @@ public class CrmCustomerController extends BaseController
     @PreAuthorize("@ss.hasPermi('crm:customer:list')")
     @GetMapping("/listAll")
     public R listAll(CrmCustomerListQueryParam crmCustomer){
+
+        LoginUser loginUser = tokenService.getLoginUser(ServletUtils.getRequest());
+        crmCustomer.setCompanyId(loginUser.getCompany().getCompanyId());
         PageHelper.startPage(1, 1000);
         if(!StringUtils.isEmpty(crmCustomer.getReceiveTimeRange())){
             crmCustomer.setReceiveTimeList(crmCustomer.getReceiveTimeRange().split("--"));

+ 2 - 1
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -1094,7 +1094,8 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
 
     // 绑定销售
     private void bindCompany(CompanyWxClient client, List<CompanyVoiceRoboticWx> wxList) {
-        List<CompanyVoiceRoboticWx> wx = wxList.stream().filter(f -> f.getAccount() != null && f.getAccount().getAllocateNum() < f.getAccount().getAddNum()).collect(Collectors.toList());
+//         取消再分配时 最大加微限制判定 && f.getAccount().getAllocateNum() < f.getAccount().getAddNum()
+        List<CompanyVoiceRoboticWx> wx = wxList.stream().filter(f -> f.getAccount() != null).collect(Collectors.toList());
         // 绑定销售,添加值达到阈值后设置为空,等待下次绑定
         if (!wx.isEmpty()) {
             CompanyVoiceRoboticWx companyVoiceRoboticWx = wx.get(0);

+ 12 - 7
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxServiceImpl.java

@@ -8,10 +8,7 @@ import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.DateUtils;
 import com.fs.company.domain.*;
-import com.fs.company.mapper.CompanyAiWorkflowExecLogMapper;
-import com.fs.company.mapper.CompanyAiWorkflowExecMapper;
-import com.fs.company.mapper.CompanyWxAccountMapper;
-import com.fs.company.mapper.CompanyWxClientMapper;
+import com.fs.company.mapper.*;
 import com.fs.company.service.*;
 import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
 import com.fs.enums.ExecutionStatusEnum;
@@ -101,6 +98,8 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
     @Autowired
     private CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
 
+    @Autowired
+    CompanyAiWorkflowServerMapper companyAiWorkflowServerMapper;
 
 
     /**
@@ -256,7 +255,13 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
         if (addressId==null || addressId.isEmpty()){
             return R.error("请先绑定地址");
         }
-        Long serverId = cidIpadServerService.selectQwIpadServerByAddressId(addressId);
+        Long cidServerId = companyUser.getCidServerId();
+        if ( cidServerId==null ){
+            return R.error("请先绑定cid服务");
+        }
+        CompanyAiWorkflowServer cidServer = companyAiWorkflowServerMapper.selectCompanyAiWorkflowServerById(companyUser.getCidServerId());
+
+        Long serverId = cidIpadServerService.selectQwIpadServerByAddressId(addressId,cidServer.getGroupNo());
         if (serverId==null){
             return  R.error(501,"该地区服务器剩余数量不足");
         }
@@ -264,7 +269,7 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
         account.setServerStatus(1);
         updateById(account);
 
-        cidIpadServerService.subtractServer(serverId);
+//        cidIpadServerService.subtractServer(serverId);
         CidIpadServerUser qwIpadServerUser = new CidIpadServerUser();
         qwIpadServerUser.setCompanyUserId(companyUser.getUserId());
         qwIpadServerUser.setCompanyId(companyUser.getCompanyId());
@@ -608,7 +613,7 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
             }
 
             // 清除超时检测Key(回调成功了,不需要超时检测了)
-            AiAddWxTaskNode.clearTimeoutKey(workflowInstanceId, wxClientId);
+//            AiAddWxTaskNode.clearTimeoutKey(workflowInstanceId, wxClientId);
 
             // 触发工作流继续执行
             Map<String, Object> inputData = new HashMap<>();

+ 15 - 15
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNode.java

@@ -35,7 +35,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
     private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     @SuppressWarnings("unchecked")
     private static final RedisCacheT<String> redisCache = SpringUtils.getBean(RedisCacheT.class);
-    public static final String DELAY_ADD_WX_KEY = "addWxTask:delay:%s:%s:";
+    public static final String DELAY_ADD_WX_KEY = "addWxTask:delay:%s:%s:%s:";
     /**
      * 默认加微超时时间(分钟)
      */
@@ -225,17 +225,17 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
         return true;
     }
 
-    /**
-     * 清除超时检测 Key
-     *
-     * @param workflowInstanceId 工作流实例ID
-     * @param wxClientId         加微客户ID
-     */
-    public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
-        String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
-        redisCache.deleteObject(timeoutKey);
-        log.info("清除加微超时检测 Key: {}", timeoutKey);
-    }
+//    /**
+//     * 清除超时检测 Key
+//     *
+//     * @param workflowInstanceId 工作流实例ID
+//     * @param wxClientId         加微客户ID
+//     */
+//    public static void clearTimeoutKey(String workflowInstanceId, Long wxClientId) {
+//        String timeoutKey = Constants.WORKFLOW_ADD_WX_TIMEOUT + workflowInstanceId + ":" + wxClientId;
+//        redisCache.deleteObject(timeoutKey);
+//        log.info("清除加微超时检测 Key: {}", timeoutKey);
+//    }
 
     /**
      * getRedisCacheKey
@@ -243,12 +243,12 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
      * @param time
      * @return
      */
-    public static String getDelayAddWxKeyPrefix(Long time) {
+    public static String getDelayAddWxKeyPrefix(Integer cidGroupNo,Long time) {
         Date nowDay = new Date();
         if (null != time) {
             nowDay = new Date(time);
         }
-        return String.format(DELAY_ADD_WX_KEY, nowDay.getHours(), nowDay.getMinutes());
+        return String.format(DELAY_ADD_WX_KEY,cidGroupNo, nowDay.getHours(), nowDay.getMinutes());
     }
 
     /**
@@ -287,7 +287,7 @@ public class AiAddWxTaskNode extends AbstractWorkflowNode {
                 //节点包含延时条件
                 if (null != condition.getAddTime() && !condition.isAdd()) {
                     long l = System.currentTimeMillis() + condition.getAddTime() * 60 * 1000;
-                    String redisKey = getDelayAddWxKeyPrefix(l) + workflowInstanceId;
+                    String redisKey = getDelayAddWxKeyPrefix(exec.getCidGroupNo(),l) + workflowInstanceId;
                     ExecutionContext nextContext = context.clone();
                     nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
                     super.redisCache.setCacheObject(redisKey, nextContext);

+ 5 - 5
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiCallTaskNode.java

@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
 public class AiCallTaskNode extends AbstractWorkflowNode {
     private static final CompanyWorkflowNodeMapper companyWorkflowNodeMapper = SpringUtils.getBean(CompanyWorkflowNodeMapper.class);
     private static final ICompanyVoiceRoboticService companyVoiceRoboticService = SpringUtils.getBean(ICompanyVoiceRoboticService.class);
-    public static final String DELAY_CALL_KEY = "aiCallTask:delay:%s:%s:";
+    public static final String DELAY_CALL_KEY = "aiCallTask:delay:%s:%s:%s:";
     private final String CALL_FROM_CALLBACK = "callBack";
     private final String CALL_FROM_TIMER = "timer";
 
@@ -110,7 +110,7 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
                         ExecutionContext nextContext = context.clone();
                         nextContext.setCurrentNodeKey(edge.getTargetNodeKey());
                         //添加到延时扫描redis
-                        super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
+                        super.redisCache.setCacheObject(this.getDelayCallKeyPrefix(exec.getCidGroupNo(),l) + exec.getWorkflowInstanceId(), nextContext, 1, TimeUnit.DAYS);
                         super.asyncWorkflowForBlockingNode(context.getWorkflowInstanceId(), context.getCurrentNodeKey(), context, ExecutionStatusEnum.WAITING);
                         updateLogStatusIfExist(context, ExecutionStatusEnum.PAUSED, ExecutionStatusEnum.WAITING);
                         runnableCount++;
@@ -187,16 +187,16 @@ public class AiCallTaskNode extends AbstractWorkflowNode {
 
     /**
      * getRedisCacheKey
-     *
+     * @param cidGroupNo
      * @param time
      * @return
      */
-    public static String getDelayCallKeyPrefix(Long time) {
+    public static String getDelayCallKeyPrefix(Integer cidGroupNo,Long time) {
         Date nowDay = new Date();
         if (null != time) {
             nowDay = new Date(time);
         }
-        return String.format(DELAY_CALL_KEY, nowDay.getHours(), nowDay.getMinutes());
+        return String.format(DELAY_CALL_KEY, cidGroupNo,nowDay.getHours(), nowDay.getMinutes());
     }
 
 //    @Override

+ 1 - 1
fs-service/src/main/java/com/fs/company/service/impl/call/node/WorkflowNodeFactory.java

@@ -3,6 +3,7 @@ package com.fs.company.service.impl.call.node;
 import com.fs.company.service.IWorkflowNode;
 import com.fs.company.service.IWorkflowNodeFactory;
 import com.fs.enums.NodeTypeEnum;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
@@ -15,7 +16,6 @@ import java.util.Map;
 @Component
 public class WorkflowNodeFactory implements IWorkflowNodeFactory {
 
-
     @Override
     public IWorkflowNode createNode(String nodeKey, NodeTypeEnum type, String nodeName,
                                     Map<String, Object> properties) {

+ 1 - 1
fs-service/src/main/java/com/fs/wxcid/service/ICidIpadServerService.java

@@ -60,7 +60,7 @@ public interface ICidIpadServerService extends IService<CidIpadServer>{
      */
     int deleteCidIpadServerById(Long id);
 
-    Long selectQwIpadServerByAddressId(String addressId);
+    Long selectQwIpadServerByAddressId(String addressId,Integer cidGroupNo);
 
     void subtractServer(Long serverId);
 }

+ 2 - 2
fs-service/src/main/java/com/fs/wxcid/service/impl/CidIpadServerServiceImpl.java

@@ -96,8 +96,8 @@ public class CidIpadServerServiceImpl extends ServiceImpl<CidIpadServerMapper, C
     }
 
     @Override
-    public Long selectQwIpadServerByAddressId(String addressId) {
-        CidIpadServer ipadServer = getOne(new QueryWrapper<CidIpadServer>().eq("address_id", addressId).last("limit 1"));
+    public Long selectQwIpadServerByAddressId(String addressId,Integer cidGroupNo) {
+        CidIpadServer ipadServer = getOne(new QueryWrapper<CidIpadServer>().eq("address_id", addressId).eq("group_no", cidGroupNo).last("limit 1"));
         if(ipadServer == null){
             throw new CustomException("地区PAD不足");
         }

+ 1 - 0
fs-service/src/main/resources/application-dev.yml

@@ -44,6 +44,7 @@ spring:
                 # 主库数据源
                 master:
                     url: jdbc:mysql://139.186.77.83:3306/ylrz_his_scrm?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
+#                    url: jdbc:mysql://139.186.77.83:3306/ylrz_his_scrm_hetai?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
                     username: Rtroot
                     password: Rtroot
                 # 主库数据源

+ 2 - 0
fs-service/src/main/resources/mapper/company/CompanyWxAccountMapper.xml

@@ -56,6 +56,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="phone != null">phone,</if>
             <if test="wxNo != null">wx_no,</if>
             <if test="companyUserId != null">company_user_id,</if>
+            <if test="companyId != null">company_id,</if>
             <if test="createTime != null">create_time,</if>
             <if test="createUser != null">create_user,</if>
          </trim>
@@ -64,6 +65,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="phone != null">#{phone},</if>
             <if test="wxNo != null">#{wxNo},</if>
             <if test="companyUserId != null">#{companyUserId},</if>
+            <if test="companyId != null">#{companyId},</if>
             <if test="createTime != null">#{createTime},</if>
             <if test="createUser != null">#{createUser},</if>
          </trim>

+ 1 - 0
fs-service/src/main/resources/mapper/company/CompanyWxClientMapper.xml

@@ -56,6 +56,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         inner join company_voice_robotic robotic on a.robotic_id = robotic.id
         <where>
             <if test="companyId != null"> and b.company_id = #{companyId}</if>
+            <if test="isWeCom != null"> and a.is_we_com = #{isWeCom}</if>
             <if test="roboticId != null "> and a.robotic_id = #{roboticId}</if>
             <if test="roboticWxId != null "> and b.id = #{roboticWxId}</if>
             <if test="customerId != null "> and a.customer_id = #{customerId}</if>

+ 221 - 187
fs-wx-task/src/main/java/com/fs/app/service/WxTaskService.java

@@ -39,11 +39,14 @@ import com.fs.wxwork.dto.WxAddSearchDTO;
 import com.fs.wxwork.dto.WxWorkResponseDTO;
 import com.fs.wxwork.service.WxWorkService;
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
@@ -55,9 +58,13 @@ import java.util.stream.Collectors;
 
 @Slf4j
 @Service
-@AllArgsConstructor
+@RequiredArgsConstructor
 public class WxTaskService {
 
+
+    @Value("${cid-group-no:0}")
+    private Integer cidGroupNo;
+
     private final ICompanyWxAccountService companyWxAccountService;
     private final ISysConfigService sysConfigService;
     private final ICompanyWxClientService companyWxClientService;
@@ -289,7 +296,7 @@ public class WxTaskService {
                     if (vo.isSuccess()) {
                         e.setLastAddWxTime(LocalDateTime.now());
 //                        todo 删除还原 以下为测试所用
-//                        e.setLastAddWxTime(LocalDateTime.now().plus(-1, ChronoUnit.DAYS));
+                        e.setLastAddWxTime(LocalDateTime.now().plus(-1, ChronoUnit.DAYS));
                         e.setIsAddNum(e.getIsAddNum() + 1);
                         client.setIsAdd(2);
                         client.setAddTime(LocalDateTime.now());
@@ -820,7 +827,7 @@ public class WxTaskService {
      */
     public void cidWorkflowAddWxRun() {
         log.info("===========工作流延时任务开始扫描===========");
-        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(null) + "*";
+        String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
         Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
         log.info("共扫描到 {} 个待处理键", keys.size());
         keys.parallelStream().forEach(key -> {
@@ -846,99 +853,64 @@ public class WxTaskService {
         log.info("===========工作流延时任务扫描结束===========");
     }
 
-
     /**
      * 企微加微信任务
      *
      * @param accountIdList 企微成员id
      */
     public void qwAddWx(List<Long> accountIdList) {
-        log.info("==========执行申请企微加好友任务开始==========");
+        log.info("==========执行企微申请加个微任务开始==========");
         try {
-            // 需要添加微信的列表
-            List<CompanyWxClient4WorkFlowVO> list = companyWxClientService.getQwAddWxList4Workflow(accountIdList);
-            log.info("申请企微加好友任务需要添加微信的数量:{}", list.size());
-            if (list.isEmpty()) return;
-            List<CompanyWxClient> addList = new ArrayList<>();
-            Map<Long, CompanyWxClient4WorkFlowVO> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient4WorkFlowVO::getAccountId);
+            // 获取需要添加微信的企微客户列表
+            List<CompanyWxClient> clientList = getFilteredClientList(accountIdList);
+            if (clientList.isEmpty()) {
+                log.info("没有符合条件的客户需要添加微信");
+                return;
+            }
+            
+            // 获取CompanyWxClient信息
+            Map<Long, CompanyWxClient> clientMap = PubFun.listToMapByGroupObject(clientList, CompanyWxClient::getAccountId);
             // 获取实际企微用户信息
-            List<QwUser> addAccountList = qwUserMapper.selectBatchIds(clientMap.keySet()).stream()
+            List<QwUser> qwUserList = qwUserMapper.selectBatchIds(clientMap.keySet()).stream()
                     .filter(this::isValidQwUser)
                     .collect(Collectors.toList());
-            log.info("企微申请加好友任务需要企微的账号数量:{}", addAccountList.size());
-            addAccountList.forEach(qwUser -> {
-                CompanyWxClient4WorkFlowVO client = clientMap.get(qwUser.getId());
-                if (client != null) {
-                    CrmCustomer crmCustomer = crmCustomerService.selectCrmCustomerById(client.getCustomerId());
-                    // 开始申请加微
-                    WxWorkResponseDTO<String> resp = qwAddWxInvokeIpad(crmCustomer.getMobile(), qwUser.getUid(), qwUser.getServerId());
-                    JSONObject runParam = new JSONObject();
-                    runParam.put("qwId", qwUser.getId());
-                    runParam.put("mobile", crmCustomer.getMobile());
-                    runParam.put("qwUid", qwUser.getUid());
-                    runParam.put("clientId", client.getId());
-                    CompanyVoiceRoboticCallLogAddwx addLog = CompanyVoiceRoboticCallLogAddwx.initCallLog(
-                            runParam.toJSONString(), client.getId(), client.getRoboticId(), qwUser.getId(), qwUser.getCompanyId());
-                    if (resp != null && resp.getErrcode() == 0) {
-                        // 加微消息已发送成功
-                        client.setIsAdd(2);
-                        client.setAddTime(LocalDateTime.now());
-                        CompanyWxClient addItem = new CompanyWxClient();
-                        BeanUtils.copyProperties(client, addItem);
-                        addList.add(addItem);
-                        addLog.setStatus(1);
-                        addLog.setResult(JSON.toJSONString(resp));
-                        addLog.setIsWeCom(2);
-                        log.info("ROBOTIC-ID:{},企微申请加好友任务申请成功", client.getRoboticId());
-                    } else {
-                        log.error("ROBOTIC-ID:{},企微申请加好友任务加微失败:{}", client.getRoboticId(), runParam);
-                        addLog.setStatus(3);
-                        addLog.setResult(JSON.toJSONString(runParam));
-                    }
-                    asyncSaveCompanyVoiceRoboticCallLog(addLog);
-                } else {
-                    log.error("企微申请加好友任务当前账号暂无需要添加微信:{}-{}", qwUser.getId(), qwUser.getQwUserName());
-                }
-            });
-            if (!addList.isEmpty()) {
-                companyWxClientService.updateBatchById(addList);
-                for (CompanyWxClient client : addList) {
-                    CompanyWxClient4WorkFlowVO vo = clientMap.get(client.getAccountId());
-                    IWorkflowNode node = workflowNodeFactory.createNode(vo.getCurrentNodeKey(),
-                            NodeTypeEnum.fromValue(vo.getCurrentNodeType()),
-                            vo.getCurrentNodeName(), null);
-                    if (node instanceof AiQwAddWxTaskNode) {
-                        CompletableFuture.runAsync(() -> {
-                            AiQwAddWxTaskNode qwAddWxNode = (AiQwAddWxTaskNode) node;
-                            qwAddWxNode.doneQwAddWx(vo.getWorkflowInstanceId());
-                        }, cidExcutor);
-                    }
-                }
+            
+            log.info("需要企微添加的账号数量:{}", qwUserList.size());
+            if (qwUserList.isEmpty()) return;
+            
+            // 处理加微逻辑
+            List<CompanyWxClient> upClientList = processQwAddWx(qwUserList, clientMap);
+            
+            // 批量更新客户状态
+            if (!upClientList.isEmpty()) {
+                companyWxClientService.updateBatchById(upClientList);
+                log.info("成功更新{}个客户的加微状态", upClientList.size());
             }
         } catch (Exception e) {
-            log.error("企微申请加好友任务执行异常", e);
+            log.error("企微加微信任务执行异常", e);
         }
-        log.info("==========执行企微申请加好友任务结束==========");
+        log.info("==========执行企微申请加个微任务结束==========");
     }
 
     /**
      * 企微加微结果处理
      */
     public void qwAddWxResult(List<Long> accountIdList) {
-        log.info("==========执行企微申请加微结果查询任务开始==========");
+        log.info("==========执行企微申请加个微结果查询任务开始==========");
         try {
             //is_add = 2,状态为加微中且是企微类型
             List<CompanyWxClient> clients = companyWxClientService.getQwAddWxList(accountIdList, 2);
-            log.info("企微申请加微结果查询任务需要查询的数量:{}", clients.size());
+            log.info("需要查询企微加个微结果的数量:{}", clients.size());
             
             if (clients.isEmpty()) return;
+            
             // 处理每个客户的加微结果
             List<CompanyWxClient> upClientList = new ArrayList<>();
             clients.parallelStream().forEach(client -> {
                 try {
                     processSingleClientResult(client, upClientList);
                 } catch (Exception e) {
-                    log.error("企微申请加微结果查询任务处理客户{}加微结果异常", client.getId(), e);
+                    log.error("处理客户{}加微结果异常", client.getId(), e);
                 }
             });
             
@@ -948,43 +920,30 @@ public class WxTaskService {
             }
             
         } catch (Exception e) {
-            log.error("企微申请加微结果查询任务处理异常", e);
+            log.error("企微加微结果处理异常", e);
         }
-        log.info("==========执行企微申请加微结果查询任务结束==========");
+        log.info("==========执行企微申请加微结果查询任务结束==========");
     }
 
+    
     /**
-     * 扫描企微加微工作流延时任务
+     * 获取过滤后的企微客户列表
      */
-    public void cidWorkflowQwAddWxRun() {
-        log.info("===========企微加微工作流延时任务开始扫描===========");
-        String delayAddWxKeyPrefix = AiQwAddWxTaskNode.getDelayAddWxKeyPrefix(null) + "*";
-        Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
-        log.info("企微加微共扫描到 {} 个待处理键", keys.size());
-        keys.parallelStream().forEach(key -> {
-            try {
-                //doExec
-                CompletableFuture.runAsync(()->{
-                    try {
-                        ExecutionContext context = redisCache2.getCacheObject(key);
-                        context.setVariable("callRedisKey",key);
-                        context.setVariable("callSource","qwAddWxTimer");
-                        companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
-                    } catch (Exception e) {
-                        log.error("处理工作流延时任务异常 - key: {}", key, e);
-                    }
-                }, cidExcutor).thenRun(()->{
-                    redisCache2.deleteObject(key);
-                });
-
-            } catch (Exception ex) {
-                log.error("处理工作流延时任务异常 - key: {}", key, ex);
-            }
-        });
-        log.info("===========工作流延时任务扫描结束===========");
+    private List<CompanyWxClient> getFilteredClientList(List<Long> accountIdList) {
+        List<CompanyWxClient> list = companyWxClientService.getAddWxList(accountIdList, 2);
+        
+        // 排除掉没到达加微步骤的人
+        List<CompanyVoiceRoboticCallees> excludeList = companyVoiceRoboticCalleesMapper.selectExcludeList(list);
+        Set<String> excludeKeys = excludeList.stream()
+                .filter(e -> !Constants.QW_ADD_WX.equals(getNextTaskOptimized(e.getTaskFlow(), e.getRunTaskFlow())))
+                .map(callee -> callee.getRoboticId() + "_" + callee.getUserId())
+                .collect(Collectors.toSet());
+        
+        return list.stream()
+                .filter(client -> !excludeKeys.contains(client.getRoboticId() + "_" + client.getCustomerId()))
+                .collect(Collectors.toList());
     }
-
-
+    
     /**
      * 验证企微用户有效性
      */
@@ -995,7 +954,55 @@ public class WxTaskService {
         }
         return true;
     }
-
+    
+    /**
+     * 处理企微加微逻辑
+     */
+    private List<CompanyWxClient> processQwAddWx(List<QwUser> qwUserList, Map<Long, CompanyWxClient> clientMap) {
+        List<CompanyWxClient> upClientList = Collections.synchronizedList(new ArrayList<>());
+        
+        qwUserList.parallelStream().forEach(qwUser -> {
+            try {
+                processSingleQwUser(qwUser, clientMap, upClientList);
+            } catch (Exception e) {
+                log.error("处理企微用户{}异常", qwUser.getId(), e);
+            }
+        });
+        
+        return upClientList;
+    }
+    
+    /**
+     * 处理单个企微用户
+     */
+    private void processSingleQwUser(QwUser qwUser, Map<Long, CompanyWxClient> clientMap, 
+                                   List<CompanyWxClient> upClientList) {
+        CompanyWxClient client = clientMap.get(qwUser.getId());
+        if (client == null) {
+            log.error("当前账号暂无需要添加微信:{}-{}", qwUser.getId(), qwUser.getQwUserName());
+            return;
+        }
+        
+        CrmCustomer crmCustomer = crmCustomerService.selectCrmCustomerById(client.getCustomerId());
+        if (crmCustomer == null || StringUtils.isBlank(crmCustomer.getMobile())) {
+            log.info("查询客户{}手机号为空,跳过执行", crmCustomer == null ? "" : crmCustomer.getCustomerName());
+            return;
+        }
+        
+        String task = redisCache.getCacheObject(Constants.TASK_ID + client.getRoboticId());
+        log.info("ROBOTIC-ID:{},CLIENT-ID:{},当前任务执行状态:{}", 
+                client.getRoboticId(), client.getId(), task);
+        
+        if (!StringUtils.isNotEmpty(task) || !Constants.QW_ADD_WX.equals(task)) {
+            log.error("ROBOTIC-ID:{},当前任务没有执行加微任务", client.getRoboticId());
+            return;
+        }
+        
+        // 开始申请加微
+        WxWorkResponseDTO<String> resp = qwAddWxInvokeIpad(crmCustomer.getMobile(), qwUser.getUid(), qwUser.getServerId());
+        //处理申请加微结果
+        handleAddWxResult(resp, client, qwUser, crmCustomer, upClientList);
+    }
 
     /**
      * 企微加个微调用ipad端
@@ -1006,7 +1013,7 @@ public class WxTaskService {
      */
     private WxWorkResponseDTO<String> qwAddWxInvokeIpad(String mobile, String qwUid, Long serverId) {
         if (StringUtils.isBlank(mobile) || StringUtils.isBlank(qwUid) || serverId == null) {
-            log.warn("企微申请加好友任务参数校验失败: mobile={}, qwUid={}, serverId={}", mobile, qwUid, serverId);
+            log.warn("参数校验失败: mobile={}, qwUid={}, serverId={}", mobile, qwUid, serverId);
             return null;
         }
         try {
@@ -1019,44 +1026,67 @@ public class WxTaskService {
             wxAddSearchDTO.setTicket(null);
 
             WxWorkResponseDTO<String> response = wxWorkService.addSearch(wxAddSearchDTO, serverId);
-            log.debug("企微申请加好友任务调用结果: errcode={}, errmsg={}",
+            log.debug("企微加微接口调用结果: errcode={}, errmsg={}",
                     response != null ? response.getErrcode() : "null",
                     response != null ? response.getErrmsg() : "null");
 
             return response;
-
-            // 测试代码
-//            WxWorkResponseDTO<String> response = new WxWorkResponseDTO<>();
-//            response.setErrcode(0);
-//            return response;
         } catch (Exception e) {
-            log.error("企微申请加好友任务请求接口异常: mobile={}, qwUid={}, serverId={}", mobile, qwUid, serverId, e);
+            log.error("企微加个微请求接口异常: mobile={}, qwUid={}, serverId={}", mobile, qwUid, serverId, e);
             return null;
         }
     }
+
+    /**
+     * 处理加微结果
+     */
+    private void handleAddWxResult(WxWorkResponseDTO<String> resp, CompanyWxClient client, 
+                                 QwUser qwUser, CrmCustomer crmCustomer, 
+                                 List<CompanyWxClient> upClientList) {
+        JSONObject runParam = new JSONObject();
+        runParam.put("qwId", qwUser.getId());
+        runParam.put("mobile", crmCustomer.getMobile());
+        runParam.put("qwUid", qwUser.getUid());
+        runParam.put("clientId", client.getId());
+        
+        CompanyVoiceRoboticCallLogAddwx addLog = CompanyVoiceRoboticCallLogAddwx.initCallLog(
+                runParam.toJSONString(), client.getId(), client.getRoboticId(), qwUser.getId(), qwUser.getCompanyId());
+        
+        if (resp != null && resp.getErrcode() == 0) {
+            // 加微消息已发送成功
+            client.setIsAdd(2);
+            client.setAddTime(LocalDateTime.now());
+            upClientList.add(client);
+            addLog.setStatus(1);
+            addLog.setResult(JSON.toJSONString(resp));
+            addLog.setIsWeCom(2);
+            log.info("ROBOTIC-ID:{},加微成功", client.getRoboticId());
+        } else {
+            // 加微消息发送失败,补偿重试
+            handleFailedAddWxWithRetry(client, upClientList);
+        }
+        
+        asyncSaveCompanyVoiceRoboticCallLog(addLog);
+    }
     
     /**
      * 处理单个客户加微结果
      */
     private void processSingleClientResult(CompanyWxClient client, List<CompanyWxClient> upClientList) {
         if (StringUtils.isBlank(client.getPhone())) {
-            handleFailedAddWx(client, upClientList, "无电话号码",0);
+            handleFailedAddWx(client, upClientList, "无电话号码");
             return;
         }
         
         // 查询外部联系人表是否有数据
         QwExternalContact qwExternalContact = qwExternalContactMapper.queryQwUserIdIsAddContact(
                 client.getAccountId(), client.getPhone(), 2);
-
+        
         if (qwExternalContact != null && qwExternalContact.getId() > 0) {
             handleSuccessfulAddWx(client, upClientList);
         } else {
-            handleFailedAddWxWithRetry(client, upClientList,0);
+            handleFailedAddWxWithRetry(client, upClientList);
         }
-
-        //测试代码
-//        handleSuccessfulAddWx(client, upClientList);
-//        handleFailedAddWx(client, upClientList, "无电话号码",0);
     }
     
     /**
@@ -1076,19 +1106,19 @@ public class WxTaskService {
         client.setAddTime(LocalDateTime.now());
         upClientList.add(client);
         redisCache.deleteObject("qwAddWx_" + client.getId());
-        log.info("ROBOTIC-ID:{},企微申请加微结果查询任务加微成功:{}", client.getRoboticId(), client.getId());
+        log.info("ROBOTIC-ID:{},加微成功:{}", client.getRoboticId(), client.getId());
     }
-
+    
     /**
      * 处理加微失败的情况
      */
-    private void handleFailedAddWx(CompanyWxClient client, List<CompanyWxClient> upClientList, String reason,Integer isAdd) {
-        String taskName = isAdd == 1 ? "企微申请加好友任务" : "加微结果查询任务";
-        log.error("ROBOTIC-ID:{},{}:{},clientId={}", client.getRoboticId(),taskName, reason, client.getId());
+    private void handleFailedAddWx(CompanyWxClient client, List<CompanyWxClient> upClientList, String reason) {
+        log.error("ROBOTIC-ID:{},{}加微失败:{}", client.getRoboticId(), reason, client.getId());
         client.setIsAdd(3);
         client.setUpdateTime(new Date());
         upClientList.add(client);
-        // 更新日志记录
+        
+        // 更新记录
         companyVoiceRoboticCallLogAddwxService.lambdaUpdate()
                 .eq(CompanyVoiceRoboticCallLogAddwx::getRoboticId, client.getRoboticId())
                 .eq(CompanyVoiceRoboticCallLogAddwx::getWxClientId, client.getId())
@@ -1102,19 +1132,17 @@ public class WxTaskService {
     /**
      * 处理加微失败并重试计数
      */
-    private void handleFailedAddWxWithRetry(CompanyWxClient client, List<CompanyWxClient> upClientList,Integer isAdd) {
-        String taskName = isAdd == 1 ? "企微申请加好友任务" : "加微结果查询任务";
-        log.error("ROBOTIC-ID:{},{}失败:{}", client.getRoboticId(),taskName, client.getId());
+    private void handleFailedAddWxWithRetry(CompanyWxClient client, List<CompanyWxClient> upClientList) {
+        log.error("ROBOTIC-ID:{},加微失败:{}", client.getRoboticId(), client.getId());
         String failCountStr = redisCache.getCacheObject("qwAddWx_" + client.getId());
         int failCount = 1;
         
         if (StringUtils.isNotBlank(failCountStr)) {
-            if (Integer.parseInt(failCountStr) >= 60 * 24) { // 超过一天
-                handleFailedAddWx(client, upClientList, "超过最大重试次数",isAdd);
-                redisCache.deleteObject("qwAddWx_" + client.getId());
+            failCount += Integer.parseInt(failCountStr);
+            if (failCount >= 60 * 24) { // 超过一天
+                handleFailedAddWx(client, upClientList, "超过最大重试次数");
             } else {
-                failCount += Integer.parseInt(failCountStr);
-                redisCache.setCacheObject("qwAddWx_" + client.getId(), String.valueOf(failCount-1));
+                redisCache.setCacheObject("qwAddWx_" + client.getId(), String.valueOf(failCount));
             }
         } else {
             redisCache.setCacheObject("qwAddWx_" + client.getId(), String.valueOf(failCount), 25, TimeUnit.HOURS);
@@ -1126,74 +1154,80 @@ public class WxTaskService {
      */
     private void batchUpdateClients(List<CompanyWxClient> upClientList) {
         companyWxClientService.updateBatchById(upClientList);
-        // 从 upClientList 中筛选出 isAdd=1和3加微失败的数据
+
+        // 从 upClientList 中筛选出 isAdd=1即加微成功的数据
         List<CompanyWxClient> successClients = upClientList.stream()
-                .filter(client -> client.getIsAdd() != null && (client.getIsAdd() == 1 || client.getIsAdd() == 3))
+                .filter(client -> client.getIsAdd() != null && client.getIsAdd() == 1)
                 .collect(Collectors.toList());
-        if(!successClients.isEmpty()){
-            successClients.forEach(client -> {
-                triggerWorkflowOnAddWxSuccess(client.getId());
-            });
-        }
-    }
-
 
+        // 根据加微成功的用户,判定是否加入延时执行下一步任务
+        Set<Long> roboticIdSet = successClients.stream()
+                .map(CompanyWxClient::getRoboticId)
+                .collect(Collectors.toSet());
+        Set<Long> userIdSet = successClients.stream()
+                .map(CompanyWxClient::getCustomerId)
+                .collect(Collectors.toSet());
+        
+        // 获取任务和callees数据
+        List<CompanyVoiceRobotic> robotList = companyVoiceRoboticMapper.selectBatchIds(roboticIdSet);
+        Map<Long, CompanyVoiceRobotic> roboticsMap = robotList.stream()
+                .collect(Collectors.toMap(CompanyVoiceRobotic::getId, Function.identity(), (a, b) -> a));
+        
+        List<CompanyVoiceRoboticCallees> calleesList = companyVoiceRoboticCalleesMapper
+                .selectCalleesListByRoboticIdsAndUserIds(userIdSet, roboticIdSet);
+        Map<String, CompanyVoiceRoboticCallees> calleesMap = calleesList.stream()
+                .collect(Collectors.toMap(e -> e.getUserId() + "-" + e.getRoboticId(), Function.identity(), (a, b) -> a));
+        
+        // 设置延时任务
+        setupDelayTasks(successClients, roboticsMap, calleesMap);
+        
+        // 更新任务流程状态
+        updateTaskFlows(calleesList, roboticIdSet);
+    }
+    
     /**
-     * 加微结果触发工作流继续执行
-     * @param wxClientId 加微客户ID
+     * 设置延时任务
      */
-    private void triggerWorkflowOnAddWxSuccess(Long wxClientId) {
-        try {
-            // 查找等待中的加微工作流实例
-            CompanyAiWorkflowExec waitingExec = companyAiWorkflowExecMapper.selectWaitingAddWxWorkflowByWxClientId(
-                    wxClientId,
-                    ExecutionStatusEnum.WAITING.getValue(),
-                    NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue());
-            if (waitingExec == null) {
-                log.info("未找到等待中的加微工作流实例 - wxClientId: {}", wxClientId);
+    private void setupDelayTasks(List<CompanyWxClient> upClientList, 
+                               Map<Long, CompanyVoiceRobotic> roboticsMap,
+                               Map<String, CompanyVoiceRoboticCallees> calleesMap) {
+        upClientList.forEach(client -> {
+            CompanyVoiceRobotic robotic = roboticsMap.get(client.getRoboticId());
+            if (robotic == null) {
+                log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
                 return;
             }
-            //查询工作流加微执行日志是否未更新状态
-            CompanyAiWorkflowExecLog queryP = new CompanyAiWorkflowExecLog();
-            queryP.setWorkflowInstanceId(waitingExec.getWorkflowInstanceId());
-            queryP.setNodeType(NodeTypeEnum.AI_QW_ADD_WX_TASK.getValue());
-            queryP.setStatus(ExecutionStatusEnum.WAITING.getValue());
-            List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(queryP);
-            companyAiWorkflowExecLogs.forEach(log -> {
-                        log.setStatus(ExecutionStatusEnum.SUCCESS.getValue());
-                        companyAiWorkflowExecLogMapper.updateById(log);
-                    }
-            );
-
-            String workflowInstanceId = waitingExec.getWorkflowInstanceId();
-            String currentNodeKey = waitingExec.getCurrentNodeKey();
-
-            log.info("加微成功回调,尝试触发工作流继续执行 - workflowInstanceId: {}, nodeKey: {}, wxClientId: {}",
-                    workflowInstanceId, currentNodeKey, wxClientId);
-
-            // 互斥检查:如果已经被执行过(超时路径或其他回调),则不再执行
-            if (!AiQwAddWxTaskNode.tryMarkAsExecuted(workflowInstanceId, wxClientId)) {
-                log.info("企微申请加微结果查询任务工作流已被其他路径执行,跳过 - workflowInstanceId: {}, wxClientId: {}",
-                        workflowInstanceId, wxClientId);
+            
+            CompanyVoiceRoboticCallees callee = calleesMap.get(client.getCustomerId() + "-" + client.getRoboticId());
+            if (callee == null) {
+                log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
                 return;
             }
-
-            // 清除超时检测Key(回调成功了,不需要超时检测了)
-            AiQwAddWxTaskNode.clearTimeoutKey(workflowInstanceId, wxClientId);
-
-            // 触发工作流继续执行
-            Map<String, Object> inputData = new HashMap<>();
-            inputData.put("addWxSuccess", true);
-            inputData.put("wxClientId", wxClientId);
-            inputData.put("triggerType", "callback"); // 回调触发
-
-            companyWorkflowEngine.resumeFromBlockingNode(workflowInstanceId, currentNodeKey, inputData);
-
-            log.info("企微申请加微结果查询任务加微成功回调触发工作流继续执行完成 - workflowInstanceId: {}, wxClientId: {}",
-                    workflowInstanceId, wxClientId);
-
-        } catch (Exception ex) {
-            log.error("企微申请加微结果查询任务加微成功回调触发工作流异常 - wxClientId: {}", wxClientId, ex);
-        }
+            
+            Integer addWxTime = robotic.getAddWxTime();
+            if (addWxTime != null) {
+                long endTime = System.currentTimeMillis() + addWxTime * 60 * 1000;
+                String key = Constants.CID_NEXT_TASK_ID + callee.getRoboticId() + ":" + callee.getId();
+                redisCache.setCacheObject(key, String.valueOf(endTime));
+            } else {
+                log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有设置加微后置等待时间", client.getRoboticId(), client.getId());
+            }
+        });
+    }
+    
+    /**
+     * 更新任务流程状态
+     */
+    private void updateTaskFlows(List<CompanyVoiceRoboticCallees> calleesList, Set<Long> roboticIdSet) {
+        calleesList.forEach(callee -> 
+            callee.setRunTaskFlow(
+                StringUtils.isBlank(callee.getRunTaskFlow()) ?
+                    Constants.QW_ADD_WX : callee.getRunTaskFlow() + "," + Constants.QW_ADD_WX
+            )
+        );
+        
+        companyVoiceRoboticCalleesServiceImpl.updateBatchById(calleesList);
+        companyVoiceRoboticServiceImpl.finishAddWxByCallees(roboticIdSet);
     }
+
 }

+ 8 - 8
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -20,10 +20,10 @@ public class WxTask {
     @Autowired
     private WxTaskService taskService;
 
-    @Scheduled(cron = "0 0/30 * * * ?")
-    public void addWx() {
-        taskService.addWx(null);
-    }
+//    @Scheduled(cron = "0 0/30 * * * ?")
+//    public void addWx() {
+//        taskService.addWx(null);
+//    }
     @Scheduled(cron = "0 0/1 * * * ?")
     public void addWx4Workflow() {
         taskService.addWx4Workflow(null);
@@ -51,10 +51,10 @@ public class WxTask {
      * 工作流加微超时检测
      * 每分钟执行一次,检查是否有加微超时的工作流需要继续执行
      */
-    @Scheduled(cron = "0 0/1 * * * ?")
-    public void checkWorkflowAddWxTimeout(){
-        taskService.checkWorkflowAddWxTimeout();
-    }
+//    @Scheduled(cron = "0 0/1 * * * ?")
+//    public void checkWorkflowAddWxTimeout(){
+//        taskService.checkWorkflowAddWxTimeout();
+//    }
 
     @Scheduled(cron = "0 0/1 * * * ?")
     public void cidWorkflowAddWxRun(){

+ 1 - 0
fs-wx-task/src/main/resources/application.yml

@@ -14,3 +14,4 @@ spring:
 #    active: druid-sxjz
 #    active: druid-hdt
 #    active: druid-myhk-test
+cid-group-no: 1