Parcourir la source

Merge remote-tracking branch 'origin/master'

yjwang il y a 3 mois
Parent
commit
61a5f2f08d

+ 34 - 0
fs-ipad-task/src/main/java/com/fs/app/service/CustomThreadPoolConfig.java

@@ -0,0 +1,34 @@
+package com.fs.app.service;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @author MixLiu
+ * @date 2025/7/11 上午11:04)
+ */
+@Configuration
+public class CustomThreadPoolConfig {
+    @Bean(name = "customThreadPool", destroyMethod = "shutdown")
+    public ThreadPoolTaskExecutor customThreadPool() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        // 核心线程数
+        executor.setCorePoolSize(300);
+        // 最大线程数
+        executor.setMaxPoolSize(300);
+        // 线程名前缀
+        executor.setThreadNamePrefix("custom-pool-");
+        // 拒绝策略:直接丢弃新任务
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        // 非核心线程空闲存活时间(秒)
+        executor.setKeepAliveSeconds(60);
+        // 等待所有任务完成后关闭线程池
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        // 初始化
+        executor.initialize();
+        return executor;
+    }
+}

+ 7 - 3
fs-ipad-task/src/main/java/com/fs/app/service/IpadSendServer.java

@@ -141,10 +141,10 @@ public class IpadSendServer {
         }
     }
 
