|
|
@@ -20,6 +20,8 @@ import com.fs.course.domain.*;
|
|
|
import com.fs.course.mapper.*;
|
|
|
import com.fs.course.service.IFsCourseLinkService;
|
|
|
import com.fs.course.service.IFsUserCompanyBindService;
|
|
|
+import com.fs.live.domain.LiveWatchLog;
|
|
|
+import com.fs.live.mapper.LiveWatchLogMapper;
|
|
|
import com.fs.qw.domain.*;
|
|
|
import com.fs.qw.mapper.QwExternalContactMapper;
|
|
|
import com.fs.qw.mapper.QwUserMapper;
|
|
|
@@ -148,12 +150,14 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
|
|
|
+ private final BlockingQueue<LiveWatchLog> zmLiveWatchQueue = new LinkedBlockingQueue<>(20000);
|
|
|
|
|
|
// Executors for consumer threads
|
|
|
private ExecutorService qwSopLogsExecutor;
|
|
|
private ExecutorService watchLogsExecutor;
|
|
|
private ExecutorService courseLinkExecutor;
|
|
|
private ExecutorService courseSopAppLinkExecutor;
|
|
|
+ private ExecutorService zmLiveWatchLogExecutor;
|
|
|
@Autowired
|
|
|
private IQwGroupChatService qwGroupChatService;
|
|
|
@Autowired
|
|
|
@@ -184,6 +188,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
@Autowired
|
|
|
private IQwSopTempVoiceService sopTempVoiceService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ LiveWatchLogMapper liveWatchLogMapper;
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
loadCourseConfig();
|
|
|
@@ -230,11 +237,17 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
return t;
|
|
|
});
|
|
|
|
|
|
+ zmLiveWatchLogExecutor = Executors.newSingleThreadExecutor(r -> {
|
|
|
+ Thread t = new Thread(r, "zmLiveWatchLogConsumer");
|
|
|
+ t.setDaemon(true);
|
|
|
+ return t;
|
|
|
+ });
|
|
|
|
|
|
qwSopLogsExecutor.submit(this::consumeQwSopLogs);
|
|
|
watchLogsExecutor.submit(this::consumeWatchLogs);
|
|
|
courseLinkExecutor.submit(this::consumeCourseLink);
|
|
|
courseSopAppLinkExecutor.submit(this::consumeCourseSopAppLink);
|
|
|
+ zmLiveWatchLogExecutor.submit(this::consumeZmLiveWatchQueue);
|
|
|
}
|
|
|
|
|
|
// Scheduled tasks to refresh configurations and domain names periodically
|
|
|
@@ -265,6 +278,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
watchLogsExecutor.shutdown();
|
|
|
courseLinkExecutor.shutdown();
|
|
|
courseSopAppLinkExecutor.shutdown();
|
|
|
+ zmLiveWatchLogExecutor.shutdown();
|
|
|
try {
|
|
|
if (!qwSopLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
qwSopLogsExecutor.shutdownNow();
|
|
|
@@ -278,11 +292,15 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
if (!courseSopAppLinkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
courseSopAppLinkExecutor.shutdownNow();
|
|
|
}
|
|
|
+ if (!zmLiveWatchLogExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
+ zmLiveWatchLogExecutor.shutdownNow();
|
|
|
+ }
|
|
|
} catch (InterruptedException e) {
|
|
|
qwSopLogsExecutor.shutdownNow();
|
|
|
watchLogsExecutor.shutdownNow();
|
|
|
courseLinkExecutor.shutdownNow();
|
|
|
courseSopAppLinkExecutor.shutdownNow();
|
|
|
+ zmLiveWatchLogExecutor.shutdownNow();
|
|
|
Thread.currentThread().interrupt();
|
|
|
}
|
|
|
}
|
|
|
@@ -873,7 +891,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
Integer grade, Integer sendMsgType ,List<Company> companies ) {
|
|
|
switch (type) {
|
|
|
case 1:
|
|
|
- handleNormalMessage(sopLogs, content,companyUserId);
|
|
|
+ handleNormalMessage(sopLogs, content,companyUserId,companyId,isGroupChat,qwUserId,groupChat,externalId,logVo);
|
|
|
break;
|
|
|
case 2:
|
|
|
handleCourseMessage(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
@@ -902,9 +920,73 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
enqueueQwSopLogs(sopLogs);
|
|
|
}
|
|
|
|
|
|
- private void handleNormalMessage(QwSopLogs sopLogs, QwSopTempSetting.Content content,String companyUserId) {
|
|
|
+ private void handleNormalMessage(QwSopLogs sopLogs, QwSopTempSetting.Content content, String companyUserId, String companyId,
|
|
|
+ boolean isGroupChat,String qwUserId,QwGroupChat groupChat,String externalId,SopUserLogsVo logVo) {
|
|
|
|
|
|
- sopLogs.setContentJson(JSON.toJSONString(content));
|
|
|
+ // 深拷贝 Content 对象,避免使用 JSON
|
|
|
+ QwSopTempSetting.Content clonedContent = deepCopyContent(content);
|
|
|
+ if (clonedContent == null) {
|
|
|
+ log.error("Failed to clone content, skipping handleCourseMessage.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<QwSopTempSetting.Content.Setting> settings = clonedContent.getSetting();
|
|
|
+ if (settings == null || settings.isEmpty()) {
|
|
|
+ log.error("Cloned content settings are empty, skipping.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 顺序处理每个 Setting,避免过多的并行导致线程开销
|
|
|
+ for (QwSopTempSetting.Content.Setting setting : settings) {
|
|
|
+ switch (setting.getContentType()) {
|
|
|
+ //直播小程序单独
|
|
|
+ case "12":
|
|
|
+ String sortLiveLink;
|
|
|
+ sortLiveLink = "/pages_course/living.html?companyId=" + companyId + "&companyUserId=" + companyUserId + "&liveId=" + setting.getLiveId() + "&corpId=" + logVo.getCorpId()+"&qwUserId=" + qwUserId;
|
|
|
+ String json = configService.selectConfigByKey("his.config");
|
|
|
+ FSSysConfig sysConfig = JSON.parseObject(json, FSSysConfig.class);
|
|
|
+ if (isGroupChat) {
|
|
|
+ try {
|
|
|
+ groupChat.getChatUserList().stream().filter(e -> e.getUserList() != null && !e.getUserList().isEmpty()).forEach(e -> {
|
|
|
+ Map<String, GroupUserExternalVo> userMap = PubFun.listToMapByGroupObject(e.getUserList(), GroupUserExternalVo::getUserId);
|
|
|
+ GroupUserExternalVo vo = userMap.get(groupChat.getOwner());
|
|
|
+ if (vo != null && vo.getId() != null) {
|
|
|
+ sopLogs.setFsUserId(vo.getFsUserId());
|
|
|
+ //写入直播待看课记录
|
|
|
+ createLiveWatchLogAndEnQueue(companyId, companyUserId, vo.getId().toString(), setting.getLiveId(), sysConfig.getAppId(), 2, qwUserId,logVo.getCorpId());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ sortLiveLink += "&chatId=" + groupChat.getChatId();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("直播小程序群聊新增报错,{}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ createLiveWatchLogAndEnQueue(companyId, companyUserId, externalId, setting.getLiveId(), sysConfig.getAppId(), 1, qwUserId,logVo.getCorpId());
|
|
|
+ sortLiveLink += "&externalId=" + externalId;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("直播小程序个人新增报错,{}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ String miniprogramLiveTitle = setting.getMiniprogramTitle();
|
|
|
+ int maxLiveLength = 17;
|
|
|
+ setting.setMiniprogramTitle(miniprogramLiveTitle.length() > maxLiveLength ? miniprogramLiveTitle.substring(0, maxLiveLength) + "..." : miniprogramLiveTitle);
|
|
|
+ setting.setMiniprogramAppid(sysConfig.getAppId());
|
|
|
+ setting.setMiniprogramPage(sortLiveLink);
|
|
|
+ setting.setContentType("4");
|
|
|
+ try {
|
|
|
+ setting.setMiniprogramPicUrl(StringUtil.strIsNullOrEmpty(setting.getMiniprogramPicUrl()) ? "https://cos.his.cdwjyyh.com/fs/20250331/ec2b4e73be8048afbd526124a655ad56.png" : setting.getMiniprogramPicUrl());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("赋值-小程序封面地址失败-" + e);
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sopLogs.setContentJson(JSON.toJSONString(clonedContent));
|
|
|
+// sopLogs.setContentJson(JSON.toJSONString(content));
|
|
|
enqueueQwSopLogs(sopLogs);
|
|
|
}
|
|
|
|
|
|
@@ -1102,14 +1184,37 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
//直播小程序单独
|
|
|
case "12":
|
|
|
String sortLiveLink;
|
|
|
- sortLiveLink = "/pages_course/living?companyId=" + companyId + "&companyUserId=" + companyUserId + "&liveId=" + setting.getLiveId();
|
|
|
+ sortLiveLink = "/pages_course/living.html?companyId=" + companyId + "&companyUserId=" + companyUserId + "&liveId=" + setting.getLiveId()+"&corpId=" +logVo.getCorpId()+"&qwUserId=" + qwUserId;
|
|
|
+ String json = configService.selectConfigByKey("his.config");
|
|
|
+ FSSysConfig sysConfig= JSON.parseObject(json,FSSysConfig.class);
|
|
|
+ if(isGroupChat){
|
|
|
+ try{
|
|
|
+ groupChat.getChatUserList().stream().filter(e -> e.getUserList() != null && !e.getUserList().isEmpty()).forEach(e -> {
|
|
|
+ Map<String, GroupUserExternalVo> userMap = PubFun.listToMapByGroupObject(e.getUserList(), GroupUserExternalVo::getUserId);
|
|
|
+ GroupUserExternalVo vo = userMap.get(groupChat.getOwner());
|
|
|
+ if (vo != null && vo.getId() != null) {
|
|
|
+ sopLogs.setFsUserId(vo.getFsUserId());
|
|
|
+ //写入直播待看课记录
|
|
|
+ createLiveWatchLogAndEnQueue(companyId,companyUserId,vo.getId().toString(), setting.getLiveId(),sysConfig.getAppId(),2,qwUserId,logVo.getCorpId());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ sortLiveLink += "&chatId=" + groupChat.getChatId();
|
|
|
+ }catch(Exception e){
|
|
|
+ log.error("直播小程序群聊新增报错,{}", e.getMessage(),e);
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ try{
|
|
|
+ createLiveWatchLogAndEnQueue(companyId,companyUserId,externalId, setting.getLiveId(),sysConfig.getAppId(),2,qwUserId,logVo.getCorpId());
|
|
|
+ sortLiveLink += "&externalId=" + externalId;
|
|
|
+ }catch(Exception e){
|
|
|
+ log.error("直播小程序个人新增报错,{}", e.getMessage(),e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
|
|
|
String miniprogramLiveTitle = setting.getMiniprogramTitle();
|
|
|
int maxLiveLength = 17;
|
|
|
setting.setMiniprogramTitle(miniprogramLiveTitle.length() > maxLiveLength ? miniprogramLiveTitle.substring(0, maxLiveLength) + "..." : miniprogramLiveTitle);
|
|
|
- String json = configService.selectConfigByKey("his.config");
|
|
|
- FSSysConfig sysConfig= JSON.parseObject(json,FSSysConfig.class);
|
|
|
setting.setMiniprogramAppid(sysConfig.getAppId());
|
|
|
setting.setMiniprogramPage(sortLiveLink);
|
|
|
setting.setContentType("4");
|
|
|
@@ -1499,6 +1604,46 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
enqueueWatchLog(watchLog);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 直播看课记录处理
|
|
|
+ * @param companyId
|
|
|
+ * @param companyUserId
|
|
|
+ * @param externalId
|
|
|
+ * @param liveId
|
|
|
+ * @param appId
|
|
|
+ * @param logSource
|
|
|
+ * @param qwUserId
|
|
|
+ * @param corpId
|
|
|
+ */
|
|
|
+ public void createLiveWatchLogAndEnQueue(String companyId,String companyUserId,String externalId,Long liveId,String appId,Integer logSource,String qwUserId,String corpId){
|
|
|
+ // 写入对应数据源的记录表
|
|
|
+ LiveWatchLog itemLiveWatchLog = new LiveWatchLog();
|
|
|
+ itemLiveWatchLog.setLiveId(liveId);
|
|
|
+ itemLiveWatchLog.setLogType(3);
|
|
|
+ itemLiveWatchLog.setSopCreateTime(new Date());
|
|
|
+ itemLiveWatchLog.setCompanyId(Long.valueOf(companyId));
|
|
|
+ itemLiveWatchLog.setCompanyUserId(Long.valueOf(companyUserId));
|
|
|
+ itemLiveWatchLog.setSendAppId(appId);
|
|
|
+ itemLiveWatchLog.setLogSource(logSource);
|
|
|
+ itemLiveWatchLog.setQwUserId(qwUserId);
|
|
|
+ itemLiveWatchLog.setExternalContactId(Long.valueOf(externalId));
|
|
|
+ itemLiveWatchLog.setCorpId(corpId);
|
|
|
+ enqueueZmLiveWatchLog(itemLiveWatchLog);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void enqueueZmLiveWatchLog(LiveWatchLog liveWatchLog) {
|
|
|
+ try {
|
|
|
+ boolean offered = zmLiveWatchQueue.offer(liveWatchLog, 5, TimeUnit.SECONDS);
|
|
|
+ if (!offered) {
|
|
|
+ log.error("LiveWatchLog 队列已满,无法添加日志: {}", JSON.toJSONString(liveWatchLog));
|
|
|
+ // 处理队列已满的情况,例如记录到失败队列或持久化存储
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("插入 LiveWatchLog 队列时被中断: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 时间字符串转Date时间
|
|
|
* @param dateString
|
|
|
@@ -1675,6 +1820,35 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 消费 FsCourseSopAppLink 队列并进行批量插入
|
|
|
+ */
|
|
|
+ private void consumeZmLiveWatchQueue() {
|
|
|
+ List<LiveWatchLog> batch = new ArrayList<>(BATCH_SIZE);
|
|
|
+ while (running || !zmLiveWatchQueue.isEmpty()) {
|
|
|
+ try {
|
|
|
+ LiveWatchLog livewatchLog = zmLiveWatchQueue.poll(1, TimeUnit.SECONDS);
|
|
|
+ if (livewatchLog != null) {
|
|
|
+ batch.add(livewatchLog);
|
|
|
+ }
|
|
|
+ if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && livewatchLog == null)) {
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ batchInsertLiveWatchLog(new ArrayList<>(batch));
|
|
|
+ batch.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("zmLiveWatchQueue 消费线程被中断: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 处理剩余的数据
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ batchInsertLiveWatchLog(batch);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 消费 FsCourseWatchLog 队列并进行批量插入
|
|
|
*/
|
|
|
@@ -1782,6 +1956,36 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 批量插入 卓美直播看课记录
|
|
|
+ */
|
|
|
+ @Transactional
|
|
|
+ @Retryable(
|
|
|
+ value = {Exception.class},
|
|
|
+ maxAttempts = 3,
|
|
|
+ backoff = @Backoff(delay = 2000)
|
|
|
+ )
|
|
|
+ public void batchInsertLiveWatchLog(List<LiveWatchLog> liveWatchLogToInsert) {
|
|
|
+ try {
|
|
|
+ List<LiveWatchLog> lastInsertList = new ArrayList<>();
|
|
|
+ //判断是否存在数据 liveId + his_qw_external_contact_id 唯一
|
|
|
+ for (LiveWatchLog liveWatchLog : liveWatchLogToInsert) {
|
|
|
+ //判断是否存在数据 存在的数据直接更新发送时间
|
|
|
+ if(liveWatchLogMapper.updateLiveWatchLogCondition(liveWatchLog) > 0){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ lastInsertList.add(liveWatchLog);
|
|
|
+ }
|
|
|
+ if(!lastInsertList.isEmpty()){
|
|
|
+ liveWatchLogMapper.insertLiveWatchLogBatch(lastInsertList);
|
|
|
+ }
|
|
|
+// log.info("批量插入 LiveWatchLog 完成,共插入 {} 条记录。", liveWatchLogToInsert.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("批量插入 LiveWatchLog 失败: {}", e.getMessage(), e);
|
|
|
+ // 可选:将失败的数据记录到失败队列或持久化存储以便后续重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
@Override
|
|
|
public void updateSopLogsByCancel() {
|