浏览代码

修复可能出现不发送情况

吴树波 3 周之前
父节点
当前提交
5a5e1a8b69
共有 2 个文件被更改,包括 88 次插入31 次删除
  1. 87 30
      fs-ipad-task/src/main/java/com/fs/app/task/SendMsg.java
  2. 1 1
      fs-ipad-task/src/main/resources/application.yml

+ 87 - 30
fs-ipad-task/src/main/java/com/fs/app/task/SendMsg.java

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.fs.app.service.IpadSendServer;
 import com.fs.common.core.redis.RedisCacheT;
+import com.fs.common.utils.CloudHostUtils;
 import com.fs.common.utils.DateUtils;
 import com.fs.common.utils.PubFun;
 import com.fs.company.service.ICompanyMiniappService;
@@ -41,10 +42,8 @@ import org.springframework.util.StringUtils;
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 @Component
@@ -59,7 +58,6 @@ public class SendMsg {
     private final QwIpadServerMapper qwIpadServerMapper;
     private final RedisCacheT<Long> redisCache;
     private final AsyncSopTestService asyncSopTestService;
-    private final ICompanyMiniappService companyMiniappService;
     private final IFsCoursePlaySourceConfigService fsCoursePlaySourceConfigService;
     private final QwPushCountMapper qwPushCountMapper;
     private final QwRestrictionPushRecordMapper qwRestrictionPushRecordMapper;
@@ -67,13 +65,36 @@ public class SendMsg {
     @Value("${group-no}")
     private String groupNo;
     private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
-    private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
+    private final Map<Long, TaskContext> qwMap = new ConcurrentHashMap<>();
+    private static final long TASK_TIMEOUT_MS = 3 * 60 * 1000L;
+
+    private static class TaskContext {
+        final long startTime;
+        final AtomicBoolean cancelled;
+
+        TaskContext() {
+            this.startTime = System.currentTimeMillis();
+            this.cancelled = new AtomicBoolean(false);
+        }
+
+        boolean isTimeout() {
+            return System.currentTimeMillis() - startTime > TASK_TIMEOUT_MS;
+        }
+
+        void cancel() {
+            cancelled.set(true);
+        }
+
+        boolean isCancelled() {
+            return cancelled.get();
+        }
+    }
 
     @Autowired
     @Qualifier("customThreadPool")
     private ThreadPoolTaskExecutor customThreadPool;
 
-    public SendMsg(QwUserMapper qwUserMapper, QwSopLogsMapper qwSopLogsMapper, IpadSendServer sendServer, SysConfigMapper sysConfigMapper, IQwSopLogsService qwSopLogsService, QwIpadServerMapper qwIpadServerMapper, RedisCacheT<Long> redisCache, AsyncSopTestService asyncSopTestService, ICompanyMiniappService companyMiniappService, IFsCoursePlaySourceConfigService fsCoursePlaySourceConfigService, QwPushCountMapper qwPushCountMapper, QwRestrictionPushRecordMapper qwRestrictionPushRecordMapper) {
+    public SendMsg(QwUserMapper qwUserMapper, QwSopLogsMapper qwSopLogsMapper, IpadSendServer sendServer, SysConfigMapper sysConfigMapper, IQwSopLogsService qwSopLogsService, QwIpadServerMapper qwIpadServerMapper, RedisCacheT<Long> redisCache, AsyncSopTestService asyncSopTestService, IFsCoursePlaySourceConfigService fsCoursePlaySourceConfigService, QwPushCountMapper qwPushCountMapper, QwRestrictionPushRecordMapper qwRestrictionPushRecordMapper) {
         this.qwUserMapper = qwUserMapper;
         this.qwSopLogsMapper = qwSopLogsMapper;
         this.sendServer = sendServer;
@@ -82,7 +103,6 @@ public class SendMsg {
         this.qwIpadServerMapper = qwIpadServerMapper;
         this.redisCache = redisCache;
         this.asyncSopTestService = asyncSopTestService;
-        this.companyMiniappService = companyMiniappService;
         this.fsCoursePlaySourceConfigService = fsCoursePlaySourceConfigService;
         this.qwPushCountMapper = qwPushCountMapper;
         this.qwRestrictionPushRecordMapper = qwRestrictionPushRecordMapper;
@@ -104,8 +124,6 @@ public class SendMsg {
 
     private Map<String, FsCoursePlaySourceConfig> getMiniMap() {
         List<FsCoursePlaySourceConfig> list = fsCoursePlaySourceConfigService.list(new QueryWrapper<FsCoursePlaySourceConfig>().ne("type", 2).eq("is_del", 0));
-//        log.info("获取到的小程序配置:{}", JSON.toJSONString(list));
-//        log.info("获取到的小程序配置:{}", JSON.toJSONString(list));
         return PubFun.listToMapByGroupObject(list, FsCoursePlaySourceConfig::getAppid);
     }
 
@@ -139,23 +157,36 @@ public class SendMsg {
         Map<String, FsCoursePlaySourceConfig> miniMap = getMiniMap();
         // 获取 pad 发送的企微
         getQwUserList().forEach(e -> {
-            // 如果没有值就执行后面的方法 并且入值
-            qwMap.computeIfAbsent(e.getId(), k -> {
-                // 线程启动
-                CompletableFuture.runAsync(() -> {
-                    try {
-                        log.info("开始任务:{}", e.getQwUserName());
-                        // 开始任务
-                        processUser(e, delayStart, delayEnd, miniMap);
-                    } catch (Exception exception) {
-                        log.error("发送错误:", exception);
-                    } finally {
-                        log.info("删除任务:{}", e.getQwUserName());
-                        qwMap.remove(e.getId());
-//                        removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
-                    }
-                }, customThreadPool);
-                return System.currentTimeMillis(); // 占位值
+            TaskContext ctx = qwMap.get(e.getId());
+            if (ctx != null) {
+                if (ctx.isTimeout()) {
+                    log.warn("任务超时,标记取消:{}, 已运行: {}ms", e.getQwUserName(), System.currentTimeMillis() - ctx.startTime);
+                    ctx.cancel();
+                } else {
+                    log.debug("任务正在执行中,跳过:{}", e.getQwUserName());
+                    return;
+                }
+            }
+            if (customThreadPool.getActiveCount() >= customThreadPool.getMaxPoolSize()) {
+                log.warn("线程池已满,跳过任务:{}, 活跃线程: {}/{}", e.getQwUserName(), customThreadPool.getActiveCount(), customThreadPool.getMaxPoolSize());
+                return;
+            }
+            TaskContext newCtx = new TaskContext();
+            qwMap.put(e.getId(), newCtx);
+            CompletableFuture.runAsync(() -> {
+                try {
+                    log.info("开始任务:{}", e.getQwUserName());
+                    processUser(e, delayStart, delayEnd, miniMap, newCtx);
+                } catch (Exception exception) {
+                    log.error("发送错误:", exception);
+                } finally {
+                    log.info("删除任务:{}", e.getQwUserName());
+                    qwMap.remove(e.getId());
+                }
+            }, customThreadPool).exceptionally(ex -> {
+                log.error("任务提交失败:{}, 错误: {}", e.getQwUserName(), ex.getMessage());
+                qwMap.remove(e.getId());
+                return null;
             });
         });
     }
@@ -166,8 +197,9 @@ public class SendMsg {
      * @param delayStart 随机延迟 最小值
      * @param delayEnd   随机延迟 最大值
      * @param miniMap    小程序配置
+     * @param ctx        任务上下文(用于取消检查)
      */
-    private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, FsCoursePlaySourceConfig> miniMap) {
+    private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, FsCoursePlaySourceConfig> miniMap, TaskContext ctx) {
         long start1 = System.currentTimeMillis();
         // 获取当前企微待发送记录
         List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectByQwUserId(qwUser.getId());
@@ -180,15 +212,25 @@ public class SendMsg {
         BaseVo parentVo = new BaseVo();
         parentVo.setCorpCode(qwUser.getCorpId());
         long end1 = System.currentTimeMillis();
+        // 检查是否被取消
+        if (ctx.isCancelled()) {
+            log.info("任务被取消,退出:{}", qwUser.getQwUserName());
+            return;
+        }
         // 判断这个企微是否需要发送
         if (!sendServer.isSend(user, parentVo)) {
             log.info("当前这个企微不需要发送 数据{}",user);
             return;
         }
-        log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
+        log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, ctx.startTime);
         long start3 = System.currentTimeMillis();
         // 循环代发送消息
         for (QwSopLogs qwSopLogs : qwSopLogList) {
+            // 检查是否被取消
+            if (ctx.isCancelled()) {
+                log.info("任务被取消,中断发送:{}, 已发送部分消息", qwUser.getQwUserName());
+                return;
+            }
             long start2 = System.currentTimeMillis();
             QwSopCourseFinishTempSetting setting = JSON.parseObject(qwSopLogs.getContentJson(), QwSopCourseFinishTempSetting.class);
             //直播的sendType:20单独走判断 其他的走以前的逻辑
@@ -200,7 +242,7 @@ public class SendMsg {
                 }
             }
             else{
-                // 判断消息状态是否满足发送条件
+                // 判断消息状态是否满足发isSendLogs送条件
                 if (!sendServer.isSendLogs(qwSopLogs, setting, user)) {
                     log.info("销售:{}, 消息发送条件未满足:{}", user.getQwUserName(), qwSopLogs.getId());
                     continue;
@@ -219,6 +261,11 @@ public class SendMsg {
             Map<Integer, List<QwPushCount>> pushMap = pushCountList.stream().collect(Collectors.groupingBy(QwPushCount::getType));
             // 循环发送消息里面的每一条消息
             for (QwSopCourseFinishTempSetting.Setting content : setting.getSetting()) {
+                // 检查是否被取消
+                if (ctx.isCancelled()) {
+                    log.info("任务被取消,中断发送:{}", qwUser.getQwUserName());
+                    return;
+                }
                 long start4 = System.currentTimeMillis();
                 //判断当前销售推送客户消息限制
                 Long qwUserId = qwUser.getId();//销售的Id
@@ -270,11 +317,16 @@ public class SendMsg {
                         return;
                     }
                     try {
+                        if (ctx.isCancelled()) {
+                            return;
+                        }
                         int delay = ThreadLocalRandom.current().nextInt(300, 1000);
                         log.debug("pad发送消息等待:{}ms", delay);
                         Thread.sleep(delay);
                     } catch (InterruptedException e) {
                         log.error("线程等待错误!");
+                        Thread.currentThread().interrupt();
+                        return;
                     }
                 }
             }
@@ -330,11 +382,16 @@ public class SendMsg {
             int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
             log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(), end2 - start2);
             try {
+                if (ctx.isCancelled()) {
+                    return;
+                }
                 int delay = ThreadLocalRandom.current().nextInt(delayStart, delayEnd);
                 log.debug("企微发送消息等待:{}ms", delay);
                 Thread.sleep(delay);
             } catch (InterruptedException e) {
                 log.error("线程等待错误!");
+                Thread.currentThread().interrupt();
+                return;
             }
         }
         long end3 = System.currentTimeMillis();

+ 1 - 1
fs-ipad-task/src/main/resources/application.yml

@@ -9,4 +9,4 @@ spring:
 #    active: druid-yzt
 #    active: druid-sxjz
 #    active: druid-sft
-group-no: 2
+group-no: 1