Parcourir la source

Merge remote-tracking branch 'origin/bjcz_his_scrm' into bjcz_his_scrm

xw il y a 1 jour
Parent
commit
c7f35ae682

+ 74 - 2
fs-qw-task/src/main/java/com/fs/app/taskService/impl/SopLogsTaskServiceImpl.java

@@ -49,6 +49,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DuplicateKeyException;
 import org.springframework.retry.annotation.Backoff;
 import org.springframework.retry.annotation.Retryable;
 import org.springframework.scheduling.annotation.Async;
@@ -68,6 +69,7 @@ import java.time.temporal.ChronoUnit;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -2099,6 +2101,65 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         }
     }
 
+    /**
+     * 排查 fs_course_link 重复键:入队时打印 link + 业务指纹,便于与「批量入库前/失败」日志按 link、时间对齐。
+     */
+    private void logFsCourseLinkEnqueue(FsCourseLink courseLink) {
+        if (courseLink == null) {
+            return;
+        }
+        log.info("[FsCourseLink排查][入队] link={} linkType={} companyId={} companyUserId={} qwUserId={} videoId={} courseId={} qwExternalId={} corpId={} createTime={} thread={}",
+                courseLink.getLink(),
+                courseLink.getLinkType(),
+                courseLink.getCompanyId(),
+                courseLink.getCompanyUserId(),
+                courseLink.getQwUserId(),
+                courseLink.getVideoId(),
+                courseLink.getCourseId(),
+                courseLink.getQwExternalId(),
+                courseLink.getCorpId(),
+                courseLink.getCreateTime(),
+                Thread.currentThread().getName());
+    }
+
+    /**
+     * 批量入库前:区分「本批多条相同 link」与「link 唯一但可能撞库已存在」。
+     */
+    private void logFsCourseLinkBatchBeforeInsert(String diagId, List<FsCourseLink> batch) {
+        if (CollectionUtils.isEmpty(batch)) {
+            return;
+        }
+        int size = batch.size();
+        Map<String, Long> linkFreq = batch.stream()
+                .map(FsCourseLink::getLink)
+                .filter(Objects::nonNull)
+                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        int distinctLinks = linkFreq.size();
+        List<String> intraDup = linkFreq.entrySet().stream()
+                .filter(e -> e.getValue() > 1)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+        int sampleN = Math.min(15, size);
+        List<String> fingerprints = new ArrayList<>(sampleN);
+        for (int i = 0; i < sampleN; i++) {
+            FsCourseLink cl = batch.get(i);
+            fingerprints.add(String.format("link=%s|ext=%s|vid=%s|course=%s|cu=%s",
+                    cl.getLink(),
+                    cl.getQwExternalId(),
+                    cl.getVideoId(),
+                    cl.getCourseId(),
+                    cl.getCompanyUserId()));
+        }
+        log.info("[FsCourseLink排查][批量入库前] diagId={} batchSize={} distinctLinkCount={} intraBatchDuplicateLinks={} thread={} fingerprintSampleSize={} fingerprints={}",
+                diagId,
+                size,
+                distinctLinks,
+                intraDup,
+                Thread.currentThread().getName(),
+                sampleN,
+                String.join(" || ", fingerprints));
+    }
+
     /**
      * 将 FsCourseWatchLog 放入队列
      */
@@ -2108,6 +2169,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
             if (!offered) {
                 log.error("FsCourseLink 队列已满,无法添加日志: {}", JSON.toJSONString(courseLink));
                 // 处理队列已满的情况,例如记录到失败队列或持久化存储
+            } else {
+                logFsCourseLinkEnqueue(courseLink);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -2325,11 +2388,20 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
             backoff = @Backoff(delay = 2000)
     )
     public void batchInsertFsCourseLink(List<FsCourseLink> courseLinkToInsert) {
+        String diagId = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
         try {
+            logFsCourseLinkBatchBeforeInsert(diagId, courseLinkToInsert);
             fsCourseLinkMapper.insertFsCourseLinkBatch(courseLinkToInsert);
-            log.info("批量插入 FsCourseLink 完成,共插入 {} 条记录。", courseLinkToInsert.size());
+            log.info("批量插入 FsCourseLink 完成,共插入 {} 条记录。[FsCourseLink排查] diagId={}", courseLinkToInsert.size(), diagId);
+        } catch (DuplicateKeyException e) {
+            Throwable root = e.getMostSpecificCause() != null ? e.getMostSpecificCause() : e;
+            log.error("[FsCourseLink排查][DuplicateKey] diagId={} batchSize={} rootMsg={} 解读: distinctLinkCount=batchSize且intraBatchDuplicateLinks为空时多为link已在库中(重复入队/MQ重投/任务重跑); 若 intraBatchDuplicateLinks 非空则为同一批内多条相同link。请在同一时间点检索 [入队] 与本文 diagId 关联。",
+                    diagId,
+                    courseLinkToInsert == null ? 0 : courseLinkToInsert.size(),
+                    root.getMessage(),
+                    e);
         } catch (Exception e) {
-            log.error("批量插入 FsCourseLink 失败: {}", e.getMessage(), e);
+            log.error("批量插入 FsCourseLink 失败: {} [FsCourseLink排查] diagId={}", e.getMessage(), diagId, e);
             // 可选:将失败的数据记录到失败队列或持久化存储以便后续重试
         }
     }