|
|
@@ -49,6 +49,7 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.springframework.beans.BeanUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.dao.DuplicateKeyException;
|
|
|
import org.springframework.retry.annotation.Backoff;
|
|
|
import org.springframework.retry.annotation.Retryable;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
@@ -68,6 +69,7 @@ import java.time.temporal.ChronoUnit;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
@@ -2099,6 +2101,65 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Diagnostics for fs_course_link duplicate-key errors: logs the link plus its business fingerprint fields and the current thread name at enqueue time, so the entry can be matched by link and timestamp against the "before batch insert" / failure log lines. Does nothing when courseLink is null.
|
|
|
+ */
|
|
|
+ private void logFsCourseLinkEnqueue(FsCourseLink courseLink) {
|
|
|
+ if (courseLink == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("[FsCourseLink排查][入队] link={} linkType={} companyId={} companyUserId={} qwUserId={} videoId={} courseId={} qwExternalId={} corpId={} createTime={} thread={}",
|
|
|
+ courseLink.getLink(),
|
|
|
+ courseLink.getLinkType(),
|
|
|
+ courseLink.getCompanyId(),
|
|
|
+ courseLink.getCompanyUserId(),
|
|
|
+ courseLink.getQwUserId(),
|
|
|
+ courseLink.getVideoId(),
|
|
|
+ courseLink.getCourseId(),
|
|
|
+ courseLink.getQwExternalId(),
|
|
|
+ courseLink.getCorpId(),
|
|
|
+ courseLink.getCreateTime(),
|
|
|
+ Thread.currentThread().getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Logs a pre-insert summary of the batch, tagged with diagId: batch size, number of distinct non-null links, links that occur more than once within the batch, and fingerprints of the first 15 entries. This separates "same link several times in this batch" from "link unique in the batch but possibly already in the table". No-op for a null or empty batch; null elements are skipped when counting links and printed as "null" in the fingerprint sample, so this diagnostic helper cannot throw and abort the insert that follows it.
|
|
|
+ */
|
|
|
+ private void logFsCourseLinkBatchBeforeInsert(String diagId, List<FsCourseLink> batch) {
|
|
|
+ if (CollectionUtils.isEmpty(batch)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int size = batch.size();
|
|
|
+ Map<String, Long> linkFreq = batch.stream().filter(Objects::nonNull) // skip null elements so diagnostics cannot NPE
|
|
|
+ .map(FsCourseLink::getLink)
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
|
|
+ int distinctLinks = linkFreq.size();
|
|
|
+ List<String> intraDup = linkFreq.entrySet().stream()
|
|
|
+ .filter(e -> e.getValue() > 1)
|
|
|
+ .map(Map.Entry::getKey)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ int sampleN = Math.min(15, size); // fingerprint only the first 15 entries of the batch
|
|
|
+ List<String> fingerprints = new ArrayList<>(sampleN);
|
|
|
+ for (int i = 0; i < sampleN; i++) {
|
|
|
+ FsCourseLink cl = batch.get(i);
|
|
|
+ fingerprints.add(cl == null ? "null" : String.format("link=%s|ext=%s|vid=%s|course=%s|cu=%s",
|
|
|
+ cl.getLink(),
|
|
|
+ cl.getQwExternalId(),
|
|
|
+ cl.getVideoId(),
|
|
|
+ cl.getCourseId(),
|
|
|
+ cl.getCompanyUserId()));
|
|
|
+ }
|
|
|
+ log.info("[FsCourseLink排查][批量入库前] diagId={} batchSize={} distinctLinkCount={} intraBatchDuplicateLinks={} thread={} fingerprintSampleSize={} fingerprints={}",
|
|
|
+ diagId,
|
|
|
+ size,
|
|
|
+ distinctLinks,
|
|
|
+ intraDup,
|
|
|
+ Thread.currentThread().getName(),
|
|
|
+ sampleN,
|
|
|
+ String.join(" || ", fingerprints));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 将 FsCourseWatchLog 放入队列
|
|
|
*/
|
|
|
@@ -2108,6 +2169,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
if (!offered) {
|
|
|
log.error("FsCourseLink 队列已满,无法添加日志: {}", JSON.toJSONString(courseLink));
|
|
|
// 处理队列已满的情况,例如记录到失败队列或持久化存储
|
|
|
+ } else {
|
|
|
+ logFsCourseLinkEnqueue(courseLink);
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
@@ -2325,11 +2388,20 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
backoff = @Backoff(delay = 2000)
|
|
|
)
|
|
|
public void batchInsertFsCourseLink(List<FsCourseLink> courseLinkToInsert) {
|
|
|
+ String diagId = UUID.randomUUID().toString().replace("-", "").substring(0, 12); // 12-hex-char id that ties together every log line of this batch
|
|
|
try {
|
|
|
+ logFsCourseLinkBatchBeforeInsert(diagId, courseLinkToInsert);
|
|
|
fsCourseLinkMapper.insertFsCourseLinkBatch(courseLinkToInsert);
|
|
|
- log.info("批量插入 FsCourseLink 完成,共插入 {} 条记录。", courseLinkToInsert.size());
|
|
|
+ log.info("批量插入 FsCourseLink 完成,共插入 {} 条记录。[FsCourseLink排查] diagId={}", courseLinkToInsert.size(), diagId);
|
|
|
+ } catch (DuplicateKeyException e) {
|
|
|
+ Throwable root = e.getMostSpecificCause(); // never null: falls back to the exception itself when there is no root cause
|
|
|
+ log.error("[FsCourseLink排查][DuplicateKey] diagId={} batchSize={} rootMsg={} 解读: distinctLinkCount=batchSize且intraBatchDuplicateLinks为空时多为link已在库中(重复入队/MQ重投/任务重跑); 若 intraBatchDuplicateLinks 非空则为同一批内多条相同link。请在同一时间点检索 [入队] 与本文 diagId 关联。",
|
|
|
+ diagId,
|
|
|
+ courseLinkToInsert == null ? 0 : courseLinkToInsert.size(),
|
|
|
+ root.getMessage(),
|
|
|
+ e);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("批量插入 FsCourseLink 失败: {}", e.getMessage(), e);
|
|
|
+ log.error("批量插入 FsCourseLink 失败: {} [FsCourseLink排查] diagId={}", e.getMessage(), diagId, e); // NOTE(review): the failure is swallowed here, so the retry/backoff annotation on this method presumably never triggers — confirm intended
|
|
|
// 可选:将失败的数据记录到失败队列或持久化存储以便后续重试
|
|
|
}
|
|
|
}
|