-    public boolean isSend(QwUser qwUser) {
+    public boolean isSend(QwUser qwUser, BaseVo parentVo) {
         // 判断企微发送方式是否是ipad
         if (qwUser.getSendMsgType() == 0) {
-            log.debug("企微用户:ID:{} 名称:{} 发送方式:{}", qwUser.getId(), qwUser.getQwUserName(), qwUser.getSendMsgType().toString());
+            log.debug("发送方式是侧边栏企微用户:ID:{} 名称:{}", qwUser.getId(), qwUser.getQwUserName());
             return false;
         }
         // 判断是否信息准确
@@ -188,6 +188,7 @@ public class IpadSendServer {
                 qwUserMapper.updateById(updateQwUser);
                 return false;
             }
+            parentVo.setCorpId(login.getUser_info().getObject().getCorp_id());
             log.debug("QwUserID:{}, AI主机信息:{}", qwUser.getId(), login);
         } catch (Exception e) {
             updateQwUser.setId(qwUser.getId());
@@ -196,6 +197,7 @@ public class IpadSendServer {
             qwUserMapper.updateById(updateQwUser);
             return false;
         }
+
         return true;
     }
 
@@ -277,13 +279,15 @@ public class IpadSendServer {
         return true;
     }
 
-    public void send(QwSopCourseFinishTempSetting.Setting content, QwUser qwUser, QwSopLogs qwSopLogs, Map<String, CourseMaConfig> miniMap) {
+    public void send(QwSopCourseFinishTempSetting.Setting content, QwUser qwUser, QwSopLogs qwSopLogs, Map<String, CourseMaConfig> miniMap, BaseVo parentVo) {
         BaseVo vo = new BaseVo();
         vo.setId(Long.parseLong(qwSopLogs.getId()));
         vo.setRoom(qwSopLogs.getSendType() == 12);
         vo.setUuid(qwUser.getUid());
         vo.setExId(qwSopLogs.getExternalUserId());
         vo.setServerId(qwUser.getServerId());
+        vo.setCorpCode(parentVo.getCorpCode());
+        vo.setCorpId(parentVo.getCorpId());
         try {
             content.setSendStatus(1);
             switch (content.getContentType()) {

+ 36 - 47
fs-ipad-task/src/main/java/com/fs/app/task/SendMsg.java

@@ -15,7 +15,6 @@ import com.fs.qw.mapper.QwIpadServerMapper;
 import com.fs.qw.mapper.QwUserMapper;
 import com.fs.qw.service.impl.AsyncSopTestService;
 import com.fs.qw.vo.QwSopCourseFinishTempSetting;
-import com.fs.qw.vo.QwSopTempSetting;
 import com.fs.sop.domain.QwSopLogs;
 import com.fs.sop.mapper.QwSopLogsMapper;
 import com.fs.sop.service.IQwSopLogsService;
@@ -37,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 @Component
 @Slf4j
@@ -56,7 +54,6 @@ public class SendMsg {
     private String groupNo;
     private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
     private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
-    private final Map<Long, Long> removeQwMap = new ConcurrentHashMap<>();
 
     @Autowired
     @Qualifier("customThreadPool")
@@ -75,7 +72,7 @@ public class SendMsg {
     private List<QwUser> getQwUserList() {
         if (qwUserList.isEmpty()) {
             List<QwIpadServer> serverList = qwIpadServerMapper.selectList(new QueryWrapper<QwIpadServer>().eq("group_no", groupNo));
-            if(serverList.isEmpty()){
+            if (serverList.isEmpty()) {
                 return new ArrayList<>();
             }
             List<Long> serverIds = PubFun.listToNewList(serverList, QwIpadServer::getId);
@@ -117,45 +114,36 @@ public class SendMsg {
             delayEnd = config.getDelayEnd();
         }
         Map<String, CourseMaConfig> miniMap = getMiniMap();
-        // 清空需要删除的
-        List<Long> keyList = new ArrayList<>(removeQwMap.keySet());
-        keyList.forEach(key -> {
-            removeQwMap.remove(key);
-            qwMap.remove(key);
-        });
-        log.info("删除id:{}", JSON.toJSONString(keyList));
         getQwUserList().forEach(e -> {
-            synchronized (e.getId()){
-                if (qwMap.containsKey(e.getId())) {
-                    log.error("用户:{}已在处理中,跳过重复执行", e.getQwUserName());
-                    return;
-                }
-                qwMap.computeIfAbsent(e.getId(), k -> {
-                    CompletableFuture.runAsync(() -> {
-                        try {
-                            processUser(e, delayStart, delayEnd, miniMap);
-                        } catch (Exception exception){
-                            log.error("发送错误:", exception);
-                        }finally {
-                            removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
-                        }
-                    }, customThreadPool);
-                    return System.currentTimeMillis(); // 占位值
-                });
-            }
+            qwMap.computeIfAbsent(e.getId(), k -> {
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        log.info("开始任务:{}", e.getQwUserName());
+                        processUser(e, delayStart, delayEnd, miniMap);
+                    } catch (Exception exception) {
+                        log.error("发送错误:", exception);
+                    } finally {
+                        log.info("删除任务:{}", e.getQwUserName());
+                        qwMap.remove(e.getId());
+//                        removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
+                    }
+                }, customThreadPool);
+                return System.currentTimeMillis(); // 占位值
+            });
         });
     }
 
     private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, CourseMaConfig> miniMap) {
         long start1 = System.currentTimeMillis();
         List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectByQwUserId(qwUser.getId());
-        if(qwSopLogList.isEmpty()){
+        if (qwSopLogList.isEmpty()) {
             return;
         }
         QwUser user = qwUserMapper.selectById(qwUser.getId());
         BaseVo parentVo = new BaseVo();
+        parentVo.setCorpCode(qwUser.getCorpId());
         long end1 = System.currentTimeMillis();
-        if (!sendServer.isSend(user)) {
+        if (!sendServer.isSend(user, parentVo)) {
             return;
         }
         log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
@@ -171,24 +159,27 @@ public class SendMsg {
             log.info("进入发送消息状态:{}", qwSopLogs.getId());
             String key = "qw:logs:pad:send:id:" + qwSopLogs.getId();
             Long time = redisCache.getCacheObject(key);
-            if(redisCache.getCacheObject(key) != null){
+            if (redisCache.getCacheObject(key) != null) {
                 log.error("{}已有发送:{}, :{}", qwUser.getQwUserName(), qwSopLogs.getId(), time);
                 return;
             }
             redisCache.setCacheObject(key, System.currentTimeMillis(), 10, TimeUnit.MINUTES);
             for (QwSopCourseFinishTempSetting.Setting content : setting.getSetting()) {
-                sendServer.send(content, user, qwSopLogs, miniMap);
-//                if(content.getSendStatus() == 2 && "请求失败:消息发送过于频繁,请稍后再试".equals(content.getSendRemarks())){
-//                    QwUser update = new QwUser();
-//                    update.setRemark("请求频率异常,暂停发送,三小时后恢复继续发送");
-//                    update.setUpdateTime(new Date());
-//                    qwUserMapper.update(update, new QueryWrapper<QwUser>().eq("id", user.getId()));
-//                    redisCache.setCacheObject("qw:user:id:" + user.getId(), user.getId(), 3, TimeUnit.HOURS);
-//                    return;
-//                }
+                long start4 = System.currentTimeMillis();
+                sendServer.send(content, user, qwSopLogs, miniMap, parentVo);
+                long end4 = System.currentTimeMillis();
+                log.info("请求pad发送完成:{}, {}, 时长4:{}", user.getQwUserName(), qwSopLogs.getId(), end4 - start4);
+                if(content.getSendStatus() == 2 && "请求失败:消息发送过于频繁,请稍后再试".equals(content.getSendRemarks())){
+                    QwUser update = new QwUser();
+                    update.setRemark("请求频率异常,暂停发送,三小时后恢复继续发送");
+                    update.setUpdateTime(new Date());
+                    qwUserMapper.update(update, new QueryWrapper<QwUser>().eq("id", user.getId()));
+                    redisCache.setCacheObject("qw:user:id:" + user.getId(), user.getId(), 3, TimeUnit.HOURS);
+                    return;
+                }
                 try {
                     int delay = ThreadLocalRandom.current().nextInt(300, 1000);
-                    log.debug("等待:{}ms", delay);
+                    log.debug("pad发送消息等待:{}ms", delay);
                     Thread.sleep(delay);
                 } catch (InterruptedException e) {
                     log.error("线程等待错误!");
@@ -214,15 +205,13 @@ public class SendMsg {
                 updateQwSop.setRemark("全部发送成功");
                 updateQwSop.setRealSendTime(sdf.format(new Date()));
             }
-//            updateQwSop.setReceivingStatus(1L);
-//            updateQwSop.setSendStatus(1L);
-//            updateQwSop.setRealSendTime(sdf.format(new Date()));
+            updateQwSop.setContentJson(JSON.toJSONString(setting));
             long end2 = System.currentTimeMillis();
             int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
-            log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(),end2 - start2);
+            log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(), end2 - start2);
             try {
                 int delay = ThreadLocalRandom.current().nextInt(delayStart, delayEnd);
-                log.debug("等待:{}ms", delay);
+                log.debug("企微发送消息等待:{}ms", delay);
                 Thread.sleep(delay);
             } catch (InterruptedException e) {
                 log.error("线程等待错误!");

+ 4 - 0
fs-service/src/main/java/com/fs/ipad/vo/BaseVo.java

@@ -9,12 +9,16 @@ public class BaseVo{
     private String uuid;
     private Long serverId;
     private String exId;
+    private String corpId;
+    private String corpCode;
     private boolean isRoom;
 
 
     public void setBase(BaseVo vo){
         this.uuid = vo.getUuid();
         this.serverId = vo.getServerId();
+        this.corpId = vo.getCorpId();
+        this.corpCode = vo.getCorpCode();
         this.exId = vo.getExId();
         this.isRoom = vo.isRoom();
     }

+ 6 - 1
fs-service/src/main/java/com/fs/wxwork/utils/WxWorkHttpUtilNew.java

@@ -9,9 +9,11 @@ import com.alibaba.fastjson.TypeReference;
 import com.fs.common.exception.base.BaseException;
 import com.fs.ipad.enums.ErrCodeEnum;
 import com.fs.wxwork.dto.WxWorkResponseCodeDTO;
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Map;
 
@@ -19,6 +21,7 @@ import java.util.Map;
  * HTTP请求工具类(基于Hutool封装)
  * 增强功能:请求参数、Header和响应日志记录
  */
+@Slf4j
 public class WxWorkHttpUtilNew {
     private static final Logger logger = LoggerFactory.getLogger(WxWorkHttpUtilNew.class);
 
@@ -100,9 +103,11 @@ public class WxWorkHttpUtilNew {
      * @return 响应字符串
      */
     public static <T> T postWithType(String url, Object jsonBody, TypeReference<T> type) {
-        String post = post(url, JSON.toJSONString(jsonBody), null);
+        String json = JSON.toJSONString(jsonBody);
+        String post = post(url, json, null);
         WxWorkResponseCodeDTO respone = JSON.parseObject(post, WxWorkResponseCodeDTO.class);
         if(respone.getErrcode() != 0){
+            log.error("请求服务器地址:{},请求参数:{},返回数据:{},请求时间:{}", url, json, JSON.toJSONString(respone), LocalDateTime.now());
             String errMsg = ErrCodeEnum.getErrMsg(respone.getErrcode());
             if("未知的服务端错误".equals(errMsg)){
                 errMsg = respone.getErrmsg();