|
|
@@ -0,0 +1,237 @@
|
|
|
+package com.fs.his.task;
|
|
|
+
|
|
|
+import com.fs.qw.domain.QuickTagTask;
|
|
|
+import com.fs.qw.domain.QuickTagTaskLog;
|
|
|
+import com.fs.qw.domain.QwExternalContact;
|
|
|
+import com.fs.qw.mapper.QuickTagTaskLogMapper;
|
|
|
+import com.fs.qw.mapper.QuickTagTaskMapper;
|
|
|
+import com.fs.qw.mapper.QwExternalContactMapper;
|
|
|
+import com.fs.qwApi.domain.QwResult;
|
|
|
+import com.fs.qwApi.param.QwEditUserTagParam;
|
|
|
+import com.fs.qwApi.service.QwApiService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.commons.lang3.time.DateUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 简洁版进粉打标签相关定时任务
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component("TagTask")
|
|
|
+public class TagTask {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QuickTagTaskMapper quickTagTaskMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QuickTagTaskLogMapper quickTagTaskLogMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QwExternalContactMapper qwExternalContactMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QwApiService qwApiService;
|
|
|
+
|
|
|
+ // 线程池配置
|
|
|
+ private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(
|
|
|
+ 10, 20, 60L, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(1000),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
+
|
|
|
+ // 并行处理阈值:超过此数量才启用多线程
|
|
|
+ private static final int PARALLEL_THRESHOLD = 500;
|
|
|
+
|
|
|
+ // 最大并发数(控制API调用并发)
|
|
|
+ private static final int MAX_CONCURRENT = 10;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 简洁版自动打标签定时任务
|
|
|
+ */
|
|
|
+ public void quickAddTag() {
|
|
|
+ log.info("========== 简洁版自动打标签任务开始 ==========");
|
|
|
+ QuickTagTask query = new QuickTagTask();
|
|
|
+ List<QuickTagTask> taskList = quickTagTaskMapper.selectQuickTagTaskList(query);
|
|
|
+ if (CollectionUtils.isEmpty(taskList)) {
|
|
|
+ log.info("无自动打标签任务,结束");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (QuickTagTask task : taskList) {
|
|
|
+ try {
|
|
|
+ executeSingleTask(task);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("执行任务失败,任务ID: {}, 任务详情: {}", task.getId(), task, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("========== 简洁版自动打标签任务结束 ==========");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行单个打标签任务
|
|
|
+ */
|
|
|
+ private void executeSingleTask(QuickTagTask task) {
|
|
|
+ log.info("开始执行任务 ID: {}, corpId: {}", task.getId(), task.getCorpId());
|
|
|
+
|
|
|
+ // 1. 校验必要参数
|
|
|
+ Integer executionDays = task.getExecutionDays();
|
|
|
+ if (executionDays == null || executionDays < 0) {
|
|
|
+ log.warn("任务 {} 执行天数为空或无效,跳过", task.getId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String tagIdStr = task.getTagId();
|
|
|
+ if (StringUtils.isEmpty(tagIdStr)) {
|
|
|
+ log.warn("任务 {} 未配置标签,跳过", task.getId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ List<String> tagIds = Arrays.stream(tagIdStr.split(","))
|
|
|
+ .filter(StringUtils::isNotBlank)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ if (tagIds.isEmpty()) {
|
|
|
+ log.warn("任务 {} 标签列表为空,跳过", task.getId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 计算触发日期范围
|
|
|
+ Date zeroToday = DateUtils.truncate(new Date(), Calendar.DAY_OF_MONTH);
|
|
|
+ Date targetDate = DateUtils.addDays(zeroToday, -executionDays);
|
|
|
+ Date startTime = targetDate;
|
|
|
+ Date endTime = DateUtils.addDays(targetDate, 1);
|
|
|
+
|
|
|
+ // 3. 查询客户列表
|
|
|
+ List<QwExternalContact> contactList = qwExternalContactMapper.selectQwExternalContactListByCreateTime(
|
|
|
+ task.getCorpId(), startTime, endTime);
|
|
|
+ if (CollectionUtils.isEmpty(contactList)) {
|
|
|
+ log.info("任务 {} 在 {} 至 {} 期间无新增客户", task.getId(), startTime, endTime);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("任务 {} 命中 {} 个客户", task.getId(), contactList.size());
|
|
|
+
|
|
|
+ // 4. 根据数据量决定串行或并行处理
|
|
|
+ if (contactList.size() <= PARALLEL_THRESHOLD) {
|
|
|
+ processSequentially(task, tagIds, contactList);
|
|
|
+ } else {
|
|
|
+ processConcurrently(task, tagIds, contactList);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 串行处理
|
|
|
+ */
|
|
|
+ private void processSequentially(QuickTagTask task, List<String> tagIds, List<QwExternalContact> contactList) {
|
|
|
+ log.info("任务 {} 使用串行处理", task.getId());
|
|
|
+ int successCount = 0;
|
|
|
+ int failCount = 0;
|
|
|
+
|
|
|
+ for (QwExternalContact contact : contactList) {
|
|
|
+ boolean success = processSingleContact(task, tagIds, contact);
|
|
|
+ if (success) {
|
|
|
+ successCount++;
|
|
|
+ } else {
|
|
|
+ failCount++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("任务 {} 串行处理完成,成功: {}, 失败: {}", task.getId(), successCount, failCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 并行处理
|
|
|
+ */
|
|
|
+ private void processConcurrently(QuickTagTask task, List<String> tagIds, List<QwExternalContact> contactList) {
|
|
|
+ log.info("任务 {} 使用并行处理,并发数: {}", task.getId(), MAX_CONCURRENT);
|
|
|
+ Semaphore semaphore = new Semaphore(MAX_CONCURRENT);
|
|
|
+ AtomicInteger successCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
+
|
|
|
+ List<CompletableFuture<Void>> futures = new ArrayList<>(contactList.size());
|
|
|
+
|
|
|
+ for (QwExternalContact contact : contactList) {
|
|
|
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ semaphore.acquire();
|
|
|
+ boolean success = processSingleContact(task, tagIds, contact);
|
|
|
+ if (success) {
|
|
|
+ successCount.incrementAndGet();
|
|
|
+ } else {
|
|
|
+ failCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("线程被中断,客户: {}", contact.getExternalUserId(), e);
|
|
|
+ } finally {
|
|
|
+ semaphore.release();
|
|
|
+ }
|
|
|
+ }, EXECUTOR);
|
|
|
+ futures.add(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 等待所有任务完成(设置超时)
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
|
|
+ .get(30, TimeUnit.MINUTES);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("并行处理任务超时或异常", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("任务 {} 并行处理完成,成功: {}, 失败: {}", task.getId(), successCount.get(), failCount.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理单个客户(打标签 + 记录日志)
|
|
|
+ *
|
|
|
+ * @return true-成功,false-失败
|
|
|
+ */
|
|
|
+ private boolean processSingleContact(QuickTagTask task, List<String> tagIds, QwExternalContact contact) {
|
|
|
+ boolean apiSuccess = false;
|
|
|
+ String errorMsg = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ QwEditUserTagParam param = new QwEditUserTagParam();
|
|
|
+ param.setUserid(contact.getUserId());
|
|
|
+ param.setExternal_userid(contact.getExternalUserId());
|
|
|
+ param.setAdd_tag(tagIds);
|
|
|
+
|
|
|
+ QwResult result = qwApiService.editUserTag(param, task.getCorpId());
|
|
|
+ if (result != null && result.getErrcode() == 0) {
|
|
|
+ apiSuccess = true;
|
|
|
+ } else {
|
|
|
+ errorMsg = result != null ? result.getErrmsg() : "API返回空";
|
|
|
+ log.warn("客户 {} 打标签失败: errcode={}, errmsg={}",
|
|
|
+ contact.getExternalUserId(),
|
|
|
+ result != null ? result.getErrcode() : "null",
|
|
|
+ errorMsg);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ errorMsg = e.getMessage();
|
|
|
+ log.error("客户 {} 打标签发生异常", contact.getExternalUserId(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 记录日志
|
|
|
+ try {
|
|
|
+ QuickTagTaskLog logEntry = new QuickTagTaskLog();
|
|
|
+ logEntry.setAutoTagId(task.getId());
|
|
|
+ logEntry.setType(1);
|
|
|
+ logEntry.setQwUserid(contact.getQwUserId());
|
|
|
+ logEntry.setExternalUserId(contact.getExternalUserId());
|
|
|
+ logEntry.setAddTime(com.fs.common.utils.DateUtils.getNowDate());
|
|
|
+ logEntry.setCompanyId(contact.getCompanyId());
|
|
|
+ logEntry.setCorpId(task.getCorpId());
|
|
|
+ logEntry.setStatus(apiSuccess ? 1 : 0);
|
|
|
+ logEntry.setErrorMsg(apiSuccess ? null : errorMsg);
|
|
|
+ quickTagTaskLogMapper.insertQuickTagTaskLog(logEntry);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("插入打标签日志失败,客户: {}, 任务: {}", contact.getExternalUserId(), task.getId(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ return apiSuccess;
|
|
|
+ }
|
|
|
+}
|