|
@@ -36,6 +36,7 @@ import com.fs.qw.mapper.QwUserMapper;
|
|
|
import com.fs.qw.service.IQwCompanyService;
|
|
import com.fs.qw.service.IQwCompanyService;
|
|
|
import com.fs.qw.service.IQwGroupChatService;
|
|
import com.fs.qw.service.IQwGroupChatService;
|
|
|
import com.fs.qw.service.IQwGroupChatUserService;
|
|
import com.fs.qw.service.IQwGroupChatUserService;
|
|
|
|
|
+import com.fs.qw.service.IQwSopSmsLogsService;
|
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
|
import com.fs.qw.vo.GroupUserExternalVo;
|
|
import com.fs.qw.vo.GroupUserExternalVo;
|
|
|
import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
@@ -74,6 +75,7 @@ import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
+import static com.fs.course.utils.LinkUtil.generateRandomNumberWithLock;
|
|
|
import static com.fs.course.utils.LinkUtil.generateRandomStringWithLock;
|
|
import static com.fs.course.utils.LinkUtil.generateRandomStringWithLock;
|
|
|
|
|
|
|
|
@Service
|
|
@Service
|
|
@@ -132,6 +134,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private IQwSopLogsService qwSopLogsService;
|
|
private IQwSopLogsService qwSopLogsService;
|
|
|
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private IQwSopSmsLogsService qwSopSmsLogsService;
|
|
|
|
|
+
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private QwSopLogsMapper qwSopLogsMapper;
|
|
private QwSopLogsMapper qwSopLogsMapper;
|
|
|
|
|
|
|
@@ -177,6 +182,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
|
|
// Blocking queues with bounded capacity to implement backpressure
|
|
// Blocking queues with bounded capacity to implement backpressure
|
|
|
private final BlockingQueue<QwSopLogs> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<QwSopLogs> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
|
|
+ private final BlockingQueue<QwSopSmsLogs> qwSopSmsLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
|
|
@@ -184,6 +190,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
|
|
// Executors for consumer threads
|
|
// Executors for consumer threads
|
|
|
private ExecutorService qwSopLogsExecutor;
|
|
private ExecutorService qwSopLogsExecutor;
|
|
|
|
|
+ private ExecutorService qwSopSmsLogsExecutor;
|
|
|
private ExecutorService watchLogsExecutor;
|
|
private ExecutorService watchLogsExecutor;
|
|
|
private ExecutorService courseLinkExecutor;
|
|
private ExecutorService courseLinkExecutor;
|
|
|
private ExecutorService courseSopAppLinkExecutor;
|
|
private ExecutorService courseSopAppLinkExecutor;
|
|
@@ -244,6 +251,13 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
t.setDaemon(true);
|
|
t.setDaemon(true);
|
|
|
return t;
|
|
return t;
|
|
|
});
|
|
});
|
|
|
|
|
+
|
|
|
|
|
+ qwSopSmsLogsExecutor = Executors.newSingleThreadExecutor(r -> {
|
|
|
|
|
+ Thread t = new Thread(r, "QwSopSmsLogsConsumer");
|
|
|
|
|
+ t.setDaemon(true);
|
|
|
|
|
+ return t;
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
watchLogsExecutor = Executors.newSingleThreadExecutor(r -> {
|
|
watchLogsExecutor = Executors.newSingleThreadExecutor(r -> {
|
|
|
Thread t = new Thread(r, "WatchLogsConsumer");
|
|
Thread t = new Thread(r, "WatchLogsConsumer");
|
|
|
t.setDaemon(true);
|
|
t.setDaemon(true);
|
|
@@ -268,6 +282,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
qwSopLogsExecutor.submit(this::consumeQwSopLogs);
|
|
qwSopLogsExecutor.submit(this::consumeQwSopLogs);
|
|
|
|
|
+ qwSopSmsLogsExecutor.submit(this::consumeQwSopSmsLogs);
|
|
|
watchLogsExecutor.submit(this::consumeWatchLogs);
|
|
watchLogsExecutor.submit(this::consumeWatchLogs);
|
|
|
courseLinkExecutor.submit(this::consumeCourseLink);
|
|
courseLinkExecutor.submit(this::consumeCourseLink);
|
|
|
courseSopAppLinkExecutor.submit(this::consumeCourseSopAppLink);
|
|
courseSopAppLinkExecutor.submit(this::consumeCourseSopAppLink);
|
|
@@ -298,6 +313,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
public void shutdownConsumers() {
|
|
public void shutdownConsumers() {
|
|
|
running = false;
|
|
running = false;
|
|
|
qwSopLogsExecutor.shutdown();
|
|
qwSopLogsExecutor.shutdown();
|
|
|
|
|
+ qwSopSmsLogsExecutor.shutdown();
|
|
|
watchLogsExecutor.shutdown();
|
|
watchLogsExecutor.shutdown();
|
|
|
courseLinkExecutor.shutdown();
|
|
courseLinkExecutor.shutdown();
|
|
|
courseSopAppLinkExecutor.shutdown();
|
|
courseSopAppLinkExecutor.shutdown();
|
|
@@ -306,6 +322,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
if (!qwSopLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
if (!qwSopLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
qwSopLogsExecutor.shutdownNow();
|
|
qwSopLogsExecutor.shutdownNow();
|
|
|
}
|
|
}
|
|
|
|
|
+ if (!qwSopSmsLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
|
|
+ qwSopSmsLogsExecutor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
if (!watchLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
if (!watchLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
watchLogsExecutor.shutdownNow();
|
|
watchLogsExecutor.shutdownNow();
|
|
|
}
|
|
}
|
|
@@ -320,6 +339,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
|
qwSopLogsExecutor.shutdownNow();
|
|
qwSopLogsExecutor.shutdownNow();
|
|
|
|
|
+ qwSopSmsLogsExecutor.shutdownNow();
|
|
|
watchLogsExecutor.shutdownNow();
|
|
watchLogsExecutor.shutdownNow();
|
|
|
courseLinkExecutor.shutdownNow();
|
|
courseLinkExecutor.shutdownNow();
|
|
|
courseSopAppLinkExecutor.shutdownNow();
|
|
courseSopAppLinkExecutor.shutdownNow();
|
|
@@ -543,6 +563,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String companyUserId = String.valueOf(qwUserByRedis.getCompanyUserId()).trim();
|
|
String companyUserId = String.valueOf(qwUserByRedis.getCompanyUserId()).trim();
|
|
|
String companyId = String.valueOf(qwUserByRedis.getCompanyId()).trim();
|
|
String companyId = String.valueOf(qwUserByRedis.getCompanyId()).trim();
|
|
|
Integer sendMsgType = qwUserByRedis.getSendMsgType();
|
|
Integer sendMsgType = qwUserByRedis.getSendMsgType();
|
|
|
|
|
+ Long serverId = qwUserByRedis.getServerId();
|
|
|
|
|
|
|
|
if (StringUtil.strIsNullOrEmpty(companyUserId) || StringUtil.strIsNullOrEmpty(companyId) || "null".equals(companyUserId)) {
|
|
if (StringUtil.strIsNullOrEmpty(companyUserId) || StringUtil.strIsNullOrEmpty(companyId) || "null".equals(companyUserId)) {
|
|
|
log.error("员工未绑定销售账号或公司,跳过处理:" + qwUserId);
|
|
log.error("员工未绑定销售账号或公司,跳过处理:" + qwUserId);
|
|
@@ -668,7 +689,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
|
|
insertSopUserLogs(sopUserLogsInfos, logVo, sendTime, ruleTimeVO, content, qwUserId,
|
|
insertSopUserLogs(sopUserLogsInfos, logVo, sendTime, ruleTimeVO, content, qwUserId,
|
|
|
companyUserId, companyId, qwUserByRedis.getWelcomeText(), qwUserByRedis.getQwUserName(),
|
|
companyUserId, companyId, qwUserByRedis.getWelcomeText(), qwUserByRedis.getQwUserName(),
|
|
|
- groupChatMap, miniAppId, config, miniMap, sendMsgType, companies);
|
|
|
|
|
|
|
+ groupChatMap, miniAppId, config, miniMap, sendMsgType, companies, serverId);
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -714,7 +735,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String qwUserId, String companyUserId, String companyId, String welcomeText, String qwUserName,
|
|
String qwUserId, String companyUserId, String companyId, String welcomeText, String qwUserName,
|
|
|
Map<String, QwGroupChat> groupChatMap, String miniAppId, CourseConfig config,
|
|
Map<String, QwGroupChat> groupChatMap, String miniAppId, CourseConfig config,
|
|
|
Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap, Integer sendMsgType,
|
|
Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap, Integer sendMsgType,
|
|
|
- List<Company> companies) {
|
|
|
|
|
|
|
+ List<Company> companies,Long serverId) {
|
|
|
String formattedSendTime = sendTime.toInstant()
|
|
String formattedSendTime = sendTime.toInstant()
|
|
|
.atZone(ZoneId.systemDefault())
|
|
.atZone(ZoneId.systemDefault())
|
|
|
.format(DATE_TIME_FORMATTER);
|
|
.format(DATE_TIME_FORMATTER);
|
|
@@ -787,7 +808,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
ruleTimeVO.setType(2);
|
|
ruleTimeVO.setType(2);
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
type, qwUserId, companyUserId, companyId, groupChat.getChatId(), welcomeText, qwUserName,
|
|
type, qwUserId, companyUserId, companyId, groupChat.getChatId(), welcomeText, qwUserName,
|
|
|
- null, true, miniAppId, groupChat, config, miniMap, null, sendMsgType, companies, liveId);
|
|
|
|
|
|
|
+ null, true, miniAppId, groupChat, config, miniMap, null, sendMsgType,
|
|
|
|
|
+ companies, liveId,serverId);
|
|
|
}
|
|
}
|
|
|
// if (content.getIndex() == 0) {
|
|
// if (content.getIndex() == 0) {
|
|
|
// QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, groupChat.getChatId(), groupChat.getName(), null, isOfficial, null);
|
|
// QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, groupChat.getChatId(), groupChat.getName(), null, isOfficial, null);
|
|
@@ -817,7 +839,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, contactId.getExternalContactId(), externalUserName, fsUserId, isOfficial, contactId.getExternalId(), contactId.getIsDaysNotStudy());
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, contactId.getExternalContactId(), externalUserName, fsUserId, isOfficial, contactId.getExternalId(), contactId.getIsDaysNotStudy());
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
type, qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId, false, miniAppId,
|
|
type, qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId, false, miniAppId,
|
|
|
- null, config, miniMap, grade, sendMsgType, companies, liveId);
|
|
|
|
|
|
|
+ null, config, miniMap, grade, sendMsgType, companies, liveId,serverId);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("处理 externalContactId {} 时发生异常: {}", contactId, e.getMessage(), e);
|
|
log.error("处理 externalContactId {} 时发生异常: {}", contactId, e.getMessage(), e);
|
|
|
}
|
|
}
|
|
@@ -929,9 +951,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
SopUserLogsVo logVo, Date sendTime, Long courseId, Long videoId, int type, String qwUserId,
|
|
SopUserLogsVo logVo, Date sendTime, Long courseId, Long videoId, int type, String qwUserId,
|
|
|
String companyUserId, String companyId, String externalId, String welcomeText,
|
|
String companyUserId, String companyId, String externalId, String welcomeText,
|
|
|
String qwUserName, Long fsUserId, boolean isGroupChat, String miniAppId,
|
|
String qwUserName, Long fsUserId, boolean isGroupChat, String miniAppId,
|
|
|
- QwGroupChat groupChat, CourseConfig config,
|
|
|
|
|
- Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
|
|
|
|
|
- Integer grade, Integer sendMsgType, List<Company> companies, Long liveId) {
|
|
|
|
|
|
|
+ QwGroupChat groupChat, CourseConfig config, Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
|
|
|
|
|
+ Integer grade, Integer sendMsgType, List<Company> companies, Long liveId,Long serverId) {
|
|
|
switch (type) {
|
|
switch (type) {
|
|
|
case 1:
|
|
case 1:
|
|
|
handleNormalMessage(sopLogs, content, companyUserId, companyId, isGroupChat, qwUserId, groupChat, externalId, logVo,sendTime);
|
|
handleNormalMessage(sopLogs, content, companyUserId, companyId, isGroupChat, qwUserId, groupChat, externalId, logVo,sendTime);
|
|
@@ -939,7 +960,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
case 2:
|
|
case 2:
|
|
|
handleCourseMessage(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
handleCourseMessage(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId,
|
|
qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId,
|
|
|
- isGroupChat, miniAppId, groupChat, config, miniMap, grade, sendMsgType, companies);
|
|
|
|
|
|
|
+ isGroupChat, miniAppId, groupChat, config, miniMap, grade, sendMsgType, companies,serverId);
|
|
|
break;
|
|
break;
|
|
|
case 3:
|
|
case 3:
|
|
|
handleOrderMessage(sopLogs, content);
|
|
handleOrderMessage(sopLogs, content);
|
|
@@ -1603,7 +1624,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String companyId, String externalId, String welcomeText, String qwUserName,
|
|
String companyId, String externalId, String welcomeText, String qwUserName,
|
|
|
Long fsUserId, boolean isGroupChat, String miniAppId, QwGroupChat groupChat, CourseConfig config, Map<Long,
|
|
Long fsUserId, boolean isGroupChat, String miniAppId, QwGroupChat groupChat, CourseConfig config, Map<Long,
|
|
|
Map<Integer, List<CompanyMiniapp>>> miniMap, Integer grade, Integer sendMsgType,
|
|
Map<Integer, List<CompanyMiniapp>>> miniMap, Integer grade, Integer sendMsgType,
|
|
|
- List<Company> companies) {
|
|
|
|
|
|
|
+ List<Company> companies,Long serverId) {
|
|
|
QwExternalContact contact = null;
|
|
QwExternalContact contact = null;
|
|
|
if (logVo.getExternalId() != null) {
|
|
if (logVo.getExternalId() != null) {
|
|
|
contact = qwExternalContactMapper.selectById(logVo.getExternalId());
|
|
contact = qwExternalContactMapper.selectById(logVo.getExternalId());
|
|
@@ -1615,11 +1636,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-//
|
|
|
|
|
-// Integer courseType = clonedContent.getCourseType();
|
|
|
|
|
|
|
|
|
|
String isOfficial = clonedContent.getIsOfficial();
|
|
String isOfficial = clonedContent.getIsOfficial();
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ Long msgNum = Long.valueOf(generateRandomNumberWithLock());
|
|
|
|
|
+ sopLogs.setSmsLogsId(msgNum);
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ AtomicInteger index = new AtomicInteger(0);
|
|
|
|
|
+
|
|
|
List<QwSopTempSetting.Content.Setting> settings = clonedContent.getSetting();
|
|
List<QwSopTempSetting.Content.Setting> settings = clonedContent.getSetting();
|
|
|
if (settings == null || settings.isEmpty()) {
|
|
if (settings == null || settings.isEmpty()) {
|
|
|
// log.error("Cloned content settings are empty, skipping.");
|
|
// log.error("Cloned content settings are empty, skipping.");
|
|
@@ -1633,6 +1659,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
// 顺序处理每个 Setting,避免过多的并行导致线程开销
|
|
// 顺序处理每个 Setting,避免过多的并行导致线程开销
|
|
|
for (QwSopTempSetting.Content.Setting setting : settings) {
|
|
for (QwSopTempSetting.Content.Setting setting : settings) {
|
|
|
|
|
+
|
|
|
|
|
+ Integer currentIndex = index.getAndIncrement();
|
|
|
|
|
+
|
|
|
switch (setting.getContentType()) {
|
|
switch (setting.getContentType()) {
|
|
|
//文字和短链一起
|
|
//文字和短链一起
|
|
|
case "1":
|
|
case "1":
|
|
@@ -1902,6 +1931,26 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
} else {
|
|
} else {
|
|
|
log.error("生成看课短链失败,跳过设置 URL。");
|
|
log.error("生成看课短链失败,跳过设置 URL。");
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ QwSopSmsLogs sopSmsLogs=new QwSopSmsLogs();
|
|
|
|
|
+ sopSmsLogs.setSopId(sopLogs.getSopId());
|
|
|
|
|
+ sopSmsLogs.setQwUserId(Long.valueOf(qwUserId));
|
|
|
|
|
+ sopSmsLogs.setSopLogId(msgNum);
|
|
|
|
|
+ sopSmsLogs.setCompanyId(companyId != null ? Long.valueOf(companyId) : null);
|
|
|
|
|
+ sopSmsLogs.setCompanyUserId(companyUserId != null ? Long.valueOf(companyUserId) : null);
|
|
|
|
|
+ sopSmsLogs.setContactId(externalId != null ? Long.valueOf(externalId) : null);
|
|
|
|
|
+ sopSmsLogs.setServerId(serverId);
|
|
|
|
|
+ sopSmsLogs.setStatus(0);
|
|
|
|
|
+ sopSmsLogs.setSendTime(new Date());
|
|
|
|
|
+ sopSmsLogs.setUpdateTime(new Date());
|
|
|
|
|
+ sopSmsLogs.setCreateTime(new Date());
|
|
|
|
|
+
|
|
|
|
|
+ sopSmsLogs.setFsUserId(sopLogs.getFsUserId());
|
|
|
|
|
+ sopSmsLogs.setSmsIndex(currentIndex);
|
|
|
|
|
+ sopSmsLogs.setContent(setting.getValue());
|
|
|
|
|
+ sopSmsLogs.setSmsTemplateCode(setting.getSmsTemplateCode());
|
|
|
|
|
+
|
|
|
|
|
+ enqueueQwSopSmsLogs(sopSmsLogs);
|
|
|
}
|
|
}
|
|
|
break;
|
|
break;
|
|
|
default:
|
|
default:
|
|
@@ -2743,6 +2792,22 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 将 QwSopSmsLogs 放入队列
|
|
|
|
|
+ */
|
|
|
|
|
+ private void enqueueQwSopSmsLogs(QwSopSmsLogs smsLogs) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ boolean offered = qwSopSmsLogsQueue.offer(smsLogs, 5, TimeUnit.SECONDS);
|
|
|
|
|
+ if (!offered) {
|
|
|
|
|
+ log.error("QwSopSmsLogs 队列已满,无法添加日志: {}", JSON.toJSONString(smsLogs));
|
|
|
|
|
+ // 处理队列已满的情况,例如记录到失败队列或持久化存储
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("插入 QwSopLogs 队列时被中断: {}", e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 将 FsCourseWatchLog 放入队列
|
|
* 将 FsCourseWatchLog 放入队列
|
|
|
*/
|
|
*/
|
|
@@ -2804,6 +2869,38 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消费 QwSopLogs 队列并进行批量插入
|
|
|
|
|
+ */
|
|
|
|
|
+ private void consumeQwSopSmsLogs() {
|
|
|
|
|
+ List<QwSopSmsLogs> batch = new ArrayList<>(BATCH_SIZE);
|
|
|
|
|
+ while (running || !qwSopSmsLogsQueue.isEmpty()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ QwSopSmsLogs log = qwSopSmsLogsQueue.poll(1, TimeUnit.SECONDS);
|
|
|
|
|
+ if (log != null) {
|
|
|
|
|
+ batch.add(log);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && log == null)) {
|
|
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
|
|
+ batchInsertQwSopSmsLogs(new ArrayList<>(batch));
|
|
|
|
|
+ batch.clear();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("QwSopLogs 消费线程被中断: {}", e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 处理剩余的数据
|
|
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
|
|
+ batchInsertQwSopSmsLogs(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 消费 FsCourseWatchLog 队列并进行批量插入
|
|
* 消费 FsCourseWatchLog 队列并进行批量插入
|
|
|
*/
|
|
*/
|
|
@@ -2939,6 +3036,25 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量插入 QwSopSmsLogs
|
|
|
|
|
+ */
|
|
|
|
|
+ @Transactional
|
|
|
|
|
+ @Retryable(
|
|
|
|
|
+ value = {Exception.class},
|
|
|
|
|
+ maxAttempts = 3,
|
|
|
|
|
+ backoff = @Backoff(delay = 2000)
|
|
|
|
|
+ )
|
|
|
|
|
+ public void batchInsertQwSopSmsLogs(List<QwSopSmsLogs> logsToInsert) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ qwSopSmsLogsService.batchInsertQwSopSmsLogsOneTouch(logsToInsert);
|
|
|
|
|
+// log.info("批量插入 QwSopSmsLogs 完成,共插入 {} 条记录。", logsToInsert.size());
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批量插入 QwSopSmsLogs 失败: {}", e.getMessage(), e);
|
|
|
|
|
+ // 可选:将失败的数据记录到失败队列或持久化存储以便后续重试
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 批量插入 FsCourseWatchLog
|
|
* 批量插入 FsCourseWatchLog
|
|
|
*/
|
|
*/
|