|
|
@@ -0,0 +1,148 @@
|
|
|
+package com.fs.task;
|
|
|
+
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Component("CrmCustomerAiProcessingTask")
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Slf4j
|
|
|
+public class CrmCustomerAiProcessingTask {
|
|
|
+
|
|
|
+ private final RedisTemplate redisTemplate;
|
|
|
+
|
|
|
+ private static final String CRM_AI_REDIS_KEY = "crm:AI:data:processing";
|
|
|
+
|
|
|
+ // 自定义线程池
|
|
|
+ private final ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
+ 5, // 核心线程数
|
|
|
+ 10, // 最大线程数
|
|
|
+ 60, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(200),
|
|
|
+ r -> {
|
|
|
+ Thread thread = new Thread(r);
|
|
|
+ thread.setName("crm-ai-processor-" + thread.getId());
|
|
|
+ return thread;
|
|
|
+ },
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用线程处理
|
|
|
+ );
|
|
|
+
|
|
|
+ public void process() {
|
|
|
+ List<Map<String,String>> range = (List<Map<String, String>>) redisTemplate.opsForList().range(CRM_AI_REDIS_KEY, 0, -1);
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
+ log.info("CrmCustomerAiProcessingTask没有待处理的数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("CrmCustomerAiProcessingTask开始处理数据,条数"+range.size());
|
|
|
+ // 2. 每100条分成一批
|
|
|
+ List<List<Map<String, String>>> partitions = Lists.partition(range, 10);//ai沟通很慢,批量处理10条每批
|
|
|
+ int totalBatches = partitions.size();
|
|
|
+ log.info("共分为 {} 批, 每批"+(partitions.size()>1?"10":range.size())+"条", totalBatches);
|
|
|
+
|
|
|
+ // 3. 统计计数器
|
|
|
+ AtomicInteger successCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
+
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 4. 多线程处理
|
|
|
+ List<CompletableFuture<Void>> futures = partitions.stream()
|
|
|
+ .map(batch -> CompletableFuture.runAsync(() -> {
|
|
|
+ processBatch(batch, successCount, failCount);
|
|
|
+ }, executorService))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 5. 等待所有任务完成
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
|
+
|
|
|
+ long costTime = System.currentTimeMillis() - startTime;
|
|
|
+ log.info("CrmCustomerAiProcessingTask处理完成, 总条数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
|
|
|
+ range.size(), successCount.get(), failCount.get(), costTime);
|
|
|
+
|
|
|
+ // 6. 处理完成后,从Redis中删除已处理的数据
|
|
|
+ if (failCount.get() == 0) {
|
|
|
+ // 全部成功,删除整个key
|
|
|
+ redisTemplate.delete(CRM_AI_REDIS_KEY);
|
|
|
+ log.info("全部处理成功,已删除Redis数据");
|
|
|
+ } else {
|
|
|
+ // 有失败的数据,保留或移到失败队列
|
|
|
+ handleFailedData(partitions, successCount.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("多线程处理异常", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 处理单个批次
|
|
|
+ */
|
|
|
+ private void processBatch(List<Map<String, String>> batch,
|
|
|
+ AtomicInteger successCount,
|
|
|
+ AtomicInteger failCount) {
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ long batchStartTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("线程 {} 开始处理批次, 数据量: {}", threadName, batch.size());
|
|
|
+
|
|
|
+ // 示例:处理每条数据
|
|
|
+ for (Map<String, String> data : batch) {
|
|
|
+ // 获取数据
|
|
|
+ String customerId = data.get("customerId");
|
|
|
+ String dataJson = data.get("data");
|
|
|
+ //todo 业务!!!!!!1.ai沟通总结2.流失风险等级3.沟通摘要4.客户画像8.客户关注点9.客户意向度
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 模拟业务处理
|
|
|
+ Thread.sleep(10);
|
|
|
+ }
|
|
|
+
|
|
|
+ long costTime = System.currentTimeMillis() - batchStartTime;
|
|
|
+ successCount.addAndGet(batch.size());
|
|
|
+ log.info("线程 {} 批次处理完成, 数据量: {}, 耗时: {}ms",
|
|
|
+ threadName, batch.size(), costTime);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ failCount.addAndGet(batch.size());
|
|
|
+ log.error("线程 {} 批次处理失败, 数据量: {}", threadName, batch.size(), e);
|
|
|
+ throw new RuntimeException("批次处理失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 处理失败的数据
|
|
|
+ */
|
|
|
+ private void handleFailedData(List<List<Map<String, String>>> partitions, int successCount) {
|
|
|
+ try {
|
|
|
+ // 找出未成功处理的数据
|
|
|
+ List<Map<String, String>> failedData = partitions.stream()
|
|
|
+ .flatMap(List::stream)
|
|
|
+ .skip(successCount)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (!failedData.isEmpty()) {
|
|
|
+ String failedKey = CRM_AI_REDIS_KEY + ":failed";
|
|
|
+ for (Map<String, String> data : failedData) {
|
|
|
+ redisTemplate.opsForList().rightPush(failedKey, data);
|
|
|
+ }
|
|
|
+ log.info("失败数据已移至失败队列: {}, 数量: {}", failedKey, failedData.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 清理已处理的数据(可选:根据业务需求决定是否删除)
|
|
|
+ // cleanProcessedData(partitions, successCount);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理失败数据异常", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|