Bläddra i källkod

完课mq重试优化

zyp 6 dagar sedan
förälder
incheckning
c9b5529270

+ 40 - 5
fs-qw-task/src/main/java/com/fs/app/taskService/impl/AsyncCourseWatchFinishService.java

@@ -184,13 +184,23 @@ public class AsyncCourseWatchFinishService {
 
             @Override
             public void onException(Throwable e) {
+//                if (isFlowControlException(e)) {
+//                    // 流控异常处理
+//                    handleFlowControlRetry(TOPIC, finishLog, validationResult, retryCount, e);
+//                    log.error("推送完课打备注失败1流控异常:finishLog={},e={}",JSON.toJSONString(finishLog),e.getMessage());
+//                } else {
+//                    // 其他异常
+//                    log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());
+//                }
                 if (isFlowControlException(e)) {
                     // 流控异常处理
                     handleFlowControlRetry(TOPIC, finishLog, validationResult, retryCount, e);
-                    log.error("推送完课打备注失败1流控异常:finishLog={},e={}",JSON.toJSONString(finishLog),e.getMessage());
+                    log.error("推送完课打备注流控异常,准备重试。retryCount: {}, topic: {}, finishLogId: {}",
+                            retryCount, TOPIC, finishLog.getLogId()); // 只记录关键信息
                 } else {
-                    // 其他异常
-                    log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());
+                    // 其他异常 - 记录完整堆栈
+                    log.error("推送完课打备注失败,非流控异常。finishLog: {}",
+                            JSON.toJSONString(finishLog), e); // 注意这里传 e 而不是 e.getMessage()
                 }
             }
         });
@@ -245,6 +255,15 @@ public class AsyncCourseWatchFinishService {
      * 判断是否为流控异常
      */
     private boolean isFlowControlException(Throwable e) {
+        // 检查异常消息中是否包含流控关键词(双重保障)
+        String errorMessage = e.getMessage();
+        if (errorMessage != null && (
+                errorMessage.contains("flow control") ||
+                errorMessage.contains("exhausted the send quota") ||
+                errorMessage.contains("CODE: 215"))) {
+            return true;
+        }
+
         if (e instanceof MQClientException) {
             return ((MQClientException) e).getResponseCode() == 215;
         }
@@ -256,11 +275,20 @@ public class AsyncCourseWatchFinishService {
         return false;
     }
 
+    private static final int MAX_FLOW_CONTROL_RETRY = 5;
     /**
      * 流控重试处理
      */
     private void handleFlowControlRetry(String topic, FsCourseWatchLog finishLog,
                                         ValidationResult validationResult, int retryCount, Throwable e) {
+        // 检查重试次数限制
+//        if (retryCount >= MAX_FLOW_CONTROL_RETRY) {
+//            log.error("流控重试达到最大次数 {},放弃重试。topic: {}, qwUserId: {}, logId: {}",
+//                    MAX_FLOW_CONTROL_RETRY, topic, finishLog.getQwUserId(), finishLog.getLogId());
+//            //todo 可以记录到数据库或发送告警
+//            return;
+//        }
+
         long backoffTime = calculateBackoffTime(retryCount);
         log.warn("流控触发,{}ms后第{}次重试: topic={}, qwUserId={}",
                 backoffTime, retryCount + 1, topic, finishLog.getQwUserId());
@@ -270,7 +298,8 @@ public class AsyncCourseWatchFinishService {
             try {
                 sendWithFlowControl(finishLog, validationResult, retryCount + 1);
             } catch (Exception ex) {
-                log.error("延迟重试执行异常: {}", ex.getMessage(), ex);
+                log.error("延迟重试执行异常 - qwUserId: {}, logId: {}, error: {}",
+                        finishLog.getQwUserId(), finishLog.getLogId(), ex.getMessage(), ex);
             }
         }, backoffTime, TimeUnit.MILLISECONDS);
     }
@@ -278,7 +307,13 @@ public class AsyncCourseWatchFinishService {
      * 计算退避时间(指数退避)
      */
     private long calculateBackoffTime(int retryCount) {
-        return Math.min(1000 * (long) Math.pow(2, retryCount), 10000); // 最大10秒
+//        return Math.min(1000 * (long) Math.pow(2, retryCount), 10000); // 最大10秒
+        // 基础退避:1s, 2s, 4s, 8s, 16s, 32s
+        long baseDelay = Math.min(1000L * (1L << Math.min(retryCount, 5)), 32000L);
+        // 添加随机抖动 (0~2s),避免多个客户端同时重试
+        long jitter = (long) (Math.random() * 1000);
+
+        return baseDelay + jitter;
     }
 
     @PreDestroy