|
@@ -10,14 +10,22 @@ import com.fs.qw.service.IQwCompanyService;
|
|
|
import com.fs.qw.service.impl.QwExternalContactServiceImpl;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+
|
|
|
+import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
|
+import org.apache.rocketmq.common.message.MessageConst;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
+import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.messaging.support.MessageBuilder;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
import java.util.Optional;
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
|
@Slf4j
|
|
|
@Service
|
|
@@ -36,68 +44,296 @@ public class AsyncCourseWatchFinishService {
|
|
|
@Autowired
|
|
|
RedisCache redisCache;
|
|
|
|
|
|
+ // 重试队列和调度器
|
|
|
+ private final BlockingQueue<RetryMessage> retryQueue = new LinkedBlockingQueue<>(10000);
|
|
|
+ private final ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
|
+
|
|
|
+ // 主题映射配置
|
|
|
+ private static final String TOPIC = "course-finish-notes";
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ // 启动重试任务,每5秒处理一次重试队列
|
|
|
+ retryExecutor.scheduleWithFixedDelay(this::processRetryQueue, 10, 5, TimeUnit.SECONDS);
|
|
|
+ log.info("AsyncCourseWatchFinishService 重试队列处理器已启动");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 异步处理完课打备注的
|
|
|
*/
|
|
|
@Async("scheduledExecutorService")
|
|
|
public void executeCourseWatchFinish(FsCourseWatchLog finishLog) {
|
|
|
+// 原代码
|
|
|
+// FsCourseWatchLog watchLog = new FsCourseWatchLog();
|
|
|
+// watchLog.setQwExternalContactId(finishLog.getQwExternalContactId());
|
|
|
+// watchLog.setFinishTime(finishLog.getFinishTime());
|
|
|
+// watchLog.setQwUserId(finishLog.getQwUserId());
|
|
|
+//
|
|
|
+//
|
|
|
+// QwUser qwUserByRedis = qwExternalContactService.getQwUserByRedisForId(String.valueOf(finishLog.getQwUserId()));
|
|
|
+// if (qwUserByRedis == null) {
|
|
|
+// log.error("无企微员工信息 {} 跳过处理。", finishLog.getQwUserId());
|
|
|
+// return;
|
|
|
+// }
|
|
|
+//
|
|
|
+// QwCompany qwCompany = iQwCompanyService.getQwCompanyByRedis(qwUserByRedis.getCorpId());
|
|
|
+//
|
|
|
+// if (qwCompany == null) {
|
|
|
+// log.error("企业微信主体为空 {} 跳过处理。{} ", qwUserByRedis.getCorpId(),watchLog);
|
|
|
+// return;
|
|
|
+// }
|
|
|
+//
|
|
|
+// rocketMQTemplate.asyncSend("course-finish-notes", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
+// @Override public void onSuccess(SendResult sendResult) {
|
|
|
+// log.info("推送完课打备注成功1:{},{}",JSON.toJSONString(finishLog),sendResult.getMsgId());
|
|
|
+// } // 空实现
|
|
|
+// @Override public void onException(Throwable e) {log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
+// });
|
|
|
+
|
|
|
+
|
|
|
+// // 定义默认值
|
|
|
+// final Integer DEFAULT_SERVER_NUM = 99;
|
|
|
+//
|
|
|
+// // 使用
|
|
|
+// Integer companyServerNum = Optional.ofNullable(qwCompany.getCompanyServerNum())
|
|
|
+// .orElse(DEFAULT_SERVER_NUM);
|
|
|
+// switch (companyServerNum){
|
|
|
+// case 1:
|
|
|
+// rocketMQTemplate.asyncSend("course-finish-notes", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
+// @Override public void onSuccess(SendResult sendResult) {
|
|
|
+// log.info("推送完课打备注成功1:{},{}",JSON.toJSONString(finishLog),sendResult.getMsgId());
|
|
|
+// } // 空实现
|
|
|
+// @Override public void onException(Throwable e) {log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
+// });
|
|
|
+// break;
|
|
|
+// case 2:
|
|
|
+//
|
|
|
+// rocketMQTemplate.asyncSend("course-finish-notesTwo", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
+// @Override public void onSuccess(SendResult sendResult) {} // 空实现
|
|
|
+// @Override public void onException(Throwable e) {log.error("推送完课打备注失败2:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
+// });
|
|
|
+// break;
|
|
|
+// case 3:
|
|
|
+// rocketMQTemplate.asyncSend("course-finish-notesThree", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
+// @Override public void onSuccess(SendResult sendResult) {} // 空实现
|
|
|
+// @Override public void onException(Throwable e) {log.error("推送完课打备注失败3:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
+// });
|
|
|
+// break;
|
|
|
+// default:
|
|
|
+// break;
|
|
|
+// }
|
|
|
+
|
|
|
+
|
|
|
+ // 1. 数据验证和准备
|
|
|
+ ValidationResult validationResult = validateAndPrepareData(finishLog);
|
|
|
+ if (!validationResult.isValid()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 2. 发送消息(使用Tag区分)
|
|
|
+ sendWithFlowControl(finishLog, validationResult, 0);
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * 数据验证和准备
|
|
|
+ */
|
|
|
+ private ValidationResult validateAndPrepareData(FsCourseWatchLog finishLog) {
|
|
|
+ // 准备日志对象
|
|
|
FsCourseWatchLog watchLog = new FsCourseWatchLog();
|
|
|
watchLog.setQwExternalContactId(finishLog.getQwExternalContactId());
|
|
|
watchLog.setFinishTime(finishLog.getFinishTime());
|
|
|
watchLog.setQwUserId(finishLog.getQwUserId());
|
|
|
|
|
|
-
|
|
|
+ // 验证企微用户信息
|
|
|
QwUser qwUserByRedis = qwExternalContactService.getQwUserByRedisForId(String.valueOf(finishLog.getQwUserId()));
|
|
|
if (qwUserByRedis == null) {
|
|
|
log.error("无企微员工信息 {} 跳过处理。", finishLog.getQwUserId());
|
|
|
- return;
|
|
|
+ return ValidationResult.invalid();
|
|
|
}
|
|
|
|
|
|
+ // 验证企业主体
|
|
|
QwCompany qwCompany = iQwCompanyService.getQwCompanyByRedis(qwUserByRedis.getCorpId());
|
|
|
-
|
|
|
if (qwCompany == null) {
|
|
|
- log.error("企业微信主体为空 {} 跳过处理。", qwUserByRedis.getCorpId());
|
|
|
+ log.error("企业微信主体为空 {} 跳过处理。{} ", qwUserByRedis.getCorpId(), watchLog);
|
|
|
+ return ValidationResult.invalid();
|
|
|
+ }
|
|
|
+
|
|
|
+ return ValidationResult.valid(watchLog, qwUserByRedis, qwCompany);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 带流控处理的消息发送
|
|
|
+ */
|
|
|
+ private void sendWithFlowControl(FsCourseWatchLog finishLog,
|
|
|
+ ValidationResult validationResult, int retryCount) {
|
|
|
+ if (retryCount >= 3) {
|
|
|
+ log.warn("消息重试超过最大次数,转入重试队列: topic={}, qwUserId={}",
|
|
|
+ TOPIC, finishLog.getQwUserId());
|
|
|
+ offerToRetryQueue(finishLog, validationResult);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- rocketMQTemplate.asyncSend("course-finish-notes", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
- @Override public void onSuccess(SendResult sendResult) {} // 空实现
|
|
|
- @Override public void onException(Throwable e) {log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
+ rocketMQTemplate.asyncSend(TOPIC, JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
+ @Override
|
|
|
+ public void onSuccess(SendResult sendResult) {
|
|
|
+ log.info("推送完课打备注成功1:{},{}",JSON.toJSONString(finishLog),sendResult.getMsgId());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onException(Throwable e) {
|
|
|
+ if (isFlowControlException(e)) {
|
|
|
+ // 流控异常处理
|
|
|
+ handleFlowControlRetry(TOPIC, finishLog, validationResult, retryCount, e);
|
|
|
+ log.error("推送完课打备注失败1流控异常:finishLog={},e={}",JSON.toJSONString(finishLog),e.getMessage());
|
|
|
+ } else {
|
|
|
+ // 其他异常
|
|
|
+ log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
});
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 放入重试队列
|
|
|
+ */
|
|
|
+ private void offerToRetryQueue(FsCourseWatchLog finishLog,
|
|
|
+ ValidationResult validationResult) {
|
|
|
+ RetryMessage retryMessage = new RetryMessage(finishLog, validationResult);
|
|
|
+ boolean offered = retryQueue.offer(retryMessage);
|
|
|
+ if (offered) {
|
|
|
+ log.info("消息已加入重试队列: topic={}, qwUserId={}", TOPIC, finishLog.getQwUserId());
|
|
|
+ } else {
|
|
|
+ log.error("重试队列已满,消息可能丢失: topic={}, qwUserId={}", TOPIC, finishLog.getQwUserId());
|
|
|
+ // 这里可以接入告警系统
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理重试队列
|
|
|
+ */
|
|
|
+ private void processRetryQueue() {
|
|
|
+ try {
|
|
|
+ int processedCount = 0;
|
|
|
+ RetryMessage retryMessage;
|
|
|
+
|
|
|
+ while (processedCount < 100 && (retryMessage = retryQueue.poll()) != null) {
|
|
|
+ try {
|
|
|
+ // 重新发送消息
|
|
|
+ sendWithFlowControl(retryMessage.getFinishLog(),
|
|
|
+ retryMessage.getValidationResult(), 0);
|
|
|
+ processedCount++;
|
|
|
+
|
|
|
+ Thread.sleep(10);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("重试队列处理失败: {}", e.getMessage());
|
|
|
+ offerToRetryQueue(retryMessage.getFinishLog(), retryMessage.getValidationResult());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (processedCount > 0) {
|
|
|
+ log.debug("重试队列处理完成,本次处理数量: {}", processedCount);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理重试队列异常: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 判断是否为流控异常
|
|
|
+ */
|
|
|
+ private boolean isFlowControlException(Throwable e) {
|
|
|
+ if (e instanceof MQClientException) {
|
|
|
+ return ((MQClientException) e).getResponseCode() == 215;
|
|
|
+ }
|
|
|
+ // 检查异常链
|
|
|
+ Throwable cause = e.getCause();
|
|
|
+ if (cause instanceof MQClientException) {
|
|
|
+ return ((MQClientException) cause).getResponseCode() == 215;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 流控重试处理
|
|
|
+ */
|
|
|
+ private void handleFlowControlRetry(String topic, FsCourseWatchLog finishLog,
|
|
|
+ ValidationResult validationResult, int retryCount, Throwable e) {
|
|
|
+ long backoffTime = calculateBackoffTime(retryCount);
|
|
|
+ log.warn("流控触发,{}ms后第{}次重试: topic={}, qwUserId={}",
|
|
|
+ backoffTime, retryCount + 1, topic, finishLog.getQwUserId());
|
|
|
|
|
|
+ // 使用 ScheduledExecutorService 进行延迟执行
|
|
|
+ retryExecutor.schedule(() -> {
|
|
|
+ try {
|
|
|
+ sendWithFlowControl(finishLog, validationResult, retryCount + 1);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("延迟重试执行异常: {}", ex.getMessage(), ex);
|
|
|
+ }
|
|
|
+ }, backoffTime, TimeUnit.MILLISECONDS);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 计算退避时间(指数退避)
|
|
|
+ */
|
|
|
+ private long calculateBackoffTime(int retryCount) {
|
|
|
+ return Math.min(1000 * (long) Math.pow(2, retryCount), 10000); // 最大10秒
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void destroy() {
|
|
|
+ retryExecutor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!retryExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
|
|
+ retryExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ retryExecutor.shutdownNow();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ log.info("AsyncCourseWatchFinishService 已关闭");
|
|
|
+ }
|
|
|
|
|
|
- // 定义默认值
|
|
|
- final Integer DEFAULT_SERVER_NUM = 99;
|
|
|
-
|
|
|
- // 使用
|
|
|
- Integer companyServerNum = Optional.ofNullable(qwCompany.getCompanyServerNum())
|
|
|
- .orElse(DEFAULT_SERVER_NUM);
|
|
|
- switch (companyServerNum){
|
|
|
- case 1:
|
|
|
- rocketMQTemplate.asyncSend("course-finish-notes", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
- @Override public void onSuccess(SendResult sendResult) {} // 空实现
|
|
|
- @Override public void onException(Throwable e) {log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
- });
|
|
|
- break;
|
|
|
- case 2:
|
|
|
-
|
|
|
- rocketMQTemplate.asyncSend("course-finish-notesTwo", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
- @Override public void onSuccess(SendResult sendResult) {} // 空实现
|
|
|
- @Override public void onException(Throwable e) {log.error("推送完课打备注失败2:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
- });
|
|
|
- break;
|
|
|
- case 3:
|
|
|
- rocketMQTemplate.asyncSend("course-finish-notesThree", JSON.toJSONString(finishLog), new SendCallback() {
|
|
|
- @Override public void onSuccess(SendResult sendResult) {} // 空实现
|
|
|
- @Override public void onException(Throwable e) {log.error("推送完课打备注失败3:{},{}",JSON.toJSONString(finishLog),e.getMessage());} // 空实现
|
|
|
- });
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
+ // 内部辅助类
|
|
|
+ private static class ValidationResult {
|
|
|
+ private final boolean valid;
|
|
|
+ private final FsCourseWatchLog watchLog;
|
|
|
+ private final QwUser qwUser;
|
|
|
+ private final QwCompany qwCompany;
|
|
|
+
|
|
|
+ public ValidationResult(boolean valid, FsCourseWatchLog watchLog, QwUser qwUser, QwCompany qwCompany) {
|
|
|
+ this.valid = valid;
|
|
|
+ this.watchLog = watchLog;
|
|
|
+ this.qwUser = qwUser;
|
|
|
+ this.qwCompany = qwCompany;
|
|
|
}
|
|
|
|
|
|
+ public static ValidationResult valid(FsCourseWatchLog watchLog, QwUser qwUser, QwCompany qwCompany) {
|
|
|
+ return new ValidationResult(true, watchLog, qwUser, qwCompany);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static ValidationResult invalid() {
|
|
|
+ return new ValidationResult(false, null, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isValid() { return valid; }
|
|
|
+ public FsCourseWatchLog getWatchLog() { return watchLog; }
|
|
|
+ public QwUser getQwUser() { return qwUser; }
|
|
|
+ public QwCompany getQwCompany() { return qwCompany; }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class RetryMessage {
|
|
|
+ private final FsCourseWatchLog finishLog;
|
|
|
+ private final ValidationResult validationResult;
|
|
|
+
|
|
|
+ public RetryMessage(FsCourseWatchLog finishLog, ValidationResult validationResult) {
|
|
|
+ this.finishLog = finishLog;
|
|
|
+ this.validationResult = validationResult;
|
|
|
+ }
|
|
|
|
|
|
+ public FsCourseWatchLog getFinishLog() { return finishLog; }
|
|
|
+ public ValidationResult getValidationResult() { return validationResult; }
|
|
|
}
|
|
|
|
|
|
}
|