|
|
@@ -30,6 +30,7 @@ import com.fs.qw.mapper.QwUserMapper;
|
|
|
import com.fs.qw.service.IQwCompanyService;
|
|
|
import com.fs.qw.service.IQwGroupChatService;
|
|
|
import com.fs.qw.service.IQwGroupChatUserService;
|
|
|
+import com.fs.qw.service.IQwSopSmsLogsService;
|
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
|
import com.fs.qw.vo.GroupUserExternalVo;
|
|
|
import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
|
@@ -69,6 +70,7 @@ import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import static com.fs.course.utils.LinkUtil.generateRandomNumberWithLock;
|
|
|
import static com.fs.course.utils.LinkUtil.generateRandomStringWithLock;
|
|
|
|
|
|
@Service
|
|
|
@@ -124,6 +126,10 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
@Autowired
|
|
|
private IQwSopLogsService qwSopLogsService;
|
|
|
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IQwSopSmsLogsService qwSopSmsLogsService;
|
|
|
+
|
|
|
@Autowired
|
|
|
private QwSopLogsMapper qwSopLogsMapper;
|
|
|
|
|
|
@@ -160,12 +166,14 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
// Blocking queues with bounded capacity to implement backpressure
|
|
|
private final BlockingQueue<QwSopLogs> qwSopLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
+ private final BlockingQueue<QwSopSmsLogs> qwSopSmsLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseWatchLog> watchLogsQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseLink> linkQueue = new LinkedBlockingQueue<>(20000);
|
|
|
private final BlockingQueue<FsCourseSopAppLink> sopAppLinks = new LinkedBlockingQueue<>(20000);
|
|
|
|
|
|
// Executors for consumer threads
|
|
|
private ExecutorService qwSopLogsExecutor;
|
|
|
+ private ExecutorService qwSopSmsLogsExecutor;
|
|
|
private ExecutorService watchLogsExecutor;
|
|
|
private ExecutorService courseLinkExecutor;
|
|
|
private ExecutorService courseSopAppLinkExecutor;
|
|
|
@@ -223,6 +231,12 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
t.setDaemon(true);
|
|
|
return t;
|
|
|
});
|
|
|
+
|
|
|
+ qwSopSmsLogsExecutor = Executors.newSingleThreadExecutor(r -> {
|
|
|
+ Thread t = new Thread(r, "QwSopSmsLogsConsumer");
|
|
|
+ t.setDaemon(true);
|
|
|
+ return t;
|
|
|
+ });
|
|
|
watchLogsExecutor = Executors.newSingleThreadExecutor(r -> {
|
|
|
Thread t = new Thread(r, "WatchLogsConsumer");
|
|
|
t.setDaemon(true);
|
|
|
@@ -242,6 +256,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
|
|
|
qwSopLogsExecutor.submit(this::consumeQwSopLogs);
|
|
|
+ qwSopSmsLogsExecutor.submit(this::consumeQwSopSmsLogs);
|
|
|
watchLogsExecutor.submit(this::consumeWatchLogs);
|
|
|
courseLinkExecutor.submit(this::consumeCourseLink);
|
|
|
courseSopAppLinkExecutor.submit(this::consumeCourseSopAppLink);
|
|
|
@@ -272,6 +287,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
public void shutdownConsumers() {
|
|
|
running = false;
|
|
|
qwSopLogsExecutor.shutdown();
|
|
|
+ qwSopSmsLogsExecutor.shutdown();
|
|
|
watchLogsExecutor.shutdown();
|
|
|
courseLinkExecutor.shutdown();
|
|
|
courseSopAppLinkExecutor.shutdown();
|
|
|
@@ -279,6 +295,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
if (!qwSopLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
qwSopLogsExecutor.shutdownNow();
|
|
|
}
|
|
|
+ if (!qwSopSmsLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
+ qwSopSmsLogsExecutor.shutdownNow();
|
|
|
+ }
|
|
|
if (!watchLogsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
watchLogsExecutor.shutdownNow();
|
|
|
}
|
|
|
@@ -290,6 +309,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
qwSopLogsExecutor.shutdownNow();
|
|
|
+ qwSopSmsLogsExecutor.shutdownNow();
|
|
|
watchLogsExecutor.shutdownNow();
|
|
|
courseLinkExecutor.shutdownNow();
|
|
|
courseSopAppLinkExecutor.shutdownNow();
|
|
|
@@ -507,6 +527,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String companyUserId = String.valueOf(qwUserByRedis.getCompanyUserId()).trim();
|
|
|
String companyId = String.valueOf(qwUserByRedis.getCompanyId()).trim();
|
|
|
Integer sendMsgType = qwUserByRedis.getSendMsgType();
|
|
|
+ Long serverId = qwUserByRedis.getServerId();
|
|
|
|
|
|
if (StringUtil.strIsNullOrEmpty(companyUserId) || StringUtil.strIsNullOrEmpty(companyId) || "null".equals(companyUserId)) {
|
|
|
log.error("员工未绑定销售账号或公司,跳过处理:"+qwUserId);
|
|
|
@@ -632,7 +653,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
insertSopUserLogs(sopUserLogsInfos, logVo, sendTime, ruleTimeVO, content, qwUserId,
|
|
|
companyUserId, companyId, qwUserByRedis.getWelcomeText(),qwUserByRedis.getQwUserName(),
|
|
|
- groupChatMap, miniAppId,config,miniMap, sendMsgType,companies,day);
|
|
|
+ groupChatMap, miniAppId,config,miniMap, sendMsgType,companies,day, serverId);
|
|
|
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
@@ -679,7 +700,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String qwUserId,String companyUserId,String companyId,String welcomeText,String qwUserName,
|
|
|
Map<String, QwGroupChat> groupChatMap,String miniAppId,CourseConfig config,
|
|
|
Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap, Integer sendMsgType,
|
|
|
- List<Company> companies,Long day) {
|
|
|
+ List<Company> companies,Long day,Long serverId) {
|
|
|
String formattedSendTime = sendTime.toInstant()
|
|
|
.atZone(ZoneId.systemDefault())
|
|
|
.format(DATE_TIME_FORMATTER);
|
|
|
@@ -740,7 +761,8 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, groupChat.getChatId(), groupChat.getName(), null, isOfficial, null,null);
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
type, qwUserId, companyUserId, companyId, groupChat.getChatId(), welcomeText, qwUserName,
|
|
|
- null, true, miniAppId, groupChat,config, miniMap, null, sendMsgType,companies,day);
|
|
|
+ null, true, miniAppId, groupChat,config, miniMap, null,
|
|
|
+ sendMsgType,companies,day,serverId);
|
|
|
} else {
|
|
|
if(groupChat.getChatUserList() != null && !groupChat.getChatUserList().isEmpty()){
|
|
|
groupChat.getChatUserList().forEach(user -> {
|
|
|
@@ -748,8 +770,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
ruleTimeVO.setRemark("客户群催课");
|
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, user.getUserId(), user.getName(), null, isOfficial, null,null);
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
- type, qwUserId, companyUserId, companyId, user.getId().toString(), welcomeText, qwUserName,
|
|
|
- null, false, miniAppId, groupChat,config, miniMap, null, sendMsgType,companies,day);
|
|
|
+ type, qwUserId, companyUserId, companyId, user.getId().toString(), welcomeText,
|
|
|
+ qwUserName, null, false, miniAppId, groupChat,config, miniMap,
|
|
|
+ null, sendMsgType,companies,day,serverId);
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
@@ -764,7 +787,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
QwSopLogs sopLogs = createBaseLog(formattedSendTime, logVo, ruleTimeVO, contactId.getExternalContactId(), externalUserName, fsUserId, isOfficial, contactId.getExternalId(),contactId.getIsDaysNotStudy());
|
|
|
handleLogBasedOnType(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
type, qwUserId, companyUserId, companyId, externalId, welcomeText, qwUserName, fsUserId, false, miniAppId,
|
|
|
- null,config, miniMap, grade, sendMsgType,companies,day);
|
|
|
+ null,config, miniMap, grade, sendMsgType,companies,day,serverId);
|
|
|
} catch (Exception e) {
|
|
|
log.error("处理 externalContactId {} 时发生异常: {}", contactId, e.getMessage(), e);
|
|
|
}
|
|
|
@@ -891,7 +914,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String qwUserName, Long fsUserId, boolean isGroupChat, String miniAppId,
|
|
|
QwGroupChat groupChat,CourseConfig config,
|
|
|
Map<Long, Map<Integer, List<CompanyMiniapp>>> miniMap,
|
|
|
- Integer grade, Integer sendMsgType ,List<Company> companies,Long day ) {
|
|
|
+ Integer grade, Integer sendMsgType ,List<Company> companies,Long day,Long serverId) {
|
|
|
switch (type) {
|
|
|
case 1:
|
|
|
handleNormalMessage(sopLogs, content,companyUserId);
|
|
|
@@ -899,7 +922,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
case 2:
|
|
|
handleCourseMessage(sopLogs, content, logVo, sendTime, courseId, videoId,
|
|
|
qwUserId, companyUserId, companyId, externalId, welcomeText,qwUserName, fsUserId,
|
|
|
- isGroupChat, miniAppId, groupChat,config,miniMap, grade, sendMsgType,companies,day);
|
|
|
+ isGroupChat, miniAppId, groupChat,config,miniMap, grade, sendMsgType,companies,day,serverId);
|
|
|
break;
|
|
|
case 3:
|
|
|
handleOrderMessage(sopLogs, content);
|
|
|
@@ -943,7 +966,7 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
String companyId, String externalId, String welcomeText, String qwUserName,
|
|
|
Long fsUserId, boolean isGroupChat, String miniAppId, QwGroupChat groupChat,CourseConfig config,Map<Long,
|
|
|
Map<Integer, List<CompanyMiniapp>>> miniMap,Integer grade, Integer sendMsgType,
|
|
|
- List<Company> companies,Long day) {
|
|
|
+ List<Company> companies,Long day,Long serverId) {
|
|
|
// 深拷贝 Content 对象,避免使用 JSON
|
|
|
QwSopTempSetting.Content clonedContent = deepCopyContent(content);
|
|
|
if (clonedContent == null) {
|
|
|
@@ -951,8 +974,10 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-//
|
|
|
-// Integer courseType = clonedContent.getCourseType();
|
|
|
+ Long msgNum = Long.valueOf(generateRandomNumberWithLock());
|
|
|
+ sopLogs.setSmsLogsId(msgNum);
|
|
|
+
|
|
|
+ AtomicInteger index = new AtomicInteger(0);
|
|
|
|
|
|
String isOfficial = clonedContent.getIsOfficial();
|
|
|
|
|
|
@@ -964,6 +989,9 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
|
|
|
// 顺序处理每个 Setting,避免过多的并行导致线程开销
|
|
|
for (QwSopTempSetting.Content.Setting setting : settings) {
|
|
|
+
|
|
|
+ Integer currentIndex = index.getAndIncrement();
|
|
|
+
|
|
|
switch (setting.getContentType()) {
|
|
|
//文字和短链一起
|
|
|
case "1":
|
|
|
@@ -1123,6 +1151,26 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
} else {
|
|
|
log.error("生成看课短链失败,跳过设置 URL。");
|
|
|
}
|
|
|
+
|
|
|
+ QwSopSmsLogs sopSmsLogs=new QwSopSmsLogs();
|
|
|
+ sopSmsLogs.setSopId(sopLogs.getSopId());
|
|
|
+ sopSmsLogs.setQwUserId(Long.valueOf(qwUserId));
|
|
|
+ sopSmsLogs.setSopLogId(msgNum);
|
|
|
+ sopSmsLogs.setCompanyId(companyId != null ? Long.valueOf(companyId) : null);
|
|
|
+ sopSmsLogs.setCompanyUserId(companyUserId != null ? Long.valueOf(companyUserId) : null);
|
|
|
+ sopSmsLogs.setContactId(externalId != null ? Long.valueOf(externalId) : null);
|
|
|
+ sopSmsLogs.setServerId(serverId);
|
|
|
+ sopSmsLogs.setStatus(0);
|
|
|
+ sopSmsLogs.setSendTime(new Date());
|
|
|
+ sopSmsLogs.setUpdateTime(new Date());
|
|
|
+ sopSmsLogs.setCreateTime(new Date());
|
|
|
+
|
|
|
+ sopSmsLogs.setFsUserId(sopLogs.getFsUserId());
|
|
|
+ sopSmsLogs.setSmsIndex(currentIndex);
|
|
|
+ sopSmsLogs.setContent(setting.getValue());
|
|
|
+ sopSmsLogs.setSmsTemplateCode(setting.getSmsTemplateCode());
|
|
|
+
|
|
|
+ enqueueQwSopSmsLogs(sopSmsLogs);
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
|
@@ -1672,6 +1720,22 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 将 QwSopSmsLogs 放入队列
|
|
|
+ */
|
|
|
+ private void enqueueQwSopSmsLogs(QwSopSmsLogs smsLogs) {
|
|
|
+ try {
|
|
|
+ boolean offered = qwSopSmsLogsQueue.offer(smsLogs, 5, TimeUnit.SECONDS);
|
|
|
+ if (!offered) {
|
|
|
+ log.error("QwSopSmsLogs 队列已满,无法添加日志: {}", JSON.toJSONString(smsLogs));
|
|
|
+ // 处理队列已满的情况,例如记录到失败队列或持久化存储
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("插入 QwSopLogs 队列时被中断: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 将 FsCourseWatchLog 放入队列
|
|
|
*/
|
|
|
@@ -1733,6 +1797,35 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 消费 QwSopLogs 队列并进行批量插入
|
|
|
+ */
|
|
|
+ private void consumeQwSopSmsLogs() {
|
|
|
+ List<QwSopSmsLogs> batch = new ArrayList<>(BATCH_SIZE);
|
|
|
+ while (running || !qwSopSmsLogsQueue.isEmpty()) {
|
|
|
+ try {
|
|
|
+ QwSopSmsLogs log = qwSopSmsLogsQueue.poll(1, TimeUnit.SECONDS);
|
|
|
+ if (log != null) {
|
|
|
+ batch.add(log);
|
|
|
+ }
|
|
|
+ if (batch.size() >= BATCH_SIZE || (!batch.isEmpty() && log == null)) {
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ batchInsertQwSopSmsLogs(new ArrayList<>(batch));
|
|
|
+ batch.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("QwSopLogs 消费线程被中断: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 处理剩余的数据
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ batchInsertQwSopSmsLogs(batch);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 消费 FsCourseWatchLog 队列并进行批量插入
|
|
|
*/
|
|
|
@@ -1839,6 +1932,26 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量插入 QwSopSmsLogs
|
|
|
+ */
|
|
|
+ @Transactional
|
|
|
+ @Retryable(
|
|
|
+ value = {Exception.class},
|
|
|
+ maxAttempts = 3,
|
|
|
+ backoff = @Backoff(delay = 2000)
|
|
|
+ )
|
|
|
+ public void batchInsertQwSopSmsLogs(List<QwSopSmsLogs> logsToInsert) {
|
|
|
+ try {
|
|
|
+ qwSopSmsLogsService.batchInsertQwSopSmsLogsOneTouch(logsToInsert);
|
|
|
+// log.info("批量插入 QwSopSmsLogs 完成,共插入 {} 条记录。", logsToInsert.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("批量插入 QwSopSmsLogs 失败: {}", e.getMessage(), e);
|
|
|
+ // 可选:将失败的数据记录到失败队列或持久化存储以便后续重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 批量插入 FsCourseWatchLog
|
|
|
*/
|