瀏覽代碼

feat: 集成MQTT

xdd 1 月之前
父節點
當前提交
32ae4bb25f

+ 10 - 0
fs-admin/pom.xml

@@ -122,6 +122,16 @@
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <!-- MQTT -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 96 - 0
fs-admin/src/main/java/com/fs/framework/config/MqttConfig.java

@@ -0,0 +1,96 @@
+package com.fs.framework.config;
+
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+@Configuration
+public class MqttConfig {
+
+    @Value("${mqtt.host}")
+    private String host;
+
+    @Value("${mqtt.username}")
+    private String username;
+
+    @Value("${mqtt.password}")
+    private String password;
+
+    @Value("${mqtt.clientId}")
+    private String clientId;
+
+    @Value("${mqtt.defaultTopic}")
+    private String defaultTopic;
+
+    @Value("${mqtt.timeout}")
+    private int timeout;
+
+    @Value("${mqtt.keepalive}")
+    private int keepalive;
+
+    @Bean
+    public MqttPahoClientFactory mqttClientFactory() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{host});
+        options.setUserName(username);
+        options.setPassword(password.toCharArray());
+        options.setConnectionTimeout(timeout);
+        options.setKeepAliveInterval(keepalive);
+        options.setCleanSession(true);
+        factory.setConnectionOptions(options);
+        return factory;
+    }
+
+    // ==================== 订阅(入站)====================
+
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+    @Bean
+    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
+        MqttPahoMessageDrivenChannelAdapter adapter =
+                new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory(), defaultTopic);
+        adapter.setCompletionTimeout(5000);
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(1);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+
+    @Bean
+    @ServiceActivator(inputChannel = "mqttInputChannel")
+    public MessageHandler mqttMessageHandler() {
+        return new MqttMessageHandler();
+    }
+
+    // ==================== 发布(出站)====================
+
+    @Bean
+    public MessageChannel mqttOutputChannel() {
+        return new DirectChannel();
+    }
+
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutputChannel")
+    public MessageHandler mqttOutbound() {
+        MqttPahoMessageHandler handler =
+                new MqttPahoMessageHandler(clientId + "-outbound", mqttClientFactory());
+        handler.setAsync(true);
+        handler.setDefaultTopic(defaultTopic);
+        handler.setDefaultQos(1);
+        return handler;
+    }
+}

+ 26 - 0
fs-admin/src/main/java/com/fs/framework/config/MqttGateway.java

@@ -0,0 +1,26 @@
+package com.fs.framework.config;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+
+@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
+public interface MqttGateway {
+
+    /**
+     * 发送消息到默认 topic
+     */
+    void sendToDefaultTopic(String payload);
+
+    /**
+     * 发送消息到指定 topic
+     */
+    void sendToTopic(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+    /**
+     * 发送消息到指定 topic,并指定 QoS
+     */
+    void sendToTopic(@Header(MqttHeaders.TOPIC) String topic,
+                     @Header(MqttHeaders.QOS) int qos,
+                     String payload);
+}

+ 20 - 0
fs-admin/src/main/java/com/fs/framework/config/MqttMessageHandler.java

@@ -0,0 +1,20 @@
+package com.fs.framework.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+
+public class MqttMessageHandler implements MessageHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
+
+    @Override
+    public void handleMessage(Message<?> message) throws MessagingException {
+        String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+        String payload = message.getPayload().toString();
+        log.info("收到MQTT消息 - topic: {}, payload: {}", topic, payload);
+        // 在此处理订阅到的消息
+    }
+}

+ 9 - 0
fs-admin/src/main/resources/application-dev.yml

@@ -79,3 +79,12 @@ spring:
                     config:
                         multi-statement-allow: true
 
+# MQTT配置
+mqtt:
+    host: tcp://localhost:1883
+    username: admin
+    password: password
+    clientId: fs-admin-client
+    defaultTopic: test/topic
+    timeout: 30
+    keepalive: 60