Prechádzať zdrojové kódy

feat(tag): 实现完课自动打标签功能

- 新增标签更新队列表及相关Mapper接口
- 实现FsTagUpdateService接口及核心业务逻辑
- 在课程观看日志处理中增加完课记录收集与标签处理
- 添加企微标签操作相关配置与限流控制
- 支持批量处理看课中与完课标签更新任务
- 增加标签更新失败重试机制与线程池处理逻辑
- 完善视频标签ID字段并关联标签组信息
xw 2 týždňov pred
rodič
commit
e05441df52

+ 30 - 0
fs-service/src/main/java/com/fs/course/domain/FsUserCourseVideo.java

@@ -115,4 +115,34 @@ public class FsUserCourseVideo extends BaseEntity
 
     @TableField(exist = false)
     private int sorting;
+
+    /**
+     * 看课中标签ID
+     */
+    private String watchingTagId;
+    
+    /**
+     * 完课标签ID
+     */
+    private String watchedTagId;
+    
+    /**
+     * 标签组ID
+     */
+    private String tagGroupId;
+
+    /**
+     * 标签组表中的ID
+     */
+    private Long tgId;
+    
+    /**
+     * 看课标签 表中的ID
+     */
+    private Long watchingTgId;
+    
+    /**
+     * 完课标签 表中的ID
+     */
+    private Long watchedTgId;
 }

+ 21 - 0
fs-service/src/main/java/com/fs/course/service/impl/FsCourseWatchLogServiceImpl.java

@@ -50,6 +50,7 @@ import com.fs.store.service.cache.IFsUserCourseCacheService;
 import com.fs.system.mapper.SysDictDataMapper;
 import com.fs.system.service.ISysConfigService;
 import com.fs.system.vo.DictVO;
+import com.fs.tag.service.FsTagUpdateService;
 import com.hc.openapi.tool.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,6 +136,9 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
     @Autowired
     private SysDictDataMapper sysDictDataMapper;
 
+    @Autowired
+    private FsTagUpdateService fsTagUpdateService;
+
     /**
      * 查询短链课程看课记录
      *
@@ -1122,10 +1126,21 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
         // 记录总日志数量
         log.info("开始批量更新日志,总日志数量: {}", logs.size());
 
+        // 收集完课记录用于打标签
+        List<FsCourseWatchLog> finishedLogs = new ArrayList<>();
+
         // 分批处理
         for (int i = 0; i < logs.size(); i += batchSize) {
             int end = Math.min(i + batchSize, logs.size());
             List<FsCourseWatchLog> batchList = logs.subList(i, end);
+            
+            // 筛选出完课记录
+            for(FsCourseWatchLog log : batchList) {
+                if(log.getLogType() != null && log.getLogType() == 2) {
+                    finishedLogs.add(log);
+                }
+            }
+            
             // 执行批量更新
             try {
                 fsCourseWatchLogMapper.batchUpdateWatchLog(batchList);
@@ -1141,6 +1156,12 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
                 logs.size(),
                 totalCost,
                 totalCost > 0 ? String.format("%.2f", logs.size() * 1000.0 / totalCost) : "∞");
+
+        // 完课打标签
+        if(org.apache.commons.collections4.CollectionUtils.isNotEmpty(finishedLogs)){
+            log.info("开始处理完课打标签,完课记录数: {}", finishedLogs.size());
+            fsTagUpdateService.onCourseWatchFinishedBatch(finishedLogs);
+        }
     }
 
 

+ 89 - 0
fs-service/src/main/java/com/fs/tag/domain/FsTagUpdateQueue.java

@@ -0,0 +1,89 @@
+package com.fs.tag.domain;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+import java.util.Date;
+
+/**
+ * 标签更新队列表
+ */
+@Data
+public class FsTagUpdateQueue {
+    /** id */
+    private Long id;
+
+    /** 看课日志id */
+    private Long courseLogId;
+
+    /** 是否是先导课*/
+    private Integer isFirst;
+
+    /**
+     * 课程排序
+     */
+    private Long sort;
+
+    /** 课程ID */
+    private Long courseId;
+
+    /** 标签id */
+    private String tagId;
+
+    /** 标签名称 */
+    private String tagName;
+
+    /** 操作类型(0 ADD 1 REMOVE) 默认0 */
+    private Integer operationType;
+
+    /** 视频ID */
+    private Long videoId;
+
+    /** 0未处理 1处理中 2成功 3失败 */
+    private Integer status;
+
+    /** 重试次数 */
+    private Integer retryCount;
+
+    /** 企微主体id */
+    private String corpId;
+
+    /** 企微user_id */
+    private String qwUserId;
+
+    /** 企微外部联系人id */
+    private String qwExternalContactId;
+    
+    /**
+     * 记录类型
+     * 0 正在看课
+     * 1 已完课
+     */
+    private Integer logType;
+
+    /** 失败原因 */
+    private String failMsg;
+
+    /** 请求参数 */
+    private String payload;
+
+    /** 返回结果 */
+    private String response;
+
+    /** 创建时间 */
+    private LocalDateTime createTime;
+
+    /** 更新时间 */
+    private LocalDateTime updateTime;
+
+    /** 更新人 */
+    private Long updateBy;
+
+    /** 创建人 */
+    private Long createBy;
+
+    /**
+     * 下次执行时间
+     */
+    private LocalDateTime nextExecuteTime;
+}

