吴树波 5 dní pred
rodič
commit
267bc68c34

+ 18 - 12
fs-ipad-task/src/main/java/com/fs/app/service/CustomThreadPoolConfig.java

@@ -1,34 +1,40 @@
 package com.fs.app.service;
 
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 
-/**
- * @author MixLiu
- * @date 2025/7/11 上午11:04)
- */
+@Slf4j
 @Configuration
 public class CustomThreadPoolConfig {
+
     @Bean(name = "customThreadPool", destroyMethod = "shutdown")
     public ThreadPoolTaskExecutor customThreadPool() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        // 核心线程数
         executor.setCorePoolSize(300);
-        // 最大线程数
         executor.setMaxPoolSize(300);
-        // 线程名前缀
         executor.setThreadNamePrefix("custom-pool-");
-        // 拒绝策略:直接丢弃新任务
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
-        // 非核心线程空闲存活时间(秒)
+        executor.setQueueCapacity(1000);
+        executor.setRejectedExecutionHandler(new LoggingCallerRunsPolicy());
         executor.setKeepAliveSeconds(60);
-        // 等待所有任务完成后关闭线程池
         executor.setWaitForTasksToCompleteOnShutdown(true);
-        // 初始化
+        executor.setAwaitTerminationSeconds(300);
         executor.initialize();
         return executor;
     }
+
+    public static class LoggingCallerRunsPolicy implements RejectedExecutionHandler {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            log.warn("线程池任务被拒绝,将由调用线程执行。活跃线程: {}, 队列大小: {}, 最大线程: {}",
+                    executor.getActiveCount(), executor.getQueue().size(), executor.getMaximumPoolSize());
+            if (!executor.isShutdown()) {
+                r.run();
+            }
+        }
+    }
 }

+ 84 - 24
fs-ipad-task/src/main/java/com/fs/app/task/SendMsg.java

