|
|
@@ -5,12 +5,26 @@ import com.alibaba.fastjson.JSON;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
|
import com.fs.course.domain.FsCourseWatchLog;
|
|
|
+import com.fs.course.domain.FsUserCourse;
|
|
|
import com.fs.course.domain.FsUserCourseVideo;
|
|
|
+import com.fs.course.mapper.FsUserCourseMapper;
|
|
|
import com.fs.course.mapper.FsUserCourseVideoMapper;
|
|
|
import com.fs.qw.cache.IQwUserCacheService;
|
|
|
import com.fs.qw.domain.QwExternalContact;
|
|
|
+import com.fs.qw.domain.QwTag;
|
|
|
+import com.fs.qw.domain.QwTagGroup;
|
|
|
import com.fs.qw.mapper.QwExternalContactMapper;
|
|
|
+import com.fs.qw.mapper.QwTagGroupMapper;
|
|
|
+import com.fs.qw.mapper.QwTagMapper;
|
|
|
+import com.fs.qw.mapper.QwUserMapper;
|
|
|
+import com.fs.qw.service.IQwTagGroupService;
|
|
|
+import com.fs.qw.vo.QwTagGroupAddParam;
|
|
|
+import com.fs.qw.vo.QwTagVO;
|
|
|
+import com.fs.qwApi.domain.QwAddTagResult;
|
|
|
import com.fs.qwApi.domain.QwResult;
|
|
|
+import com.fs.qwApi.domain.inner.InTag;
|
|
|
+import com.fs.qwApi.domain.inner.TagData;
|
|
|
+import com.fs.qwApi.param.QwAddTagParam;
|
|
|
import com.fs.qwApi.param.QwEditUserTagParam;
|
|
|
import com.fs.qwApi.service.QwApiService;
|
|
|
import com.fs.tag.domain.FsTagUpdateQueue;
|
|
|
@@ -19,7 +33,9 @@ import com.fs.tag.service.FsTagUpdateService;
|
|
|
import com.google.common.util.concurrent.RateLimiter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
-import org.apache.commons.lang3.exception.ExceptionUtils;
|
|
|
+import org.apache.commons.lang.exception.ExceptionUtils;
|
|
|
+import org.apache.http.util.Asserts;
|
|
|
+import org.checkerframework.checker.signature.qual.PolySignature;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
@@ -28,6 +44,8 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import java.time.LocalDateTime;
|
|
|
+import java.time.temporal.ChronoUnit;
|
|
|
+import java.time.temporal.TemporalUnit;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
@@ -36,6 +54,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
|
|
@Service("fsTagUpdateService")
|
|
|
public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
|
|
|
+
|
|
|
@Autowired
|
|
|
private FsTagUpdateQueueMapper fsTagUpdateQueueMapper;
|
|
|
@Autowired
|
|
|
@@ -44,6 +63,19 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
private IQwUserCacheService qwUserCacheService;
|
|
|
@Autowired
|
|
|
private QwApiService qwApiService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QwTagMapper qwTagMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QwTagGroupMapper qwTagGroupMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IQwTagGroupService qwTagGroupService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private FsUserCourseMapper fsUserCourseMapper;
|
|
|
+
|
|
|
@Autowired
|
|
|
private QwExternalContactMapper qwExternalContactMapper;
|
|
|
|
|
|
@@ -52,9 +84,19 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
|
|
|
@Value("${tag.rate.limit:30}")
|
|
|
private Integer RATE_LIMIT_NUM;
|
|
|
+ /**
|
|
|
+ * 标签组最大数量
|
|
|
+ */
|
|
|
+ private static final Integer TAG_MAX_NUM = 100;
|
|
|
|
|
|
+ /**
|
|
|
+ * 接口限流
|
|
|
+ */
|
|
|
private RateLimiter rateLimiter;
|
|
|
|
|
|
+ /**
|
|
|
+ * 看课自动打标签开关
|
|
|
+ */
|
|
|
@Value("${qw.enableAutoTag:0}")
|
|
|
private Integer enableAutoTag;
|
|
|
|
|
|
@@ -66,30 +108,35 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
|
|
|
public void onCourseWatchingBatch(List<FsCourseWatchLog> logs) {
|
|
|
+
|
|
|
if(ObjectUtil.equal(enableAutoTag,0)){
|
|
|
return;
|
|
|
}
|
|
|
+ Map<Long, FsUserCourseVideo> courseVideoMap = fsUserCourseVideoMapper.selectAllMap();
|
|
|
+ // 用户(这里用户用的是企微外部联系人ID)+videoId+status 唯一
|
|
|
+
|
|
|
+ // 先导课看课记录
|
|
|
List<FsTagUpdateQueue> batchData = new ArrayList<>();
|
|
|
for (FsCourseWatchLog item : logs) {
|
|
|
FsTagUpdateQueue task = new FsTagUpdateQueue();
|
|
|
task.setCourseId(item.getCourseId());
|
|
|
task.setVideoId(item.getVideoId());
|
|
|
task.setCourseLogId(item.getLogId());
|
|
|
+ task.setTagId(null);
|
|
|
+ task.setTagName(null);
|
|
|
+
|
|
|
task.setLogType(0);
|
|
|
task.setOperationType(0);
|
|
|
task.setStatus(0);
|
|
|
task.setRetryCount(0);
|
|
|
- task.setQwExternalContactId(String.valueOf(item.getQwExternalContactId()));
|
|
|
+ task.setQwExternalContactId(String.valueOf(item.getQwExternalContactId()));
|
|
|
task.setQwUserId(String.valueOf(item.getQwUserId()));
|
|
|
-
|
|
|
- // 通过qwExternalContactId获取corpId
|
|
|
- QwExternalContact qwExternalContact = qwExternalContactMapper.selectQwExternalContactById(item.getQwExternalContactId());
|
|
|
- if(qwExternalContact != null && StringUtils.isNotNull(qwExternalContact.getCorpId())){
|
|
|
- task.setCorpId(qwExternalContact.getCorpId());
|
|
|
+
|
|
|
+ FsUserCourseVideo fsUserCourseVideo = courseVideoMap.get(task.getVideoId());
|
|
|
+ String corpId = qwUserCacheService.queryCorpIdByQwUserId(item.getQwUserId());
|
|
|
+ if(StringUtils.isNotNull(corpId)){
|
|
|
+ task.setCorpId(corpId);
|
|
|
}
|
|
|
-
|
|
|
- // 通过videoId查询视频信息
|
|
|
- FsUserCourseVideo fsUserCourseVideo = fsUserCourseVideoMapper.selectFsUserCourseVideoByVideoId(task.getVideoId());
|
|
|
if(ObjectUtils.isNull(fsUserCourseVideo)) {
|
|
|
String errorMsg = String.format("该条记录 %d 找不到对应的课堂视频", task.getVideoId());
|
|
|
log.error(errorMsg);
|
|
|
@@ -99,46 +146,55 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
batchData.add(task);
|
|
|
continue;
|
|
|
}
|
|
|
- task.setTagId(fsUserCourseVideo.getWatchingTagId());
|
|
|
+ task.setTagGroupId(fsUserCourseVideo.getTagGroupId());
|
|
|
+ task.setTgId(fsUserCourseVideo.getTgId());
|
|
|
+ task.setWatchingTagId(fsUserCourseVideo.getWatchingTagId());
|
|
|
+ task.setWatchedTagId(fsUserCourseVideo.getWatchedTagId());
|
|
|
+ task.setWatchingTgId(fsUserCourseVideo.getWatchingTgId());
|
|
|
+ task.setWatchedTgId(fsUserCourseVideo.getWatchedTgId());
|
|
|
+
|
|
|
if(ObjectUtil.equal(fsUserCourseVideo.getIsFirst(),1)) {
|
|
|
task.setIsFirst(1);
|
|
|
} else {
|
|
|
task.setIsFirst(0);
|
|
|
}
|
|
|
+ task.setSort(fsUserCourseVideo.getCourseSort());
|
|
|
batchData.add(task);
|
|
|
}
|
|
|
- if(CollectionUtils.isNotEmpty(batchData)){
|
|
|
- fsTagUpdateQueueMapper.batchInsert(batchData);
|
|
|
- }
|
|
|
+
|
|
|
+
|
|
|
+ fsTagUpdateQueueMapper.batchInsert(batchData);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
|
|
|
public void onCourseWatchFinishedBatch(List<FsCourseWatchLog> logs) {
|
|
|
if(ObjectUtil.equal(enableAutoTag,0)){
|
|
|
return;
|
|
|
}
|
|
|
+ Map<Long, FsUserCourseVideo> courseVideoMap = fsUserCourseVideoMapper.selectAllMap();
|
|
|
+
|
|
|
+ // 先导课看课记录
|
|
|
List<FsTagUpdateQueue> batchData = new ArrayList<>();
|
|
|
for (FsCourseWatchLog item : logs) {
|
|
|
FsTagUpdateQueue task = new FsTagUpdateQueue();
|
|
|
task.setCourseId(item.getCourseId());
|
|
|
task.setVideoId(item.getVideoId());
|
|
|
task.setCourseLogId(item.getLogId());
|
|
|
- task.setLogType(1);
|
|
|
+ task.setTagId(null);
|
|
|
+ task.setTagName(null);
|
|
|
task.setOperationType(0);
|
|
|
task.setStatus(0);
|
|
|
task.setRetryCount(0);
|
|
|
- task.setQwExternalContactId(String.valueOf(item.getQwExternalContactId()));
|
|
|
+ task.setQwExternalContactId(String.valueOf(item.getQwExternalContactId()));
|
|
|
task.setQwUserId(String.valueOf(item.getQwUserId()));
|
|
|
-
|
|
|
- // 通过qwExternalContactId获取corpId
|
|
|
- QwExternalContact qwExternalContact = qwExternalContactMapper.selectQwExternalContactById(item.getQwExternalContactId());
|
|
|
- if(qwExternalContact != null && StringUtils.isNotNull(qwExternalContact.getCorpId())){
|
|
|
- task.setCorpId(qwExternalContact.getCorpId());
|
|
|
+ task.setLogType(1);
|
|
|
+ String corpId = qwUserCacheService.queryCorpIdByQwUserId(item.getQwUserId());
|
|
|
+ if(StringUtils.isNotNull(corpId)){
|
|
|
+ task.setCorpId(corpId);
|
|
|
}
|
|
|
-
|
|
|
- // 通过videoId查询视频信息
|
|
|
- FsUserCourseVideo fsUserCourseVideo = fsUserCourseVideoMapper.selectFsUserCourseVideoByVideoId(task.getVideoId());
|
|
|
+ FsUserCourseVideo fsUserCourseVideo = courseVideoMap.get(task.getVideoId());
|
|
|
if(ObjectUtils.isNull(fsUserCourseVideo)) {
|
|
|
String errorMsg = String.format("该条记录 %d 找不到对应的课堂视频", task.getVideoId());
|
|
|
log.error(errorMsg);
|
|
|
@@ -148,17 +204,24 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
batchData.add(task);
|
|
|
continue;
|
|
|
}
|
|
|
- task.setTagId(fsUserCourseVideo.getWatchedTagId());
|
|
|
+ task.setTagGroupId(fsUserCourseVideo.getTagGroupId());
|
|
|
+ task.setTgId(fsUserCourseVideo.getTgId());
|
|
|
+ task.setWatchingTagId(fsUserCourseVideo.getWatchingTagId());
|
|
|
+ task.setWatchedTagId(fsUserCourseVideo.getWatchedTagId());
|
|
|
+ task.setWatchingTgId(fsUserCourseVideo.getWatchingTgId());
|
|
|
+ task.setWatchedTgId(fsUserCourseVideo.getWatchedTgId());
|
|
|
+
|
|
|
if(ObjectUtil.equal(fsUserCourseVideo.getIsFirst(),1)) {
|
|
|
task.setIsFirst(1);
|
|
|
} else {
|
|
|
task.setIsFirst(0);
|
|
|
}
|
|
|
+ task.setSort(fsUserCourseVideo.getCourseSort());
|
|
|
batchData.add(task);
|
|
|
}
|
|
|
- if(CollectionUtils.isNotEmpty(batchData)){
|
|
|
- fsTagUpdateQueueMapper.batchInsert(batchData);
|
|
|
- }
|
|
|
+
|
|
|
+
|
|
|
+ fsTagUpdateQueueMapper.batchInsert(batchData);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -171,6 +234,7 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
|
|
|
ExecutorService executor = Executors.newFixedThreadPool(TAG_THREAD_NUM);
|
|
|
CountDownLatch latch = new CountDownLatch(tasks.size());
|
|
|
+
|
|
|
for (FsTagUpdateQueue task : tasks) {
|
|
|
executor.submit(() -> {
|
|
|
String lockKey = task.getCourseId() + "_" + task.getVideoId();
|
|
|
@@ -193,6 +257,7 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
executor.shutdown();
|
|
|
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
|
|
|
executor.shutdownNow();
|
|
|
+ // 再次等待确保已经关闭
|
|
|
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
|
|
|
log.warn("线程池未能在预期时间内关闭");
|
|
|
}
|
|
|
@@ -201,57 +266,53 @@ public class FsTagUpdateServiceImpl implements FsTagUpdateService {
|
|
|
executor.shutdownNow();
|
|
|
Thread.currentThread().interrupt();
|
|
|
}
|
|
|
+
|
|
|
if(CollectionUtils.isNotEmpty(tasks)){
|
|
|
fsTagUpdateQueueMapper.batchUpdateSelective(tasks);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private void processSingleTask(FsTagUpdateQueue fsTagUpdateQueue) {
|
|
|
try {
|
|
|
- if(StringUtils.isEmpty(fsTagUpdateQueue.getTagId())){
|
|
|
- throw new IllegalArgumentException("标签ID为空,请先在视频配置中设置标签");
|
|
|
- }
|
|
|
+ // 调用企微API更新标签
|
|
|
QwEditUserTagParam qwEditUserTagParam = new QwEditUserTagParam();
|
|
|
- Long externalContactId = Long.parseLong(fsTagUpdateQueue.getQwExternalContactId());
|
|
|
- QwExternalContact qwExternalContact = qwExternalContactMapper.selectQwExternalContactById(externalContactId);
|
|
|
+ QwExternalContact qwExternalContact = qwExternalContactMapper
|
|
|
+ .selectQwExternalContactById(Long.valueOf(fsTagUpdateQueue.getQwExternalContactId()));
|
|
|
if(qwExternalContact == null) {
|
|
|
throw new IllegalArgumentException(String.format("企微外部联系人 %s 未找到!", fsTagUpdateQueue.getQwExternalContactId()));
|
|
|
}
|
|
|
qwEditUserTagParam.setUserid(qwExternalContact.getUserId());
|
|
|
qwEditUserTagParam.setExternal_userid(qwExternalContact.getExternalUserId());
|
|
|
+
|
|
|
rateLimiter.acquire();
|
|
|
- qwEditUserTagParam.setAdd_tag(Collections.singletonList(fsTagUpdateQueue.getTagId()));
|
|
|
- if(ObjectUtil.equal(fsTagUpdateQueue.getLogType(),1)){
|
|
|
- FsUserCourseVideo fsUserCourseVideo = fsUserCourseVideoMapper.selectFsUserCourseVideoByVideoId(fsTagUpdateQueue.getVideoId());
|
|
|
- if(fsUserCourseVideo != null && StringUtils.isNotEmpty(fsUserCourseVideo.getWatchingTagId())){
|
|
|
- qwEditUserTagParam.setRemove_tag(Collections.singletonList(fsUserCourseVideo.getWatchingTagId()));
|
|
|
- }
|
|
|
+
|
|
|
+ // 如果是看课中
|
|
|
+ if(ObjectUtil.equal(fsTagUpdateQueue.getLogType(),0)){
|
|
|
+ qwEditUserTagParam.setAdd_tag(Collections.singletonList(fsTagUpdateQueue.getWatchingTagId()));
|
|
|
+ } else {
|
|
|
+ // 已完课
|
|
|
+ qwEditUserTagParam.setAdd_tag(Collections.singletonList(fsTagUpdateQueue.getWatchedTagId()));
|
|
|
+ qwEditUserTagParam.setRemove_tag(Collections.singletonList(fsTagUpdateQueue.getWatchingTagId()));
|
|
|
}
|
|
|
+
|
|
|
QwResult qwResult = qwApiService.editUserTag(qwEditUserTagParam, fsTagUpdateQueue.getCorpId());
|
|
|
fsTagUpdateQueue.setPayload(JSON.toJSONString(qwEditUserTagParam));
|
|
|
fsTagUpdateQueue.setResponse(JSON.toJSONString(qwResult));
|
|
|
- if(qwResult == null) {
|
|
|
- throw new RuntimeException("企微API返回结果为null");
|
|
|
- }
|
|
|
+ // 打标签成功
|
|
|
if(ObjectUtil.equal(qwResult.getErrcode(),0)) {
|
|
|
fsTagUpdateQueue.setStatus(2);
|
|
|
fsTagUpdateQueue.setRetryCount(0);
|
|
|
- log.info("打标签成功 - 外部联系人:{}, 标签ID:{}, 类型:{}",
|
|
|
- fsTagUpdateQueue.getQwExternalContactId(),
|
|
|
- fsTagUpdateQueue.getTagId(),
|
|
|
- fsTagUpdateQueue.getLogType() == 0 ? "看课中" : "完课");
|
|
|
} else {
|
|
|
- String errorMsg = String.format("打标签失败 errcode=%s, errmsg=%s",
|
|
|
- qwResult.getErrcode(), qwResult.getErrmsg());
|
|
|
- log.error("{},请求参数:{}", errorMsg, JSON.toJSONString(qwEditUserTagParam));
|
|
|
- throw new RuntimeException(errorMsg);
|
|
|
+ throw new RuntimeException(String.format("打标签失败 原因: %s", JSON.toJSONString(qwResult)));
|
|
|
}
|
|
|
} catch (Exception e){
|
|
|
fsTagUpdateQueue.setStatus(3);
|
|
|
fsTagUpdateQueue.setRetryCount(fsTagUpdateQueue.getRetryCount()+1);
|
|
|
- fsTagUpdateQueue.setFailMsg(ExceptionUtils.getStackTrace(e));
|
|
|
+ fsTagUpdateQueue.setFailMsg(ExceptionUtils.getFullStackTrace(e));
|
|
|
fsTagUpdateQueue.setNextExecuteTime(LocalDateTime.now().plusHours(1));
|
|
|
- log.error("处理打标签任务失败 - 任务ID:{}, 错误:{}", fsTagUpdateQueue.getId(), e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
}
|