Kaynağa Gözat

优化完课备注

ct 1 hafta önce
ebeveyn
işleme
1633d96386

+ 7 - 34
fs-qw-mq/src/main/java/com/fs/app/mq/RocketMQConsumerCourseFinishService.java

@@ -6,56 +6,29 @@ import com.fs.course.domain.FsCourseWatchLog;
 import com.fs.course.service.IFsCourseFinishTempService;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.springframework.stereotype.Service;
 
 @Slf4j
 @Service
 @AllArgsConstructor
-@RocketMQMessageListener(
-        topic = "course-finish-notes", consumerGroup = "course-finish-group",
-        selectorType = SelectorType.TAG,
-        selectorExpression = "TAG_1 || TAG_2 || TAG_3 || TAG_99")
-public class RocketMQConsumerCourseFinishService implements RocketMQListener<MessageExt> {
+@RocketMQMessageListener(topic = "course-finish-notes", consumerGroup = "course-finish-group")
+public class RocketMQConsumerCourseFinishService implements RocketMQListener<String> {
 
     private final IFsCourseFinishTempService courseFinishTempService;
 
-//    @Override
-//    public void onMessage(String message) {
-//
-//
-//        log.info("收到消息1:" + message);
-//
-//        FsCourseWatchLog watchLog = JSON.parseObject(message, FsCourseWatchLog.class);
-//        if (watchLog == null || watchLog.getQwExternalContactId() == null) {
-//            return;
-//        }
-//        courseFinishTempService.finishCourseExtContactIdByRemark(watchLog);
-//
-//    }
-
     @Override
-    public void onMessage(MessageExt messageExt) {
-        String messageBody = new String(messageExt.getBody());
-        String tags = messageExt.getTags();
+    public void onMessage(String message) {
+
 
-//        log.info("收到消息1:" + message);
-        log.info("收到消息, Tag: {}, 内容: {}", tags, messageBody);
+        log.info("收到消息1:" + message);
 
-        FsCourseWatchLog watchLog = JSON.parseObject(messageBody, FsCourseWatchLog.class);
+        FsCourseWatchLog watchLog = JSON.parseObject(message, FsCourseWatchLog.class);
         if (watchLog == null || watchLog.getQwExternalContactId() == null) {
             return;
         }
-        // 根据不同的Tag执行不同的业务逻辑(如果需要)
-        processByTag(tags, watchLog);
-    }
-
-    private void processByTag(String tag, FsCourseWatchLog watchLog) {
-        // 如果需要根据Tag区分处理逻辑,可以在这里实现
-        // 如果处理逻辑完全一样,直接调用统一方法
         courseFinishTempService.finishCourseExtContactIdByRemark(watchLog);
+
     }
 }

+ 5 - 27
fs-qw-task/src/main/java/com/fs/app/taskService/impl/AsyncCourseWatchFinishService.java

@@ -16,6 +16,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQHeaders;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.scheduling.annotation.Async;
@@ -49,7 +50,6 @@ public class AsyncCourseWatchFinishService {
 
     // 主题映射配置
     private static final String TOPIC = "course-finish-notes";
-    private static final Integer DEFAULT_SERVER_NUM = 99;
 
     @PostConstruct
     public void init() {
@@ -163,14 +163,6 @@ public class AsyncCourseWatchFinishService {
         return ValidationResult.valid(watchLog, qwUserByRedis, qwCompany);
     }
 
-    /**
-     * 获取目标Tag(替换原来的获取Topic方法)
-     */
-    private String getTargetTag(QwCompany qwCompany) {
-        Integer companyServerNum = Optional.ofNullable(qwCompany.getCompanyServerNum())
-                .orElse(DEFAULT_SERVER_NUM);
-        return "TAG_" + companyServerNum; // 生成对应的Tag
-    }
 
     /**
      * 带流控处理的消息发送
@@ -184,22 +176,10 @@ public class AsyncCourseWatchFinishService {
             return;
         }
 
-        // 获取Tag
-        String tag = getTargetTag(validationResult.getQwCompany());
-        String messageBody = JSON.toJSONString(finishLog);
-
-        // 构建带Tag的消息
-        org.springframework.messaging.Message<String> message = MessageBuilder
-                .withPayload(messageBody)
-                .setHeader(MessageConst.PROPERTY_TAGS, tag)
-                .build();
-
-        rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() {
+        rocketMQTemplate.asyncSend(TOPIC, JSON.toJSONString(finishLog), new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
-//                log.info("推送完课打备注成功1:{},{}",JSON.toJSONString(finishLog),sendResult.getMsgId());
-                log.info("推送完课打备注成功1: tag={},finishLog={}, msgId={}",
-                        tag,JSON.toJSONString(finishLog), sendResult.getMsgId());
+                log.info("推送完课打备注成功1:{},{}",JSON.toJSONString(finishLog),sendResult.getMsgId());
             }
 
             @Override
@@ -207,12 +187,10 @@ public class AsyncCourseWatchFinishService {
                 if (isFlowControlException(e)) {
                     // 流控异常处理
                     handleFlowControlRetry(TOPIC, finishLog, validationResult, retryCount, e);
-                    log.error("推送完课打备注失败1流控异常:tag={},finishLog={},e={}",tag,JSON.toJSONString(finishLog),e.getMessage());
+                    log.error("推送完课打备注失败1流控异常:finishLog={},e={}",JSON.toJSONString(finishLog),e.getMessage());
                 } else {
                     // 其他异常
-//                    log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());
-                    log.error("推送完课打备注失败: tag={}, finishLog={}, error={}",
-                            tag, JSON.toJSONString(finishLog), e.getMessage());
+                    log.error("推送完课打备注失败1:{},{}",JSON.toJSONString(finishLog),e.getMessage());
                 }
             }
         });