|
@@ -36,6 +36,7 @@ import com.fs.qw.mapper.QwUserMapper;
|
|
|
import com.fs.qw.service.IQwCompanyService;
|
|
import com.fs.qw.service.IQwCompanyService;
|
|
|
import com.fs.qw.service.IQwGroupChatService;
|
|
import com.fs.qw.service.IQwGroupChatService;
|
|
|
import com.fs.qw.service.IQwGroupChatUserService;
|
|
import com.fs.qw.service.IQwGroupChatUserService;
|
|
|
|
|
+import com.fs.qw.service.IQwSopSmsLogsService;
|
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
|
import com.fs.qw.vo.GroupUserExternalVo;
|
|
import com.fs.qw.vo.GroupUserExternalVo;
|
|
|
import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
@@ -74,6 +75,7 @@ import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
+import static com.fs.course.utils.LinkUtil.generateRandomNumberWithLock;
|
|
|
import static com.fs.course.utils.LinkUtil.generateRandomStringWithLock;
|
|
import static com.fs.course.utils.LinkUtil.generateRandomStringWithLock;
|
|
|
|
|
|
|
|
@Service
|
|
@Service
|
|
@@ -132,6 +134,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private IQwSopLogsService qwSopLogsService;
|
|
private IQwSopLogsService qwSopLogsService;
|
|
|
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private IQwSopSmsLogsService qwSopSmsLogsService;
|
|
|
|
|
+
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private QwSopLogsMapper qwSopLogsMapper;
|
|
private QwSopLogsMapper qwSopLogsMapper;
|
|
|
|
|
|
|
@@ -177,6 +182,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
|
|
// Blocking queues with bounded capacity to implement backpressure
|
|
// Blocking queues with bounded capacity to implement backpressure
|
|
|
private final BlockingQueue<QwSopLogs> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<QwSopLogs> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
|
|
+ private final BlockingQueue<QwSopSmsLogs> qwSopSmsLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
|
|
private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
|
|
@@ -543,6 +549,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String companyUserId = String.valueOf(qwUserByRedis.getCompanyUserId()).trim();
|
|
String companyUserId = String.valueOf(qwUserByRedis.getCompanyUserId()).trim();
|
|
|
String companyId = String.valueOf(qwUserByRedis.getCompanyId()).trim();
|
|
String companyId = String.valueOf(qwUserByRedis.getCompanyId()).trim();
|
|
|
Integer sendMsgType = qwUserByRedis.getSendMsgType();
|
|
Integer sendMsgType = qwUserByRedis.getSendMsgType();
|
|
|
|
|
+ Long serverId = qwUserByRedis.getServerId();
|
|
|
|
|
|
|
|
if (StringUtil.strIsNullOrEmpty(companyUserId) || StringUtil.strIsNullOrEmpty(companyId) || "null".equals(companyUserId)) {
|
|
if (StringUtil.strIsNullOrEmpty(companyUserId) || StringUtil.strIsNullOrEmpty(companyId) || "null".equals(companyUserId)) {
|
|
|
log.error("员工未绑定销售账号或公司,跳过处理:" + qwUserId);
|
|
log.error("员工未绑定销售账号或公司,跳过处理:" + qwUserId);
|
|
@@ -668,7 +675,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
|
|
insertSopUserLogs(sopUserLogsInfos, logVo, sendTime, ruleTimeVO, content, qwUserId,
|
|
insertSopUserLogs(sopUserLogsInfos, logVo, sendTime, ruleTimeVO, content, qwUserId,
|
|
|
companyUserId, companyId, qwUserByRedis.getWelcomeText(), qwUserByRedis.getQwUserName(),
|
|
companyUserId, companyId, qwUserByRedis.getWelcomeText(), qwUserByRedis.getQwUserName(),
|
|
|
- groupChatMap, miniAppId, config, miniMap, sendMsgType, companies);
|
|
|
|
|
|
|
+ groupChatMap, miniAppId, config, miniMap, sendMsgType, companies, serverId);
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -714,7 +721,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String qwUserId, String companyUserId, String companyId, String welcomeText, String qwUserName,
|
|
String qwUserId, String companyUserId, String companyId, String welcomeText, String qwUserName,
|
|
|
Map<String, QwGroupChat> groupChatMap, String miniAppId, CourseConfig config,
|
|
Map<String, QwGroupChat> groupChatMap, String miniAppId, CourseConfig config,
|
|
|
Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap, Integer sendMsgType,
|
|
Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap, Integer sendMsgType,
|
|
|
- List<Company> companies) {
|
|
|
|
|
|
|
+ List<Company> companies,Long serverId) {
|
|
|
String formattedSendTime = sendTime.toInstant()
|
|
String formattedSendTime = sendTime.toInstant()
|
|
|
.atZone(ZoneId.systemDefault())
|
|
.atZone(ZoneId.systemDefault())
|
|
|
.format(DATE_TIME_FORMATTER);
|
|
.format(DATE_TIME_FORMATTER);
|
|
@@ -787,7 +794,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
ruleTimeVO.setType(2);
|
|
ruleTimeVO.setType(2);
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
type, qwUserId, companyUserId, companyId, groupChat.getChatId(), welcomeText, qwUserName,
|
|
type, qwUserId, companyUserId, companyId, groupChat.getChatId(), welcomeText, qwUserName,
|
|
|
- null, true, miniAppId, groupChat, config, miniMap, null, sendMsgType, companies, liveId);
|
|
|
|
|
|
|
+ null, true, miniAppId, groupChat, config, miniMap, null, sendMsgType,
|
|
|
|
|
+ companies, liveId,serverId);
|
|
|
}
|
|
}
|
|
|
// if (content.getIndex() == 0) {
|
|
// if (content.getIndex() == 0) {
|
|
|
// QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, groupChat.getChatId(), groupChat.getName(), null, isOfficial, null);
|
|
// QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, groupChat.getChatId(), groupChat.getName(), null, isOfficial, null);
|
|
@@ -817,7 +825,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, contactId.getExternalContactId(), externalUserName, fsUserId, isOfficial, contactId.getExternalId(), contactId.getIsDaysNotStudy());
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, contactId.getExternalContactId(), externalUserName, fsUserId, isOfficial, contactId.getExternalId(), contactId.getIsDaysNotStudy());
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
type, qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId, false, miniAppId,
|
|
type, qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId, false, miniAppId,
|
|
|
- null, config, miniMap, grade, sendMsgType, companies, liveId);
|
|
|
|
|
|
|
+ null, config, miniMap, grade, sendMsgType, companies, liveId,serverId);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("处理 externalContactId {} 时发生异常: {}", contactId, e.getMessage(), e);
|
|
log.error("处理 externalContactId {} 时发生异常: {}", contactId, e.getMessage(), e);
|
|
|
}
|
|
}
|
|
@@ -929,9 +937,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
SopUserLogsVo logVo, Date sendTime, Long courseId, Long videoId, int type, String qwUserId,
|
|
SopUserLogsVo logVo, Date sendTime, Long courseId, Long videoId, int type, String qwUserId,
|
|
|
String companyUserId, String companyId, String externalId, String welcomeText,
|
|
String companyUserId, String companyId, String externalId, String welcomeText,
|
|
|
String qwUserName, Long fsUserId, boolean isGroupChat, String miniAppId,
|
|
String qwUserName, Long fsUserId, boolean isGroupChat, String miniAppId,
|
|
|
- QwGroupChat groupChat, CourseConfig config,
|
|
|
|
|
- Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
|
|
|
|
|
- Integer grade, Integer sendMsgType, List<Company> companies, Long liveId) {
|
|
|
|
|
|
|
+ QwGroupChat groupChat, CourseConfig config, Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
|
|
|
|
|
+ Integer grade, Integer sendMsgType, List<Company> companies, Long liveId,Long serverId) {
|
|
|
switch (type) {
|
|
switch (type) {
|
|
|
case 1:
|
|
case 1:
|
|
|
handleNormalMessage(sopLogs, content, companyUserId, companyId, isGroupChat, qwUserId, groupChat, externalId, logVo,sendTime);
|
|
handleNormalMessage(sopLogs, content, companyUserId, companyId, isGroupChat, qwUserId, groupChat, externalId, logVo,sendTime);
|
|
@@ -939,7 +946,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
case 2:
|
|
case 2:
|
|
|
handleCourseMessage(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
handleCourseMessage(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId,
|
|
qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId,
|
|
|
- isGroupChat, miniAppId, groupChat, config, miniMap, grade, sendMsgType, companies);
|
|
|
|
|
|
|
+ isGroupChat, miniAppId, groupChat, config, miniMap, grade, sendMsgType, companies,serverId);
|
|
|
break;
|
|
break;
|
|
|
case 3:
|
|
case 3:
|
|
|
handleOrderMessage(sopLogs, content);
|
|
handleOrderMessage(sopLogs, content);
|
|
@@ -1603,7 +1610,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String companyId, String externalId, String welcomeText, String qwUserName,
|
|
String companyId, String externalId, String welcomeText, String qwUserName,
|
|
|
Long fsUserId, boolean isGroupChat, String miniAppId, QwGroupChat groupChat, CourseConfig config, Map<Long,
|
|
Long fsUserId, boolean isGroupChat, String miniAppId, QwGroupChat groupChat, CourseConfig config, Map<Long,
|
|
|
Map<Integer, List<CompanyMiniapp>>> miniMap, Integer grade, Integer sendMsgType,
|
|
Map<Integer, List<CompanyMiniapp>>> miniMap, Integer grade, Integer sendMsgType,
|
|
|
- List<Company> companies) {
|
|
|
|
|
|
|
+ List<Company> companies,Long serverId) {
|
|
|
QwExternalContact contact = null;
|
|
QwExternalContact contact = null;
|
|
|
if (logVo.getExternalId() != null) {
|
|
if (logVo.getExternalId() != null) {
|
|
|
contact = qwExternalContactMapper.selectById(logVo.getExternalId());
|
|
contact = qwExternalContactMapper.selectById(logVo.getExternalId());
|
|
@@ -1615,11 +1622,16 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-//
|
|
|
|
|
-// Integer courseType = clonedContent.getCourseType();
|
|
|
|
|
|
|
|
|
|
String isOfficial = clonedContent.getIsOfficial();
|
|
String isOfficial = clonedContent.getIsOfficial();
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ Long msgNum = Long.valueOf(generateRandomNumberWithLock());
|
|
|
|
|
+ sopLogs.setSmsLogsId(msgNum);
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ AtomicInteger index = new AtomicInteger(0);
|
|
|
|
|
+
|
|
|
List<QwSopTempSetting.Content.Setting> settings = clonedContent.getSetting();
|
|
List<QwSopTempSetting.Content.Setting> settings = clonedContent.getSetting();
|
|
|
if (settings == null || settings.isEmpty()) {
|
|
if (settings == null || settings.isEmpty()) {
|
|
|
// log.error("Cloned content settings are empty, skipping.");
|
|
// log.error("Cloned content settings are empty, skipping.");
|
|
@@ -1633,6 +1645,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
// 顺序处理每个 Setting,避免过多的并行导致线程开销
|
|
// 顺序处理每个 Setting,避免过多的并行导致线程开销
|
|
|
for (QwSopTempSetting.Content.Setting setting : settings) {
|
|
for (QwSopTempSetting.Content.Setting setting : settings) {
|
|
|
|
|
+
|
|
|
|
|
+ Integer currentIndex = index.getAndIncrement();
|
|
|
|
|
+
|
|
|
switch (setting.getContentType()) {
|
|
switch (setting.getContentType()) {
|
|
|
//文字和短链一起
|
|
//文字和短链一起
|
|
|
case "1":
|
|
case "1":
|
|
@@ -1902,6 +1917,26 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
} else {
|
|
} else {
|
|
|
log.error("生成看课短链失败,跳过设置 URL。");
|
|
log.error("生成看课短链失败,跳过设置 URL。");
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ QwSopSmsLogs sopSmsLogs=new QwSopSmsLogs();
|
|
|
|
|
+ sopSmsLogs.setSopId(sopLogs.getSopId());
|
|
|
|
|
+ sopSmsLogs.setQwUserId(Long.valueOf(qwUserId));
|
|
|
|
|
+ sopSmsLogs.setSopLogId(msgNum);
|
|
|
|
|
+ sopSmsLogs.setCompanyId(companyId != null ? Long.valueOf(companyId) : null);
|
|
|
|
|
+ sopSmsLogs.setCompanyUserId(companyUserId != null ? Long.valueOf(companyUserId) : null);
|
|
|
|
|
+ sopSmsLogs.setContactId(externalId != null ? Long.valueOf(externalId) : null);
|
|
|
|
|
+ sopSmsLogs.setServerId(serverId);
|
|
|
|
|
+ sopSmsLogs.setStatus(0);
|
|
|
|
|
+ sopSmsLogs.setSendTime(new Date());
|
|
|
|
|
+ sopSmsLogs.setUpdateTime(new Date());
|
|
|
|
|
+ sopSmsLogs.setCreateTime(new Date());
|
|
|
|
|
+
|
|
|
|
|
+ sopSmsLogs.setFsUserId(sopLogs.getFsUserId());
|
|
|
|
|
+ sopSmsLogs.setSmsIndex(currentIndex);
|
|
|
|
|
+ sopSmsLogs.setContent(setting.getValue());
|
|
|
|
|
+ sopSmsLogs.setSmsTemplateCode(setting.getSmsTemplateCode());
|
|
|
|
|
+
|
|
|
|
|
+ enqueueQwSopSmsLogs(sopSmsLogs);
|
|
|
}
|
|
}
|
|
|
break;
|
|
break;
|
|
|
default:
|
|
default:
|
|
@@ -2743,6 +2778,22 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 将 QwSopSmsLogs 放入队列
|
|
|
|
|
+ */
|
|
|
|
|
+ private void enqueueQwSopSmsLogs(QwSopSmsLogs smsLogs) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ boolean offered = qwSopSmsLogsQueue.offer(smsLogs, 5, TimeUnit.SECONDS);
|
|
|
|
|
+ if (!offered) {
|
|
|
|
|
+ log.error("QwSopSmsLogs 队列已满,无法添加日志: {}", JSON.toJSONString(smsLogs));
|
|
|
|
|
+ // 处理队列已满的情况,例如记录到失败队列或持久化存储
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("插入 QwSopLogs 队列时被中断: {}", e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 将 FsCourseWatchLog 放入队列
|
|
* 将 FsCourseWatchLog 放入队列
|
|
|
*/
|
|
*/
|
|
@@ -2804,6 +2855,38 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消费 QwSopLogs 队列并进行批量插入
|
|
|
|
|
+ */
|
|
|
|
|
+ private void consumeQwSopSmsLogs() {
|
|
|
|
|
+ List<QwSopSmsLogs> batch = new ArrayList<>(BATCH_SIZE);
|
|
|
|
|
+ while (running || !qwSopSmsLogsQueue.isEmpty()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ QwSopSmsLogs log = qwSopSmsLogsQueue.poll(1, TimeUnit.SECONDS);
|
|
|
|
|
+ if (log != null) {
|
|
|
|
|
+ batch.add(log);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && log == null)) {
|
|
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
|
|
+ batchInsertQwSopSmsLogs(new ArrayList<>(batch));
|
|
|
|
|
+ batch.clear();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("QwSopLogs 消费线程被中断: {}", e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 处理剩余的数据
|
|
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
|
|
+ batchInsertQwSopSmsLogs(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 消费 FsCourseWatchLog 队列并进行批量插入
|
|
* 消费 FsCourseWatchLog 队列并进行批量插入
|
|
|
*/
|
|
*/
|
|
@@ -2939,6 +3022,25 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量插入 QwSopSmsLogs
|
|
|
|
|
+ */
|
|
|
|
|
+ @Transactional
|
|
|
|
|
+ @Retryable(
|
|
|
|
|
+ value = {Exception.class},
|
|
|
|
|
+ maxAttempts = 3,
|
|
|
|
|
+ backoff = @Backoff(delay = 2000)
|
|
|
|
|
+ )
|
|
|
|
|
+ public void batchInsertQwSopSmsLogs(List<QwSopSmsLogs> logsToInsert) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ qwSopSmsLogsService.batchInsertQwSopSmsLogsOneTouch(logsToInsert);
|
|
|
|
|
+// log.info("批量插入 QwSopSmsLogs 完成,共插入 {} 条记录。", logsToInsert.size());
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批量插入 QwSopSmsLogs 失败: {}", e.getMessage(), e);
|
|
|
|
|
+ // 可选:将失败的数据记录到失败队列或持久化存储以便后续重试
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 批量插入 FsCourseWatchLog
|
|
* 批量插入 FsCourseWatchLog
|
|
|
*/
|
|
*/
|