|
|
@@ -0,0 +1,161 @@
|
|
|
+package com.fs.task;
|
|
|
+
|
|
|
+import com.fs.crm.domain.CrmCustomerAnalyze;
|
|
|
+import com.fs.crm.service.ICrmCustomerAnalyzeService;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
+@Component("CrmCustomerAiProcessingTask")
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Slf4j
|
|
|
+public class CrmCustomerAiProcessingTask {
|
|
|
+
|
|
|
+ private final RedisTemplate redisTemplate;
|
|
|
+
|
|
|
+ private static final String CRM_AI_REDIS_KEY = "crm:AI:data:processing";
|
|
|
+
|
|
|
+ private final ICrmCustomerAnalyzeService crmCustomerAnalyzeService;
|
|
|
+
|
|
|
+ // 自定义线程池
|
|
|
+ private final ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
+ 5, // 核心线程数
|
|
|
+ 10, // 最大线程数
|
|
|
+ 60, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(200),
|
|
|
+ r -> {
|
|
|
+ Thread thread = new Thread(r);
|
|
|
+ thread.setName("crm-ai-processor-" + thread.getId());
|
|
|
+ return thread;
|
|
|
+ },
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用线程处理
|
|
|
+ );
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void process() {
|
|
|
+// 一次只处理5条,AI响应慢避免阻塞
|
|
|
+ List<Map<String, Object>> range =
|
|
|
+ (List<Map<String, Object>>) redisTemplate.opsForList().range(CRM_AI_REDIS_KEY, 0, 4);
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
+ log.info("CrmCustomerAiProcessingTask没有待处理的数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ final int total = range.size();
|
|
|
+ log.info("CrmCustomerAiProcessingTask开始处理数据, 条数: {}", total);
|
|
|
+
|
|
|
+
|
|
|
+ AtomicInteger successCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ CompletableFuture<Void> futures = CompletableFuture.runAsync(
|
|
|
+ () -> processBatch(range, successCount, failCount), executorService);
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures).join();
|
|
|
+ } catch (CompletionException e) {
|
|
|
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
|
|
|
+ log.error("多线程处理异常", cause);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long costTime = System.currentTimeMillis() - startTime;
|
|
|
+ log.info("CrmCustomerAiProcessingTask处理完成, 总条数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
|
|
|
+ total, successCount.get(), failCount.get(), costTime);
|
|
|
+
|
|
|
+ // 当前 processBatch 内任一条失败会抛异常导致 join 失败;能走到此处说明整批成功
|
|
|
+ if (failCount.get() == 0 && successCount.get() == total) {
|
|
|
+ redisTemplate.delete(CRM_AI_REDIS_KEY);
|
|
|
+ log.info("全部处理成功,已删除Redis数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.warn("计数与预期不一致: 总条数={}, 成功={}, 失败={}, 未删除 Redis 队列", total,
|
|
|
+ successCount.get(), failCount.get());
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 处理单个批次
|
|
|
+ */
|
|
|
+ private void processBatch(List<Map<String, Object>> batch,
|
|
|
+ AtomicInteger successCount,
|
|
|
+ AtomicInteger failCount) {
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ long batchStartTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("线程 {} 开始处理批次, 数据量: {}", threadName, batch.size());
|
|
|
+
|
|
|
+ for (Map<String, Object> data : batch) {
|
|
|
+ processSingleCustomer(data, successCount, failCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ long costTime = System.currentTimeMillis() - batchStartTime;
|
|
|
+ log.info("线程 {} 批次处理完成, 数据量: {}, 耗时: {}ms",
|
|
|
+ threadName, batch.size(), costTime);
|
|
|
+ } catch (Exception e) {
|
|
|
+ failCount.addAndGet(batch.size());
|
|
|
+ log.error("线程 {} 批次处理失败, 数据量: {}", threadName, batch.size(), e);
|
|
|
+ throw new RuntimeException("批次处理失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 处理单个客户的AI分析(6个接口并行)
|
|
|
+ */
|
|
|
+ private void processSingleCustomer(Map<String, Object> data,
|
|
|
+ AtomicInteger successCount,
|
|
|
+ AtomicInteger failCount) {
|
|
|
+ try {
|
|
|
+ Long customerId = (Long)data.get("customerId");
|
|
|
+ String dataJson = (String)data.get("data");
|
|
|
+ Long logId = (Long)data.get("logId");
|
|
|
+
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 6 个 AI 接口并行;使用 commonPool,避免与批次线程池 executorService 嵌套导致死锁
|
|
|
+ Executor asyncPool = ForkJoinPool.commonPool();
|
|
|
+ // 使用 supplyAsync 获取返回值,定义具体返回类型
|
|
|
+ CompletableFuture<String> portraitFuture = CompletableFuture.supplyAsync(() ->
|
|
|
+ crmCustomerAnalyzeService.aiGeneratedCustomerPortrait(customerId, dataJson, logId), asyncPool);
|
|
|
+
|
|
|
+ CompletableFuture<String> summaryFuture = CompletableFuture.supplyAsync(() ->
|
|
|
+ crmCustomerAnalyzeService.aiCommunicationSummary(customerId, dataJson, logId), asyncPool);
|
|
|
+
|
|
|
+ CompletableFuture<String> abstractFuture = CompletableFuture.supplyAsync(() ->
|
|
|
+ crmCustomerAnalyzeService.aiCommunicationAbstract(customerId, dataJson, logId), asyncPool);
|
|
|
+
|
|
|
+ CompletableFuture<Long> attritionFuture = CompletableFuture.supplyAsync(() ->
|
|
|
+ crmCustomerAnalyzeService.aiAttritionLevel(customerId, dataJson, logId), asyncPool);
|
|
|
+
|
|
|
+ CompletableFuture<String> focusFuture = CompletableFuture.supplyAsync(() ->
|
|
|
+ crmCustomerAnalyzeService.aiCustomerFocus(customerId, dataJson, logId), asyncPool);
|
|
|
+
|
|
|
+ CompletableFuture<String> intentionFuture = CompletableFuture.supplyAsync(() ->
|
|
|
+ crmCustomerAnalyzeService.aiIntentionDegree(customerId, dataJson, logId), asyncPool);
|
|
|
+
|
|
|
+// 等待所有异步任务完成
|
|
|
+ CompletableFuture.allOf(portraitFuture, summaryFuture, abstractFuture,
|
|
|
+ attritionFuture, focusFuture, intentionFuture).join();
|
|
|
+ // allAiFutures.get(60, TimeUnit.SECONDS);
|
|
|
+ CrmCustomerAnalyze crmCustomerAnalyze = new CrmCustomerAnalyze();
|
|
|
+ crmCustomerAnalyze.setCustomerId(customerId);
|
|
|
+ crmCustomerAnalyze.setCustomerPortraitJson(portraitFuture.get());
|
|
|
+ crmCustomerAnalyze.setCommunicationSummary(summaryFuture.get());
|
|
|
+ crmCustomerAnalyze.setCommunicationAbstract(abstractFuture.get());
|
|
|
+ crmCustomerAnalyze.setAttritionLevel(attritionFuture.get());
|
|
|
+ crmCustomerAnalyze.setCustomerFocusJson(focusFuture.get());
|
|
|
+ crmCustomerAnalyze.setIntentionDegree(intentionFuture.get());
|
|
|
+ Integer i = crmCustomerAnalyzeService.updateCrmCustomerAnalyzeByCustomerId(crmCustomerAnalyze);
|
|
|
+ long costTime = System.currentTimeMillis() - startTime;
|
|
|
+ successCount.incrementAndGet();
|
|
|
+ log.info("客户 {} 的AI分析完成, 耗时: {}ms,更新{}条", customerId, costTime,i);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ failCount.incrementAndGet();
|
|
|
+ log.error("处理客户数据失败, customerId: {}, logId: {}",
|
|
|
+ data.get("customerId"), data.get("logId"), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|