+ 162 - 0
fs-service/src/main/java/com/fs/tag/mapper/FsTagUpdateQueueMapper.java

@@ -0,0 +1,162 @@
+package com.fs.tag.mapper;
+
+import com.fs.tag.domain.FsTagUpdateQueue;
+import org.apache.ibatis.annotations.*;
+
+import java.util.List;
+
+/**
+ * 标签更新队列表 Mapper
+ */
+@Mapper
+public interface FsTagUpdateQueueMapper {
+
+    @Select("select * from fs_tag_update_queue where retry_count < 3 and status in (0,3) and (next_execute_time < now() or next_execute_time is null) limit 500")
+    List<FsTagUpdateQueue> selectPending();
+
+    @Select("<script>" +
+            "SELECT * FROM fs_tag_update_queue " +
+            "<where>" +
+            "<if test='id != null'> AND id = #{id} </if>" +
+            "<if test='courseLogId != null'> AND course_log_id = #{courseLogId} </if>" +
+            "<if test='courseId != null'> AND course_id = #{courseId} </if>" +
+            "<if test='tagId != null'> AND tag_id = #{tagId} </if>" +
+            "<if test='tagName != null'> AND tag_name = #{tagName} </if>" +
+            "<if test='operationType != null'> AND operation_type = #{operationType} </if>" +
+            "<if test='videoId != null'> AND video_id = #{videoId} </if>" +
+            "<if test='status != null'> AND status = #{status} </if>" +
+            "<if test='retryCount != null'> AND retry_count = #{retryCount} </if>" +
+            "<if test='corpId != null'> AND corp_id = #{corpId} </if>" +
+            "<if test='qwUserId != null'> AND qw_user_id = #{qwUserId} </if>" +
+            "</where>" +
+            "</script>")
+    List<FsTagUpdateQueue> selectByConditions(FsTagUpdateQueue condition);
+
+    @Insert("<script>" +
+            "INSERT INTO fs_tag_update_queue " +
+            "(course_log_id, course_id, tag_id, tag_name, operation_type, video_id, status, retry_count, corp_id, qw_user_id, fail_msg, payload, response, create_time, update_time, update_by, create_by,log_type) " +
+            "VALUES " +
+            "<trim prefix='(' suffix=')' suffixOverrides=','>" +
+            "<if test='courseLogId != null'>course_log_id,</if>" +
+            "<if test='courseId != null'>course_id,</if>" +
+            "<if test='tagId != null'>tag_id,</if>" +
+            "<if test='tagName != null'>tag_name,</if>" +
+            "<if test='operationType != null'>operation_type,</if>" +
+            "<if test='videoId != null'>video_id,</if>" +
+            "<if test='status != null'>status,</if>" +
+            "<if test='retryCount != null'>retry_count,</if>" +
+            "<if test='corpId != null'>corp_id,</if>" +
+            "<if test='qwUserId != null'>qw_user_id,</if>" +
+            "<if test='qwExternalContactId != null'>qw_external_contact_id,</if>" +
+            "<if test='failMsg != null'>fail_msg,</if>" +
+            "<if test='payload != null'>payload,</if>" +
+            "<if test='response != null'>response,</if>" +
+            "<if test='createTime != null'>create_time,</if>" +
+            "<if test='updateTime != null'>update_time,</if>" +
+            "<if test='updateBy != null'>update_by,</if>" +
+            "<if test='createBy != null'>create_by,</if>" +
+            "<if test='logType != null'>log_type,</if>" +
+            "</trim>" +
+            "<trim prefix='VALUES (' suffix=')' suffixOverrides=','>" +
+            "<if test='courseLogId != null'>#{courseLogId},</if>" +
+            "<if test='courseId != null'>#{courseId},</if>" +
+            "<if test='tagId != null'>#{tagId},</if>" +
+            "<if test='tagName != null'>#{tagName},</if>" +
+            "<if test='operationType != null'>#{operationType},</if>" +
+            "<if test='videoId != null'>#{videoId},</if>" +
+            "<if test='status != null'>#{status},</if>" +
+            "<if test='retryCount != null'>#{retryCount},</if>" +
+            "<if test='corpId != null'>#{corpId},</if>" +
+            "<if test='qwExternalContactId != null'>#{qwExternalContactId},</if>" +
+            "<if test='qwUserId != null'>#{qwUserId},</if>" +
+            "<if test='failMsg != null'>#{failMsg},</if>" +
+            "<if test='payload != null'>#{payload},</if>" +
+            "<if test='response != null'>#{response},</if>" +
+            "<if test='createTime != null'>#{createTime},</if>" +
+            "<if test='updateTime != null'>#{updateTime},</if>" +
+            "<if test='updateBy != null'>#{updateBy},</if>" +
+            "<if test='createBy != null'>#{createBy},</if>" +
+            "<if test='log_type != null'>#{logType},</if>" +
+            "</trim>" +
+            "</script>")
+    @Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id")
+    int insertSelective(FsTagUpdateQueue record);
+
+
+
+    @Insert("<script>" +
+            "INSERT IGNORE INTO fs_tag_update_queue (" +
+            "course_log_id, is_first, course_id, tag_id, tag_name, operation_type, video_id, status, retry_count, " +
+            "corp_id, qw_user_id, qw_external_contact_id, fail_msg, payload, response, create_time, update_time, update_by, create_by, log_type" +
+            ") VALUES " +
+            "<foreach collection='list' item='item' separator=','>" +
+            "(" +
+            "#{item.courseLogId}, #{item.isFirst}, #{item.courseId}, #{item.tagId}, #{item.tagName}, #{item.operationType}, #{item.videoId}, #{item.status}, #{item.retryCount}, " +
+            "#{item.corpId}, #{item.qwUserId}, #{item.qwExternalContactId}, #{item.failMsg}, #{item.payload}, #{item.response}, #{item.createTime}," +
+            " #{item.updateTime}, #{item.updateBy}, #{item.createBy}, #{item.logType}" +
+            ")" +
+            "</foreach>" +
+            "</script>")
+    int batchInsert(@Param("list") List<FsTagUpdateQueue> list);
+
+
+    @Update("<script>" +
+            "UPDATE fs_tag_update_queue " +
+            "<set>" +
+            "<if test='courseLogId != null'>course_log_id = #{courseLogId},</if>" +
+            "<if test='courseId != null'>course_id = #{courseId},</if>" +
+            "<if test='tagId != null'>tag_id = #{tagId},</if>" +
+            "<if test='tagName != null'>tag_name = #{tagName},</if>" +
+            "<if test='operationType != null'>operation_type = #{operationType},</if>" +
+            "<if test='videoId != null'>video_id = #{videoId},</if>" +
+            "<if test='status != null'>status = #{status},</if>" +
+            "<if test='retryCount != null'>retry_count = #{retryCount},</if>" +
+            "<if test='corpId != null'>corp_id = #{corpId},</if>" +
+            "<if test='qwUserId != null'>qw_user_id = #{qwUserId},</if>" +
+            "<if test='failMsg != null'>fail_msg = #{failMsg},</if>" +
+            "<if test='payload != null'>payload = #{payload},</if>" +
+            "<if test='response != null'>response = #{response},</if>" +
+            "<if test='createTime != null'>create_time = #{createTime},</if>" +
+            "<if test='updateTime != null'>update_time = #{updateTime},</if>" +
+            "<if test='updateBy != null'>update_by = #{updateBy},</if>" +
+            "<if test='createBy != null'>create_by = #{createBy},</if>" +
+            "<if test='logType != null'>log_type = #{logType},</if>" +
+            "</set> " +
+            "WHERE id = #{id}" +
+            "</script>")
+    int updateSelective(FsTagUpdateQueue record);
+
+
+
+    @Update("<script>" +
+            "<foreach collection='list' item='item' separator=';'>" +
+            "UPDATE fs_tag_update_queue" +
+            "<set>" +
+            "<if test='item.courseLogId != null'>course_log_id = #{item.courseLogId},</if>" +
+            "<if test='item.isFirst != null'>is_first = #{item.isFirst},</if>" +
+            "<if test='item.courseId != null'>course_id = #{item.courseId},</if>" +
+            "<if test='item.tagId != null'>tag_id = #{item.tagId},</if>" +
+            "<if test='item.tagName != null'>tag_name = #{item.tagName},</if>" +
+            "<if test='item.operationType != null'>operation_type = #{item.operationType},</if>" +
+            "<if test='item.videoId != null'>video_id = #{item.videoId},</if>" +
+            "<if test='item.status != null'>status = #{item.status},</if>" +
+            "<if test='item.retryCount != null'>retry_count = #{item.retryCount},</if>" +
+            "<if test='item.corpId != null'>corp_id = #{item.corpId},</if>" +
+            "<if test='item.qwUserId != null'>qw_user_id = #{item.qwUserId},</if>" +
+            "<if test='item.qwExternalContactId != null'>qw_external_contact_id = #{item.qwExternalContactId},</if>" +
+            "<if test='item.failMsg != null'>fail_msg = #{item.failMsg},</if>" +
+            "<if test='item.payload != null'>payload = #{item.payload},</if>" +
+            "<if test='item.response != null'>response = #{item.response},</if>" +
+            "<if test='item.createTime != null'>create_time = #{item.createTime},</if>" +
+            "<if test='item.updateTime != null'>update_time = #{item.updateTime},</if>" +
+            "<if test='item.updateBy != null'>update_by = #{item.updateBy},</if>" +
+            "<if test='item.createBy != null'>create_by = #{item.createBy},</if>" +
+            "<if test='item.logType != null'>log_type = #{item.logType},</if>" +
+            "<if test='item.nextExecuteTime != null'>next_execute_time = #{item.nextExecuteTime},</if>" +
+            "</set>" +
+            " WHERE id = #{item.id}" +
+            "</foreach>" +
+            "</script>")
+    int batchUpdateSelective(@Param("list") List<FsTagUpdateQueue> list);
+
+}