@@ -42,10 +42,8 @@ import org.springframework.util.StringUtils;
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 @Component
@@ -68,7 +66,30 @@ public class SendMsg {
     @Value("${group-no}")
     private String groupNo;
     private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
-    private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
+    private final Map<Long, TaskContext> qwMap = new ConcurrentHashMap<>();
+    private static final long TASK_TIMEOUT_MS = 3 * 60 * 1000L;
+
+    private static class TaskContext {
+        final long startTime;
+        final AtomicBoolean cancelled;
+
+        TaskContext() {
+            this.startTime = System.currentTimeMillis();
+            this.cancelled = new AtomicBoolean(false);
+        }
+
+        boolean isTimeout() {
+            return System.currentTimeMillis() - startTime > TASK_TIMEOUT_MS;
+        }
+
+        void cancel() {
+            cancelled.set(true);
+        }
+
+        boolean isCancelled() {
+            return cancelled.get();
+        }
+    }
 
     @Autowired
     @Qualifier("customThreadPool")
@@ -140,23 +161,36 @@ public class SendMsg {
         Map<String, FsCoursePlaySourceConfig> miniMap = getMiniMap();
         // 获取 pad 发送的企微
         getQwUserList().forEach(e -> {
-            // 如果没有值就执行后面的方法 并且入值
-            qwMap.computeIfAbsent(e.getId(), k -> {
-                // 线程启动
-                CompletableFuture.runAsync(() -> {
-                    try {
-                        log.info("开始任务:{}", e.getQwUserName());
-                        // 开始任务
-                        processUser(e, delayStart, delayEnd, miniMap);
-                    } catch (Exception exception) {
-                        log.error("发送错误:", exception);
-                    } finally {
-                        log.info("删除任务:{}", e.getQwUserName());
-                        qwMap.remove(e.getId());
-//                        removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
-                    }
-                }, customThreadPool);
-                return System.currentTimeMillis(); // 占位值
+            TaskContext ctx = qwMap.get(e.getId());
+            if (ctx != null) {
+                if (ctx.isTimeout()) {
+                    log.warn("任务超时,标记取消:{}, 已运行: {}ms", e.getQwUserName(), System.currentTimeMillis() - ctx.startTime);
+                    ctx.cancel();
+                } else {
+                    log.debug("任务正在执行中,跳过:{}", e.getQwUserName());
+                    return;
+                }
+            }
+            if (customThreadPool.getActiveCount() >= customThreadPool.getMaxPoolSize()) {
+                log.warn("线程池已满,跳过任务:{}, 活跃线程: {}/{}", e.getQwUserName(), customThreadPool.getActiveCount(), customThreadPool.getMaxPoolSize());
+                return;
+            }
+            TaskContext newCtx = new TaskContext();
+            qwMap.put(e.getId(), newCtx);
+            CompletableFuture.runAsync(() -> {
+                try {
+                    log.info("开始任务:{}", e.getQwUserName());
+                    processUser(e, delayStart, delayEnd, miniMap, newCtx);
+                } catch (Exception exception) {
+                    log.error("发送错误:", exception);
+                } finally {
+                    log.info("删除任务:{}", e.getQwUserName());
+                    qwMap.remove(e.getId());
+                }
+            }, customThreadPool).exceptionally(ex -> {
+                log.error("任务提交失败:{}, 错误: {}", e.getQwUserName(), ex.getMessage());
+                qwMap.remove(e.getId());
+                return null;
             });
         });
     }
@@ -167,8 +201,9 @@ public class SendMsg {
      * @param delayStart 随机延迟 最小值
      * @param delayEnd   随机延迟 最大值
      * @param miniMap    小程序配置
+     * @param ctx        任务上下文(用于取消检查)
      */
-    private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, FsCoursePlaySourceConfig> miniMap) {
+    private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, FsCoursePlaySourceConfig> miniMap, TaskContext ctx) {
         long start1 = System.currentTimeMillis();
         // 获取当前企微待发送记录
         List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectByQwUserId(qwUser.getId());
@@ -181,15 +216,25 @@ public class SendMsg {
         BaseVo parentVo = new BaseVo();
         parentVo.setCorpCode(qwUser.getCorpId());
         long end1 = System.currentTimeMillis();
+        // 检查是否被取消
+        if (ctx.isCancelled()) {
+            log.info("任务被取消,退出:{}", qwUser.getQwUserName());
+            return;
+        }
         // 判断这个企微是否需要发送
         if (!sendServer.isSend(user, parentVo)) {
             log.info("当前这个企微不需要发送 数据{}",user);
             return;
         }
-        log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
+        log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, ctx.startTime);
         long start3 = System.currentTimeMillis();
         // 循环代发送消息
         for (QwSopLogs qwSopLogs : qwSopLogList) {
+            // 检查是否被取消
+            if (ctx.isCancelled()) {
+                log.info("任务被取消,中断发送:{}, 已发送部分消息", qwUser.getQwUserName());
+                return;
+            }
             long start2 = System.currentTimeMillis();
             QwSopCourseFinishTempSetting setting = JSON.parseObject(qwSopLogs.getContentJson(), QwSopCourseFinishTempSetting.class);
             //直播的sendType:20单独走判断 其他的走以前的逻辑
@@ -227,6 +272,11 @@ public class SendMsg {
             Map<Integer, List<QwPushCount>> pushMap = pushCountList.stream().collect(Collectors.groupingBy(QwPushCount::getType));
             // 循环发送消息里面的每一条消息
             for (QwSopCourseFinishTempSetting.Setting content : setting.getSetting()) {
+                // 检查是否被取消
+                if (ctx.isCancelled()) {
+                    log.info("任务被取消,中断发送:{}", qwUser.getQwUserName());
+                    return;
+                }
                 long start4 = System.currentTimeMillis();
                 //判断当前销售推送客户消息限制
                 Long qwUserId = qwUser.getId();//销售的Id
@@ -278,11 +328,16 @@ public class SendMsg {
                         return;
                     }
                     try {
+                        if (ctx.isCancelled()) {
+                            return;
+                        }
                         int delay = ThreadLocalRandom.current().nextInt(300, 1000);
                         log.debug("pad发送消息等待:{}ms", delay);
                         Thread.sleep(delay);
                     } catch (InterruptedException e) {
                         log.error("线程等待错误!");
+                        Thread.currentThread().interrupt();
+                        return;
                     }
                 }
             }
@@ -387,11 +442,16 @@ public class SendMsg {
             int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
             log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(), end2 - start2);
             try {
+                if (ctx.isCancelled()) {
+                    return;
+                }
                 int delay = ThreadLocalRandom.current().nextInt(delayStart, delayEnd);
                 log.debug("企微发送消息等待:{}ms", delay);
                 Thread.sleep(delay);
             } catch (InterruptedException e) {
                 log.error("线程等待错误!");
+                Thread.currentThread().interrupt();
+                return;
             }
         }
         long end3 = System.currentTimeMillis();

+ 104 - 0
fs-service/src/main/java/com/fs/crm/service/ICrmCustomerPropertyService.java

@@ -6,33 +6,137 @@ import com.fs.crm.domain.CrmCustomerProperty;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * 客户属性服务接口
+ * @author 
+ * @date 
+ */
 public interface ICrmCustomerPropertyService extends IService<CrmCustomerProperty> {
 
+    /**
+     * 根据ID查询客户属性
+     * @param id 客户属性ID
+     * @return 客户属性对象
+     */
     CrmCustomerProperty selectCrmCustomerPropertyById(Long id);
 
+    /**
+     * 查询客户属性列表
+     * @param crmCustomerProperty 客户属性查询条件
+     * @return 客户属性列表
+     */
     List<CrmCustomerProperty> selectCrmCustomerPropertyList(CrmCustomerProperty crmCustomerProperty);
 
+    /**
+     * 新增客户属性
+     * @param crmCustomerProperty 客户属性信息
+     * @return 新增结果(1-成功,0-失败)
+     */
     int insertCrmCustomerProperty(CrmCustomerProperty crmCustomerProperty);
 
+    /**
+     * 修改客户属性
+     * @param crmCustomerProperty 客户属性信息
+     * @return 修改结果(1-成功,0-失败)
+     */
     int updateCrmCustomerProperty(CrmCustomerProperty crmCustomerProperty);
 
+    /**
+     * 批量删除客户属性
+     * @param ids 客户属性ID数组
+     * @return 删除结果(删除的记录数)
+     */
     int deleteCrmCustomerPropertyByIds(Long[] ids);
 
+    /**
+     * 根据ID删除客户属性
+     * @param id 客户属性ID
+     * @return 删除结果(1-成功,0-失败)
+     */
     int deleteCrmCustomerPropertyById(Long id);
 
+    /**
+     * 根据客户ID查询客户属性列表
+     * @param customerId 客户ID
+     * @return 客户属性列表
+     */
     List<CrmCustomerProperty> selectCrmCustomerPropertyByCustomerId(Long customerId);
 
+    /**
+     * 添加客户属性
+     * @param customerId 客户ID
+     * @param propertyId 属性ID
+     * @param propertyName 属性名称
+     * @param propertyValue 属性值
+     * @param propertyValueType 属性值类型
+     * @param tradeType 交易类型
+     * @param createBy 创建人
+     * @return 添加结果(1-成功,0-失败)
+     */
     int addCustomerProperty(Long customerId, Long propertyId, String propertyName, String propertyValue, String propertyValueType, String tradeType, String createBy);
 
+    /**
+     * 添加或更新客户属性
+     * @param customerId 客户ID
+     * @param propertyId 属性ID
+     * @param propertyName 属性名称
+     * @param propertyValue 属性值
+     * @param propertyValueType 属性值类型
+     * @param tradeType 交易类型
+     * @param createBy 创建人
+     * @return 操作结果(1-成功,0-失败)
+     */
     int addOrUpdateCustomerProperty(Long customerId, Long propertyId, String propertyName, String propertyValue, String propertyValueType, String tradeType, String createBy);
 
+    /**
+     * 添加或更新客户属性(带额外信息)
+     * @param customerId 客户ID
+     * @param propertyId 属性ID
+     * @param propertyName 属性名称
+     * @param propertyValue 属性值
+     * @param propertyValueType 属性值类型
+     * @param tradeType 交易类型
+     * @param intention 意向
+     * @param likeRatio 喜欢比例
+     * @param createBy 创建人
+     * @return 操作结果(1-成功,0-失败)
+     */
     int addOrUpdateCustomerPropertyWithExtra(Long customerId, Long propertyId, String propertyName, String propertyValue, String propertyValueType, String tradeType, String intention, Integer likeRatio, String createBy);
 
+    /**
+     * 批量添加客户属性
+     * @param customerId 客户ID
+     * @param properties 客户属性列表
+     * @return 添加结果(添加的记录数)
+     */
     int batchAddCustomerProperties(Long customerId, List<CrmCustomerProperty> properties);
 
+    /**
+     * 根据属性模板ID添加客户属性
+     * @param customerId 客户ID
+     * @param propertyTemplateId 属性模板ID
+     * @param propertyValue 属性值
+     * @param createBy 创建人
+     * @return 添加结果(1-成功,0-失败)
+     */
     int addPropertyByTemplateId(Long customerId, Long propertyTemplateId, String propertyValue, String createBy);
 
+    /**
+     * 根据属性模板ID添加或更新客户属性
+     * @param customerId 客户ID
+     * @param propertyTemplateId 属性模板ID
+     * @param propertyValue 属性值
+     * @param createBy 创建人
+     * @return 操作结果(1-成功,0-失败)
+     */
     int addOrUpdatePropertyByTemplateId(Long customerId, Long propertyTemplateId, String propertyValue, String createBy);
 
+    /**
+     * 批量根据属性模板ID添加客户属性
+     * @param customerId 客户ID
+     * @param propertyMap 属性模板ID和属性值的映射
+     * @param createBy 创建人
+     * @return 添加结果(添加的记录数)
+     */
     int batchAddPropertiesByTemplateIds(Long customerId, Map<Long, String> propertyMap, String createBy);
 }