|
|
@@ -0,0 +1,486 @@
|
|
|
+package com.fs.app.task.consumer.sop;
|
|
|
+
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONException;
|
|
|
+import com.fs.app.core.ConditionalOnLoadControl;
|
|
|
+import com.fs.app.properties.AppTaskProperties;
|
|
|
+import com.fs.app.sender.AbstractAppMessageSender;
|
|
|
+import com.fs.app.sender.MessageSenderResult;
|
|
|
+import com.fs.app.sender.MessageSenderResultInfo;
|
|
|
+import com.fs.app.sender.MessageUserPayload;
|
|
|
+import com.fs.app.sender.enums.SenderType;
|
|
|
+import com.fs.app.sender.factory.AppSopLogsSenderFactory;
|
|
|
+import com.fs.app.sender.properties.ConfigProperties;
|
|
|
+import com.fs.app.sop.domain.AppSop;
|
|
|
+import com.fs.app.sop.domain.AppSopLogs;
|
|
|
+import com.fs.app.sop.dto.AutoSopTimeParam;
|
|
|
+import com.fs.app.sop.result.AppSopCourseFinishTempSetting;
|
|
|
+import com.fs.app.sop.service.IAppSopLogsService;
|
|
|
+import com.fs.app.sop.service.IAppSopService;
|
|
|
+import com.fs.app.watchlog.domain.AppCourseWatchLog;
|
|
|
+import com.fs.app.watchlog.service.IAppCourseWatchLogService;
|
|
|
+import com.fs.common.core.redis.RedisCache;
|
|
|
+import com.fs.common.redis.RedisBatchUtil;
|
|
|
+import com.fs.common.utils.date.DateUtil;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
+@ConditionalOnLoadControl(
|
|
|
+ value = {
|
|
|
+ "sendSopLogs",
|
|
|
+ "consumer"
|
|
|
+ }
|
|
|
+)
|
|
|
+public class AppSopLogsConsumer {
|
|
|
+
|
|
|
+ private final IAppSopService appSopService;
|
|
|
+ private final IAppCourseWatchLogService appCourseWatchLogService;
|
|
|
+ private final IAppSopLogsService appSopLogsService;
|
|
|
+ private final ThreadPoolTaskExecutor consumerThreadPool;
|
|
|
+ private final RedisBatchUtil redisBatchUtil;
|
|
|
+ private final RedisCache redisCache;
|
|
|
+ private final AppSopLogsSenderFactory senderFactory;
|
|
|
+ private final AppTaskProperties appTaskProperties;
|
|
|
+ private final ConfigProperties configProperties;
|
|
|
+
|
|
|
+ // 当前正在发送的 SOP
|
|
|
+ private static final String CONSUMER_SOP_SEND_ID_KEY_PREFIX = "app:sop:consumer:send:id:";
|
|
|
+
|
|
|
+ @Value("${server.port}")
|
|
|
+ private String port;
|
|
|
+
|
|
|
+ // 节点唯一标识
|
|
|
+ private String nodeId;
|
|
|
+
|
|
|
+ // 初始化节点 ID
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ log.info("配置参数:{}", configProperties);
|
|
|
+ String node = appTaskProperties.getConsumer().getNode();
|
|
|
+ nodeId = node + "-" + port;//节点编号+端口生成唯一节点编号
|
|
|
+ if (!redisBatchUtil.consumerNodeExists(nodeId)) {
|
|
|
+ redisBatchUtil.setConsumerNode(nodeId);
|
|
|
+ log.info("当前消费者节点:{}加载成功", nodeId);
|
|
|
+ } else {
|
|
|
+ log.warn("当前消费者节点:{}已存在", nodeId);
|
|
|
+ }
|
|
|
+ log.info("发送sop待发记录任务已装载!!!!");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费逻辑
|
|
|
+ */
|
|
|
+ @Scheduled(fixedDelayString = "${task.consumer.compensate-fixed-delay:10000}", initialDelay = 10000)
|
|
|
+ public void sendSopLogs() {
|
|
|
+ try {
|
|
|
+ StringBuilder nodeFlag = new StringBuilder(nodeId).append("@compensate");
|
|
|
+ Date sendTime = new Date();
|
|
|
+ //锁定范围数据
|
|
|
+ appSopLogsService.lambdaUpdate()
|
|
|
+ .set(AppSopLogs::getHandleStatus, 1)
|
|
|
+ .set(AppSopLogs::getHandleTime, new Date())
|
|
|
+ .set(AppSopLogs::getHandleNode, nodeFlag.toString())
|
|
|
+ .eq(AppSopLogs::getHandleStatus, 0)
|
|
|
+ .eq(AppSopLogs::getAppSendStatus, 0)
|
|
|
+ .le(AppSopLogs::getSendTime, sendTime)
|
|
|
+ .last("ORDER BY id LIMIT " + configProperties.getBatchSize())
|
|
|
+ .update();
|
|
|
+
|
|
|
+ // 查询需要补偿的数据
|
|
|
+ List<AppSopLogs> pendingSops = appSopLogsService.lambdaQuery()
|
|
|
+ .eq(AppSopLogs::getAppSendStatus, 0)
|
|
|
+ .eq(AppSopLogs::getHandleStatus, 1)
|
|
|
+ .eq(AppSopLogs::getHandleNode, nodeFlag.toString())
|
|
|
+ .le(AppSopLogs::getSendTime, sendTime)
|
|
|
+ .list();
|
|
|
+
|
|
|
+ if (ObjectUtil.isEmpty(pendingSops)) {
|
|
|
+ log.info("节点[{}],暂无需要补偿的数据,跳过", nodeId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 异步处理发送逻辑
|
|
|
+ List<MessageSenderResult> mssList = new ArrayList<>();
|
|
|
+ List<Future<MessageSenderResult>> futureList = new ArrayList<>();
|
|
|
+ for (AppSopLogs sopLogs : pendingSops) {
|
|
|
+ Future<MessageSenderResult> future = consumerThreadPool.submit(() -> execute(sopLogs));
|
|
|
+ futureList.add(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 收集结果
|
|
|
+ for (Future<MessageSenderResult> future : futureList) {
|
|
|
+ try {
|
|
|
+ MessageSenderResult result = future.get(appTaskProperties.getConsumer().getFutureTimeoutSeconds(), TimeUnit.SECONDS);
|
|
|
+ if (ObjectUtil.isNotEmpty(result)) {
|
|
|
+ mssList.add(result);
|
|
|
+ }
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ log.error("节点[{}]:补偿任务超时,取消任务", nodeId, e);
|
|
|
+ future.cancel(true);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("节点[{}]:补偿处理失败:{}", nodeId, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 批次更新状态
|
|
|
+ if (ObjectUtil.isNotEmpty(mssList)) {
|
|
|
+ batchUpdateStatus(mssList);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("节点[{}]:补偿处理异常:{}", nodeId, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批次更新日志状态
|
|
|
+ */
|
|
|
+ public void batchUpdateStatus(List<MessageSenderResult> mssList) {
|
|
|
+ if (ObjectUtil.isEmpty(mssList)) {
|
|
|
+ log.warn("批次更新状态:无待更新数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ List<AppSopLogs> updateList = new ArrayList<>(mssList.size());
|
|
|
+ for (MessageSenderResult result : mssList) {
|
|
|
+ if (ObjectUtil.isEmpty(result) || ObjectUtil.isEmpty(result.getId()) || ObjectUtil.isEmpty(result.getInfo())) {
|
|
|
+ log.warn("跳过空结果的状态更新");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ int infoSize = result.getInfo().size();
|
|
|
+ // 获取成功的子项,因为一条sop_logs里面可能会有多个消息要发送
|
|
|
+ long successCount = Optional.of(result)
|
|
|
+ .map(MessageSenderResult::getInfo)
|
|
|
+ .orElse(Collections.emptyList())
|
|
|
+ .stream()
|
|
|
+ .filter(MessageSenderResultInfo::isSuccess)
|
|
|
+ .count();
|
|
|
+ int sendStatus = 2;//默认全部失败
|
|
|
+ String message = "APP推送消息失败[全部]";
|
|
|
+ if (successCount == infoSize) {
|
|
|
+ sendStatus = 1;//全部成功
|
|
|
+ message = "APP推送消息成功[全部]";
|
|
|
+ } else if (successCount < infoSize) {
|
|
|
+ sendStatus = 4;//部分成功
|
|
|
+ message = "APP推送消息成功[部分]";
|
|
|
+ }
|
|
|
+ AppSopLogs updateEntity = new AppSopLogs();
|
|
|
+ updateEntity.setId(result.getId());
|
|
|
+ updateEntity.setIsHaveApp(1);
|
|
|
+ updateEntity.setRealSendTime(new Date());
|
|
|
+ updateEntity.setAppSendStatus(sendStatus);
|
|
|
+ updateEntity.setAppSendRemark(message);
|
|
|
+ updateEntity.setRemarkDetail(JSON.toJSONString(result.getInfo()));
|
|
|
+ updateEntity.setHandleStatus(2);//处理完成
|
|
|
+ updateList.add(updateEntity);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ appSopLogsService.updateBatchById(updateList);
|
|
|
+ log.info("批次更新状态,共{}条记录", updateList.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("批次更新状态失败", e);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行发送逻辑
|
|
|
+ */
|
|
|
+ private MessageSenderResult execute(AppSopLogs sopLogs) {
|
|
|
+ String redisKey = CONSUMER_SOP_SEND_ID_KEY_PREFIX + sopLogs.getId();
|
|
|
+ MessageSenderResult result = new MessageSenderResult();
|
|
|
+ result.setId(sopLogs.getId());
|
|
|
+ try {
|
|
|
+ // 检查是否有节点正在处理
|
|
|
+ String nodeSendTime = redisCache.getCacheObject(redisKey);
|
|
|
+ if (nodeSendTime != null) {
|
|
|
+ String[] arr = nodeSendTime.split("@");
|
|
|
+ log.warn("消息ID:{}已被节点{}锁定处理中(锁定时间:{}),本次处理跳过",
|
|
|
+ sopLogs.getId(), arr[0], new Date(Long.parseLong(arr[1])));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (ObjectUtil.isEmpty(sopLogs.getContentJson())) {
|
|
|
+ log.error("节点[{}]消息ID:{} ContentJson为空,发送失败", nodeId, sopLogs.getId());
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ AppSopCourseFinishTempSetting setting;
|
|
|
+ try {
|
|
|
+ setting = JSON.parseObject(sopLogs.getContentJson(), AppSopCourseFinishTempSetting.class);
|
|
|
+ } catch (JSONException e) {
|
|
|
+ log.error("节点[{}]消息ID:{} JSON解析失败", nodeId, sopLogs.getId(), e);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ //检查是否满足发送条件
|
|
|
+ if (!this.checkIsNeedSend(sopLogs, setting)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 无配置则跳过
|
|
|
+ List<AppSopCourseFinishTempSetting.Setting> allContent = setting.getSetting();
|
|
|
+ if (ObjectUtil.isEmpty(allContent)) {
|
|
|
+ log.warn("节点[{}]消息ID:{}无发送配置,跳过", nodeId, sopLogs.getId());
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 标记发送中
|
|
|
+ redisCache.setCacheObject(redisKey, (nodeId + "@" + System.currentTimeMillis()), 30, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ // 课程消息(contentType=9)
|
|
|
+ List<AppSopCourseFinishTempSetting.Setting> courseList = allContent.stream()
|
|
|
+ .filter(e -> "9".equals(e.getContentType()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ MessageUserPayload mu = new MessageUserPayload(sopLogs.getAppCustomerId(), sopLogs.getFsUserId());
|
|
|
+ if (ObjectUtil.isNotEmpty(courseList)) {
|
|
|
+ log.info("节点[{}]开始发送消息ID:{}的APP课程消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+ AbstractAppMessageSender courseSender = senderFactory.getSender(SenderType.COURSE.getType());
|
|
|
+ result.getInfo().addAll(courseSender.sendForSop(courseList, mu));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 文本消息(contentType=11)
|
|
|
+ List<AppSopCourseFinishTempSetting.Setting> txtList = allContent.stream()
|
|
|
+ .filter(e -> "11".equals(e.getContentType()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (ObjectUtil.isNotEmpty(txtList)) {
|
|
|
+ log.info("节点[{}]开始发送消息ID:{}的APP文本消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+ AbstractAppMessageSender textSender = senderFactory.getSender(SenderType.TXT.getType());
|
|
|
+ result.getInfo().addAll(textSender.sendForSop(txtList, mu));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 语音消息(contentType=12)
|
|
|
+ List<AppSopCourseFinishTempSetting.Setting> voiceList = allContent.stream()
|
|
|
+ .filter(e -> "12".equals(e.getContentType()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ if (ObjectUtil.isNotEmpty(voiceList)) {
|
|
|
+ log.info("节点[{}]开始发送消息ID:{}的APP语音消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+ AbstractAppMessageSender voiceSender = senderFactory.getSender(SenderType.VOICE.getType());
|
|
|
+ result.getInfo().addAll(voiceSender.sendForSop(voiceList, mu));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 疗法消息(contentType=20)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> pkgList = allContent.stream()
|
|
|
+// .filter(e -> "20".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(pkgList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP疗法消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender pkgSender = senderFactory.getSender(SenderType.PACKAGE.getType());
|
|
|
+// result.getInfo().addAll(pkgSender.sendForSop(pkgList, mu));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 图片消息(contentType=2)
|
|
|
+ List<AppSopCourseFinishTempSetting.Setting> imgList = allContent.stream()
|
|
|
+ .filter(e -> "2".equals(e.getContentType()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ if (ObjectUtil.isNotEmpty(imgList)) {
|
|
|
+ log.info("节点[{}]开始发送消息ID:{}的APP图片消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+ AbstractAppMessageSender imgSender = senderFactory.getSender(SenderType.IMG.getType());
|
|
|
+ result.getInfo().addAll(imgSender.sendForSop(imgList, mu));
|
|
|
+ }
|
|
|
+//
|
|
|
+// // 直播消息(contentType=21)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> liveList = allContent.stream()
|
|
|
+// .filter(e -> "21".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(liveList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP直播消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender liveSender = senderFactory.getSender(SenderType.LIVE.getType());
|
|
|
+// result.getInfo().addAll(liveSender.sendForSop(liveList, mu));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 民品消息(contentType=22)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> productList = allContent.stream()
|
|
|
+// .filter(e -> "22".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(productList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP民品消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender productSender = senderFactory.getSender(SenderType.PRODUCT.getType());
|
|
|
+// result.getInfo().addAll(productSender.sendForSop(productList, mu));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 药品消息(contentType=23)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> medicinesList = allContent.stream()
|
|
|
+// .filter(e -> "23".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(medicinesList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP药品消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender medicinesSender = senderFactory.getSender(SenderType.MEDICINES.getType());
|
|
|
+// result.getInfo().addAll(medicinesSender.sendForSop(medicinesList, mu));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 短视频消息(contentType=24)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> shortVideoList = allContent.stream()
|
|
|
+// .filter(e -> "24".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(shortVideoList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP短视频消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender shortVideoSender = senderFactory.getSender(SenderType.SHORT_VIDEO.getType());
|
|
|
+// result.getInfo().addAll(shortVideoSender.sendForSop(shortVideoList, mu));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 文章消息(contentType=25)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> articleList = allContent.stream()
|
|
|
+// .filter(e -> "25".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(articleList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP文章消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender articleSender = senderFactory.getSender(SenderType.ARTICLE.getType());
|
|
|
+// result.getInfo().addAll(articleSender.sendForSop(articleList, mu));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 文章消息(contentType=26)
|
|
|
+// List<AppSopCourseFinishTempSetting.Setting> openClassVideoList = allContent.stream()
|
|
|
+// .filter(e -> "26".equals(e.getContentType()))
|
|
|
+// .collect(Collectors.toList());
|
|
|
+// if (ObjectUtil.isNotEmpty(openClassVideoList)) {
|
|
|
+// log.info("节点[{}]开始发送消息ID:{}的APP公开课消息,用户:{}", nodeId, sopLogs.getId(), sopLogs.getFsUserId());
|
|
|
+// AbstractAppMessageSender openClassVideoSender = senderFactory.getSender(SenderType.OPEN_CLASS.getType());
|
|
|
+// result.getInfo().addAll(openClassVideoSender.sendForSop(openClassVideoList, mu));
|
|
|
+// }
|
|
|
+
|
|
|
+ log.info("节点[{}]消息ID:{}发送完成,子项数量:{}", nodeId, sopLogs.getId(), result.getInfo().size());
|
|
|
+ return result;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("节点[{}]消息ID:{}发送逻辑异常", nodeId, sopLogs.getId(), e);
|
|
|
+ return result;
|
|
|
+ } finally {
|
|
|
+ // 清理Redis防重发key
|
|
|
+ try {
|
|
|
+ redisCache.deleteObject(redisKey);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("节点[{}]消息ID:{}清理Redis key失败", nodeId, sopLogs.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查是否满足发送条件
|
|
|
+ *
|
|
|
+ * @param sopLogs
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private boolean checkIsNeedSend(AppSopLogs sopLogs, AppSopCourseFinishTempSetting setting) {
|
|
|
+ AppSop sop = this.appSopService.lambdaQuery()
|
|
|
+ .eq(AppSop::getId, sopLogs.getSopId())
|
|
|
+ .one();
|
|
|
+ if (ObjectUtil.isEmpty(sop) || ObjectUtil.isEmpty(sop.getExpiryTime())) {
|
|
|
+ // 作废消息,未配置过期消息
|
|
|
+ log.info("待发记录,LOG_ID:{}, SOP任务被删除或未配置过期时间", sopLogs.getId());
|
|
|
+ this.setAppSopLogsNotSend(sopLogs.getId(), "SOP任务被删除/或配置错误");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ //消息发送时间
|
|
|
+ LocalDateTime sendTime = DateUtil.dateToLocalDateTime(sopLogs.getSendTime());
|
|
|
+ LocalDateTime expiryDateTime = sendTime.plusHours(sop.getExpiryTime());
|
|
|
+ if (ObjectUtil.isNotEmpty(sop.getAutoSopTime())) {
|
|
|
+ AutoSopTimeParam param = JSON.parseObject(sop.getAutoSopTime(), AutoSopTimeParam.class);
|
|
|
+ //已过期,且sop配置的过期消息是否发送为否
|
|
|
+ if (LocalDateTime.now().isAfter(expiryDateTime) && 2 == param.getAutoSopSend()) {
|
|
|
+ // 作废消息
|
|
|
+ log.info("待发记录,LOG_ID:{}, 已过期,不发送", sopLogs.getId());
|
|
|
+ this.setAppSopLogsNotSend(sopLogs.getId(), "已过期,不发送");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Integer courseType;
|
|
|
+ //模板未指定消息类型
|
|
|
+ if (ObjectUtil.isEmpty(courseType = setting.getCourseType())) {
|
|
|
+ log.info("待发记录,LOG_ID:{}, 模板未选消息类型,不发送", sopLogs.getId());
|
|
|
+ this.setAppSopLogsNotSend(sopLogs.getId(), "模板未选消息类型,不发送");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ Integer cacheValue = redisCache.getCacheObject("sopCourse:video:isPause:" + setting.getVideoId());
|
|
|
+ int isPause = (cacheValue != null) ? cacheValue : 0;
|
|
|
+ log.info("待发记录,LOG_ID:{},判断课程({})小节({})当前状态:{}", sopLogs.getId(), setting.getCourseId(), setting.getVideoId(), isPause);
|
|
|
+ //课节被暂停
|
|
|
+ if (isPause == 1) {
|
|
|
+ log.info("待发记录,LOG_ID:{}, 课节暂停,不发送", sopLogs.getId());
|
|
|
+ this.setAppSopLogsNotSend(sopLogs.getId(), "课节暂停,不发送");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (courseType != 0) {// 非普通消息,进行复杂的条件判断
|
|
|
+ AppCourseWatchLog watchLog = this.appCourseWatchLogService.lambdaQuery()
|
|
|
+ .eq(AppCourseWatchLog::getVideoId, setting.getVideoId())//课节id
|
|
|
+ .eq(AppCourseWatchLog::getAppCustomerId, sopLogs.getAppCustomerId())//客服id
|
|
|
+ .eq(AppCourseWatchLog::getUserId, sopLogs.getFsUserId())//用户
|
|
|
+ .eq(AppCourseWatchLog::getSourceType, 0)//来源:sop
|
|
|
+ .one();
|
|
|
+ log.debug("待发记录,LOG_ID:{}-看课记录参数:videoId:{}, customerRoleId:{}, fsUserId:{}",
|
|
|
+ sopLogs.getId(), setting.getVideoId(), sopLogs.getAppCustomerId(), sopLogs.getFsUserId());
|
|
|
+ log.debug("待发记录,LOG_ID:{}-看课记录:{}", sopLogs.getId(), watchLog);
|
|
|
+ if (ObjectUtil.isNotEmpty(watchLog)) {
|
|
|
+ //课程类型
|
|
|
+ if (!isCourseTypeValid(courseType, watchLog.getLogType())) {
|
|
|
+ // 作废消息
|
|
|
+ log.info("待发记录,LOG_ID:{}, 看课状态未满足,不发送", sopLogs.getId());
|
|
|
+ this.setAppSopLogsNotSend(sopLogs.getId(), "看课状态未满足,不发送");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.info("待发记录,LOG_ID:{}, 无观看记录,不发送", sopLogs.getId());
|
|
|
+ this.setAppSopLogsNotSend(sopLogs.getId(), "无观看记录,不发送");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @param courseType 模板消息类型(0-普通消息,1-待看课消息,2-看课中断消息,3-已完课消息,4-待看和中断消息)
|
|
|
+ * @param logType 看课记录类型(1-看课中 2-完课 3-待看课 4-看课中断)
|
|
|
+ * @return boolean
|
|
|
+ */
|
|
|
+ private boolean isCourseTypeValid(Integer courseType, Long logType) {
|
|
|
+ switch (courseType) {
|
|
|
+ case 0://普通消息
|
|
|
+ return true; // courseType == 0 直接返回 true
|
|
|
+ case 1://1-待看课消息
|
|
|
+ return logType == 3; // 对应看课记录类型:待看课 4看课中断
|
|
|
+ case 2://2-看课中断消息
|
|
|
+ return logType == 4; // 对应看课记录类型:看课中断
|
|
|
+ case 3://3-已完课消息
|
|
|
+ return logType == 2; // 对应看课记录类型:完课
|
|
|
+ case 4://4-待看和中断消息
|
|
|
+ return logType == 3 || logType == 4;
|
|
|
+ default:
|
|
|
+ return false; // 其他情况返回 false
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 设置待发记录状态为不发送
|
|
|
+ *
|
|
|
+ * @param sopLogsId
|
|
|
+ * @param remark
|
|
|
+ */
|
|
|
+ private void setAppSopLogsNotSend(Long sopLogsId, String remark) {
|
|
|
+ this.appSopLogsService.lambdaUpdate()
|
|
|
+ .set(AppSopLogs::getSendStatus, 5)//已作废
|
|
|
+ .set(AppSopLogs::getReceivingStatus, 4)//
|
|
|
+ .set(AppSopLogs::getAppSendStatus, 3)//不发送
|
|
|
+ .set(AppSopLogs::getHandleStatus, 3)//不再处理
|
|
|
+ .set(AppSopLogs::getRemark, remark)
|
|
|
+ .set(AppSopLogs::getAppSendRemark, remark)
|
|
|
+ .set(AppSopLogs::getUpdateTime, new Date())
|
|
|
+ .eq(AppSopLogs::getId, sopLogsId)
|
|
|
+ .update();
|
|
|
+ }
|
|
|
+}
|