+ 29 - 0
fs-service/src/main/java/com/fs/tag/service/FsTagUpdateService.java

@@ -0,0 +1,29 @@
+package com.fs.tag.service;
+
+import com.fs.course.domain.FsCourseWatchLog;
+
+import java.util.List;
+
+/**
+ * 完课打标签服务类
+ */
+public interface FsTagUpdateService {
+
+    /**
+     * 正课正在看课
+     * @param logs 日志
+     */
+    void onCourseWatchingBatch(List<FsCourseWatchLog> logs);
+
+    /**
+     * 正课完成看课
+     * @param logs 日志
+     */
+    void onCourseWatchFinishedBatch(List<FsCourseWatchLog> logs);
+
+
+    /**
+     * 处理数据
+     */
+    void handleData();
+}

+ 257 - 0
fs-service/src/main/java/com/fs/tag/service/impl/FsTagUpdateServiceImpl.java

@@ -0,0 +1,257 @@
+package com.fs.tag.service.impl;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
+import com.fs.common.utils.StringUtils;
+import com.fs.course.domain.FsCourseWatchLog;
+import com.fs.course.domain.FsUserCourseVideo;
+import com.fs.course.mapper.FsUserCourseVideoMapper;
+import com.fs.qw.cache.IQwUserCacheService;
+import com.fs.qw.domain.QwExternalContact;
+import com.fs.qw.mapper.QwExternalContactMapper;
+import com.fs.qwApi.domain.QwResult;
+import com.fs.qwApi.param.QwEditUserTagParam;
+import com.fs.qwApi.service.QwApiService;
+import com.fs.tag.domain.FsTagUpdateQueue;
+import com.fs.tag.mapper.FsTagUpdateQueueMapper;
+import com.fs.tag.service.FsTagUpdateService;
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.PostConstruct;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Slf4j
+@Service("fsTagUpdateService")
+public class FsTagUpdateServiceImpl implements FsTagUpdateService {
+
+    @Autowired
+    private FsTagUpdateQueueMapper fsTagUpdateQueueMapper;
+    @Autowired
+    private FsUserCourseVideoMapper fsUserCourseVideoMapper;
+    @Autowired
+    private IQwUserCacheService qwUserCacheService;
+    @Autowired
+    private QwApiService qwApiService;
+    @Autowired
+    private QwExternalContactMapper qwExternalContactMapper;
+
+    @Value("${tag.thread.num:5}")
+    private Integer TAG_THREAD_NUM;
+
+    @Value("${tag.rate.limit:30}")
+    private Integer RATE_LIMIT_NUM;
+
+    private RateLimiter rateLimiter;
+
+    @Value("${qw.enableAutoTag:0}")
+    private Integer enableAutoTag;
+
+    @PostConstruct
+    public void init(){
+        this.rateLimiter = RateLimiter.create(RATE_LIMIT_NUM);
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
+    public void onCourseWatchingBatch(List<FsCourseWatchLog> logs) {
+        if(ObjectUtil.equal(enableAutoTag,0)){
+            return;
+        }
+        List<FsTagUpdateQueue> batchData = new ArrayList<>();
+        for (FsCourseWatchLog item : logs) {
+            FsTagUpdateQueue task = new FsTagUpdateQueue();
+            task.setCourseId(item.getCourseId());
+            task.setVideoId(item.getVideoId());
+            task.setCourseLogId(item.getLogId());
+            task.setLogType(0);
+            task.setOperationType(0);
+            task.setStatus(0);
+            task.setRetryCount(0);
+            task.setQwExternalContactId(String.valueOf(item.getQwExternalContactId()));
+            task.setQwUserId(String.valueOf(item.getQwUserId()));
+            
+            // 通过qwExternalContactId获取corpId
+            QwExternalContact qwExternalContact = qwExternalContactMapper.selectQwExternalContactById(item.getQwExternalContactId());
+            if(qwExternalContact != null && StringUtils.isNotNull(qwExternalContact.getCorpId())){
+                task.setCorpId(qwExternalContact.getCorpId());
+            }
+            
+            // 通过videoId查询视频信息
+            FsUserCourseVideo fsUserCourseVideo = fsUserCourseVideoMapper.selectFsUserCourseVideoByVideoId(task.getVideoId());
+            if(ObjectUtils.isNull(fsUserCourseVideo)) {
+                String errorMsg = String.format("该条记录 %d 找不到对应的课堂视频", task.getVideoId());
+                log.error(errorMsg);
+                task.setStatus(3);
+                task.setRetryCount(3);
+                task.setFailMsg(errorMsg);
+                batchData.add(task);
+                continue;
+            }
+            task.setTagId(fsUserCourseVideo.getWatchingTagId());
+            if(ObjectUtil.equal(fsUserCourseVideo.getIsFirst(),1)) {
+                task.setIsFirst(1);
+            } else {
+                task.setIsFirst(0);
+            }
+            batchData.add(task);
+        }
+        if(CollectionUtils.isNotEmpty(batchData)){
+            fsTagUpdateQueueMapper.batchInsert(batchData);
+        }
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
+    public void onCourseWatchFinishedBatch(List<FsCourseWatchLog> logs) {
+        if(ObjectUtil.equal(enableAutoTag,0)){
+            return;
+        }
+        List<FsTagUpdateQueue> batchData = new ArrayList<>();
+        for (FsCourseWatchLog item : logs) {
+            FsTagUpdateQueue task = new FsTagUpdateQueue();
+            task.setCourseId(item.getCourseId());
+            task.setVideoId(item.getVideoId());
+            task.setCourseLogId(item.getLogId());
+            task.setLogType(1);
+            task.setOperationType(0);
+            task.setStatus(0);
+            task.setRetryCount(0);
+            task.setQwExternalContactId(String.valueOf(item.getQwExternalContactId()));
+            task.setQwUserId(String.valueOf(item.getQwUserId()));
+            
+            // 通过qwExternalContactId获取corpId
+            QwExternalContact qwExternalContact = qwExternalContactMapper.selectQwExternalContactById(item.getQwExternalContactId());
+            if(qwExternalContact != null && StringUtils.isNotNull(qwExternalContact.getCorpId())){
+                task.setCorpId(qwExternalContact.getCorpId());
+            }
+            
+            // 通过videoId查询视频信息
+            FsUserCourseVideo fsUserCourseVideo = fsUserCourseVideoMapper.selectFsUserCourseVideoByVideoId(task.getVideoId());
+            if(ObjectUtils.isNull(fsUserCourseVideo)) {
+                String errorMsg = String.format("该条记录 %d 找不到对应的课堂视频", task.getVideoId());
+                log.error(errorMsg);
+                task.setStatus(3);
+                task.setRetryCount(3);
+                task.setFailMsg(errorMsg);
+                batchData.add(task);
+                continue;
+            }
+            task.setTagId(fsUserCourseVideo.getWatchedTagId());
+            if(ObjectUtil.equal(fsUserCourseVideo.getIsFirst(),1)) {
+                task.setIsFirst(1);
+            } else {
+                task.setIsFirst(0);
+            }
+            batchData.add(task);
+        }
+        if(CollectionUtils.isNotEmpty(batchData)){
+            fsTagUpdateQueueMapper.batchInsert(batchData);
+        }
+    }
+
+    @Override
+    public void handleData() {
+        List<FsTagUpdateQueue> tasks = fsTagUpdateQueueMapper.selectPending();
+        if(CollectionUtils.isEmpty(tasks)){
+            log.info("找不到可处理的任务,已跳过!");
+            return;
+        }
+        ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
+        ExecutorService executor = Executors.newFixedThreadPool(TAG_THREAD_NUM);
+        CountDownLatch latch = new CountDownLatch(tasks.size());
+        for (FsTagUpdateQueue task : tasks) {
+            executor.submit(() -> {
+                String lockKey = task.getCourseId() + "_" + task.getVideoId();
+                ReentrantLock lock = lockMap.computeIfAbsent(lockKey, k -> new ReentrantLock());
+                lock.lock();
+                try {
+                    processSingleTask(task);
+                } finally {
+                    lock.unlock();
+                    latch.countDown();
+                }
+            });
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        try {
+            executor.shutdown();
+            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+                executor.shutdownNow();
+                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                    log.warn("线程池未能在预期时间内关闭");
+                }
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+        if(CollectionUtils.isNotEmpty(tasks)){
+            fsTagUpdateQueueMapper.batchUpdateSelective(tasks);
+        }
+    }
+
+    private void processSingleTask(FsTagUpdateQueue fsTagUpdateQueue) {
+        try {
+            if(StringUtils.isEmpty(fsTagUpdateQueue.getTagId())){
+                throw new IllegalArgumentException("标签ID为空,请先在视频配置中设置标签");
+            }
+            QwEditUserTagParam qwEditUserTagParam = new QwEditUserTagParam();
+            Long externalContactId = Long.parseLong(fsTagUpdateQueue.getQwExternalContactId());
+            QwExternalContact qwExternalContact = qwExternalContactMapper.selectQwExternalContactById(externalContactId);
+            if(qwExternalContact == null) {
+                throw new IllegalArgumentException(String.format("企微外部联系人 %s 未找到!", fsTagUpdateQueue.getQwExternalContactId()));
+            }
+            qwEditUserTagParam.setUserid(qwExternalContact.getUserId());
+            qwEditUserTagParam.setExternal_userid(qwExternalContact.getExternalUserId());
+            rateLimiter.acquire();
+            qwEditUserTagParam.setAdd_tag(Collections.singletonList(fsTagUpdateQueue.getTagId()));
+            if(ObjectUtil.equal(fsTagUpdateQueue.getLogType(),1)){
+                FsUserCourseVideo fsUserCourseVideo = fsUserCourseVideoMapper.selectFsUserCourseVideoByVideoId(fsTagUpdateQueue.getVideoId());
+                if(fsUserCourseVideo != null && StringUtils.isNotEmpty(fsUserCourseVideo.getWatchingTagId())){
+                    qwEditUserTagParam.setRemove_tag(Collections.singletonList(fsUserCourseVideo.getWatchingTagId()));
+                }
+            }
+            QwResult qwResult = qwApiService.editUserTag(qwEditUserTagParam, fsTagUpdateQueue.getCorpId());
+            fsTagUpdateQueue.setPayload(JSON.toJSONString(qwEditUserTagParam));
+            fsTagUpdateQueue.setResponse(JSON.toJSONString(qwResult));
+            if(qwResult == null) {
+                throw new RuntimeException("企微API返回结果为null");
+            }
+            if(ObjectUtil.equal(qwResult.getErrcode(),0)) {
+                fsTagUpdateQueue.setStatus(2);
+                fsTagUpdateQueue.setRetryCount(0);
+                log.info("打标签成功 - 外部联系人:{}, 标签ID:{}, 类型:{}", 
+                    fsTagUpdateQueue.getQwExternalContactId(), 
+                    fsTagUpdateQueue.getTagId(),
+                    fsTagUpdateQueue.getLogType() == 0 ? "看课中" : "完课");
+            } else {
+                String errorMsg = String.format("打标签失败 errcode=%s, errmsg=%s", 
+                    qwResult.getErrcode(), qwResult.getErrmsg());
+                log.error("{},请求参数:{}", errorMsg, JSON.toJSONString(qwEditUserTagParam));
+                throw new RuntimeException(errorMsg);
+            }
+        } catch (Exception e){
+            fsTagUpdateQueue.setStatus(3);
+            fsTagUpdateQueue.setRetryCount(fsTagUpdateQueue.getRetryCount()+1);
+            fsTagUpdateQueue.setFailMsg(ExceptionUtils.getStackTrace(e));
+            fsTagUpdateQueue.setNextExecuteTime(LocalDateTime.now().plusHours(1));
+            log.error("处理打标签任务失败 - 任务ID:{}, 错误:{}", fsTagUpdateQueue.getId(), e.getMessage(), e);
+        }
+    }
+}

+ 8 - 0
fs-service/src/main/resources/application-druid-bjczwh.yml

@@ -156,3 +156,11 @@ isNewWxMerchant: false
 im:
     type: NONE
 
+qw:
+    enableAutoTag: 1
+tag:
+    thread:
+        num: 5
+    rate:
+        limit: 30
+

+ 2 - 1
fs-user-app/src/main/resources/application.yml

@@ -13,4 +13,5 @@ spring:
 #    active: druid-sxjz
 #    active: druid-qdtst
 #    active: druid-yzt
-    active: dev-jnlzjk
+#    active: druid-bjczwh-test
+    active: druid-bjczwh-test