开发者

MQTT SpringBoot整合实战教程

目录
  • MQTT-SpringBoot
    • 创建简单 SpringBoot 项目
    • 导入必须依赖
    • 增加MQTT相关配置
    • 编写对应Java类
      • 配置类
      • 启动类增加开启配置
      • 创建MQTT连接工厂类
      • 接收消息处理类
      • 接收消息配置类
      • 发送消息配置类
      • 发送消息网关接口类
      • 发送消息服务类
  • 测试验证
    • 订阅消息验证
      • 发送消息
        • 订阅收到消息
          • 发送消息验证
            • 创建订阅者
              • 订阅者接收消息

              MQTT-SpringBoot

              创建简单 SpringBoot 项目

              导入必须依赖

              pom.XML

              <?xml version="1.0" encoding="UTF-8"?>
              <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
                  <modelVersion>4.0.0</modelVersion>
                  <groupId>com.study</groupId>
                  <artifactId>MqttDemo</artifactId>
                  <version>0.0.1-SNAPSHOT</version>
                  <name>SpringBootMqttDemo</name>
                  <description>SpringBootMqttDemo</description>
                  <properties>
                      <java.version>1.8</java.version>
                      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
                      <spring-boot.version>2.6.13</spring-boot.version>
                  </properties>
                  <dependencies>
                      <dependency>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter</artifactId>
                      </dependency>
                      <dependency>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter-test</artifactId>
                          <scope>test</scope>
                      </dependency>
                      <!-- spring boot项目web开发的起步依赖 -->
                      <dependency>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter-web</artifactId>
                      </dependency>
                      <!-- spring boot项目集成消息中间件基础依赖 -->
                      <dependency>
                          <groupId>org.springframework.boot</groupId>
                          <artifactId>spring-boot-starter-integration</artifactId>
                      </dependency>
                      <!-- spring boot项目和mqtt客户端集成起步依赖 -->
                      <dependency>
                          <groupId>org.springframework.integration</groupId>
                          <artifactId>spring-integration-mqtt</artifactId>
                          <version>5.4.3</version>
                      </dependency>
                      <!-- lombok依赖 -->
                      <dependency>
                          <groupId>org.projectlombok</groupId>
                          <artifactId>lombok</artifactId>
                      </dependency>
                      <!-- fastjson依赖 -->
                      <dependency>
                          <groupId>com.alibaba</groupId>
                          <artifactId>fastjson</artifactId>
                          <version>1.2.83</version>
                      </dependency>
                python  </dependencies>
                  <dependencyManagement>
                      <dependencies>
                          <dependency>
                              <groupId>org.springframework.boot</groupId>
                              <artifactId>spring-boot-dependencies</artifactId>
                              <version>${spring-boot.version}</version>
                              <type>pom</type>
                              <scope>import</scope>
                          </dependency>
                      </dependencies>
                  </dependencyManagement>
                  <build>
                      <plugins>
                          <plugin>
                              <groupId>org.apache.maven.plugins</groupId>
                              <artifactId>maven-compiler-plugin</artifactId>
                              <version>3.8.1</version>
                              <configuration>
                                  <source>1.8</source>
                                  <target>1.8</target>
                                  <encoding>UTF-8</encoding>
                              </configuration>
                          </plugin>
                          <plugin>
                              <groupId>org.springframework.boot</groupId>
                              <artifactId>spring-boot-maven-plugin</artifactId>
                              <version>${spring-boot.version}</version>
                              <configuration>
                                  <mainClass&gjst;com.study.mqtt.demo.MqttDemoApplication</mainClass>
                                  <skip>true</skip>
                              </configuration>
                              <executions>
                                  <execution>
                                      <id>repackage</id>
                                      <goals>
                                          <goal>repackage</goal>
                                      </goals>
                                  </execution>
                              </executions>
                          </plugin>
                      </plugins>
                  </build>
              </project>

              增加MQTT相关配置

              application.yml

              spring:
                mqtt:
                  # mqtt 服务器地址
                  url: tcp://192.168.40.128:1883
                  # 订阅客户端ID
                  subClientId: sub_client_id_1
                  # 订阅主题
                  subTopic: lq/iot/demo/
                  # 发布客户端ID
                  pubClientId: pub_client_id_1
                  # 用户名
                  username: admin
                  # 密码
                  password: admin123456

              编写对应Java类

              配置类

              MqttConfig.java

              package com.study.mqtt.demo.domain;
              import lombok.Data;
              import org.springframework.boot.context.properties.ConfigurationProperties;
              @Data
              @ConfigurationProperties(prefix = "spring.mqtt")
              public class MqttConfig {
                  private String username;
                  private String password;
                  private String url;
                  private String subClientId ;
                  private String subTopic ;
                  private String pubClientId ;
              }

              启动类增加开启配置

              MqttDemoApplication.java

              package com.study.mqtt.demo;
              import com.study.mqtt.demo.domain.MqttConfig;
              import org.springframework.boot.SpringApplication;
              import org.springframework.boot.autoconfigure.SpringBootApplication;
              import org.springframework.boot.context.properties.EnableConfigurationProperties;
              @SpringBootApplication
              @EnableConfigurationProperties(value = MqttConfig.class)
              public class MqttDemoApplication {
                  public static void main(String[] args) {
                      SpringApplication.run(MqttDemoApplication.class, args);
                  }
              }

              创建MQTT连接工厂类

              MqttFactory.java

              package com.study.mqtt.demo.factory;
              import com.study.mqtt.demo.domain.MqttConfig;
              import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
              import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
              @Configuration
              public class MqttFactory {
                  @Autowired
                  private MqttConfig mqttConfig;
                  @Bean
                  public MqttPahoClientFactory mqttClientFactory() {
                      // 创建客户端工厂
                      DefaultMqttPahoClientFactory  factory = new DefaultMqttPahoClientFactory();
                      MqttConnectOptions options = new MqttConnectOptions();
                      options.setUserName(mqttConfig.getUsername());
                      options.setPassword(mqttConfig.getPassword().toCharArray());
                      options.setServerURIs(new String[]{mqttConfig.getUrl()});
                      options.setCleanSession(true);
                      factory.setConnectionOptions(options);
                      return factory;
                  }
              }

              接收消息处理类

              ReceiveMsgHandler.java

              package com.study.mqtt.demo.handler;
              import org.springframework.messaging.Message;
              import org.springframework.messaging.MessageHandler;
              import org.springframework.messaging.MessageHeaders;
              import org.springframework.messaging.MessagingException;
              import org.springframework.stereotype.Component;
              @Component
              public class ReceiveMsgHandler implements MessageHandler {
                  @Override
                  public void handleMessage(Message<?> message) throws MessagingException {
                      System.out.println("接收到消息对象:" + message);
                      // 消息内容
                      Object payload = message.getPayload();
                      MessageHeaders headers = message.getHeaders();
                      Object mqttReceivedTopic = headers.get("mqtt_receivedTopic");
                      System.out.println("接收的消息主题:" + mqttReceivedTopic);
                      System.out.println("接收的消息内容:" + payload);
                  }
              }

              接收消息配置类

              MqttInboundConfig.java

              package com.study.mqtt.demo.inbound;
              import com.study.mqtt.demo.domain.MqttConfig;
              import com.study.mqtt.demo.factory.MqttFactory;
              import com.study.mqtt.demo.handler.ReceiveMsgHandler;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              import org.springframework.integration.annotation.ServiceActivator;
              import org.springframework.integration.channel.DirectChannel;
              import org.springframework.integration.core.MessageProducer;
              import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
              import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
              import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
              import org.springframework.messaging.MessageChannel;
              import org.springframework.messaging.MessageHandler;
              @Configuration
              public class MqttInboundConfig {
                  @Autowired
                  private编程客栈 MqttConfig mqttConfig ;
                  @Autowired
                  private ReceiveMsgHandler receiveMsgHandler;
                  /**
                   * 配置消息接收通道
                   * @return
                   */
                  @Bean
                  public MessageChannel mqttInputChannel() {
                      return new DirectChannel();
                  }
                  /**
                   * 配置接收适配器
                   */
                  @Bean
                  public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {
                      MqttPahoMessageDrivenChannelAdapter adapter  =
                              new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() ,
                                      mqttConfig.getSubClientId() ,
                                      mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ;
                      adapter.setConverter(new DefaultPahoMessageConverter());
                      // 质量服务等级
                      adapter.setQos(1);
                      adapter.setOutputChannel(mqttInputChannel());
                      return adapter ;
                  }
                  /**
                   * 配置接收消息处理器
                   * @return
                   */
                  @Bean
                  @ServiceActivator(inputChannel = "mqttInputChannel") // 指定处理消息使用得通道
                  public MessageHandler messageHandler() {
                      return this.receiveMsgHandler ;
                  }
              }

              发送消息配置类

              MqttOutbouwww.devze.comndConfig.java

              package com.study.mqtt.demo.outbound;
              import com.study.mqtt.demo.domain.MqttConfig;
              import org.springframework.beans.factory.annowww.devze.comtation.Autowired;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              import org.springframework.integration.annotation.ServiceActivator;
              import org.springframework.integration.channel.DirectChannel;
              import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
              import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
              import org.springframework.messaging.MessageChannel;
              import org.springframework.messaging.MessageHandler;
              @Configuration
              public class MqttOutboundConfig {
                  @Autowired
                  private MqttConfig mqttConfig;
                  @Autowired
                  private MqttPahoClientFactory pahoClientFactory ;
                  @Bean
                  public MessageChannel mqttOutputChannel() {
                      return new DirectChannel();
                  }
                  @Bean
                  @ServiceActivator(inputChannel = "mqttOutputChannel")
                  public MessageHandler mqttOutboundMassageHandler() {
                      MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getUrl() ,
                              mqttConfig.getPubClientId() , pahoClientFactory ) ;
                      messageHandler.setAsync(true);
                      messageHandler.setDefaultQos(0);
                      messageHandler.setDefaultTopic("default");
                      return messageHandler ;
                  }
              }

              发送消息网关接口类

              MqttGateway.java

              package com.study.mqtt.demo.gateway;
              import org.springframework.integration.annotation.MessagingGateway;
              import org.springframework.integration.mqtt.support.MqttHeaders;
              import org.springframework.messaging.handler.annotation.Header;
              import org.springframework.stereotype.Component;
              @MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
              public interface MqttGateway {
                  /**
                   * 发送mqtt消息
                   * @param topic 主题
                   * @param payload 内容
                   */
                  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
                  /**
                   * 发送包含qos的消息
                   * @param topic 主题
                   * @param qos 对消息处理的几种机制。
                   *          * 0 发送成功就算完成,会出现消息丢失
                   *          * 1 增加消息重试机制,消息发送失败会重新发送,会出现重复消息
                   *          * 2 多了一次去重的动作,确保只有一次消息推给订阅者。
                   * @param payload 消息体
                   */
                  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
              }

              发送消息服务类

              MqttMsgSenderService.java

              package com.study.mqtt.demo.service;
              import com.study.mqtt.demo.gateway.MqttGateway;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.stereotype.Service;
              @Service
              public class MqttMsgSenderService {
                  @Autowired
                  private MqttGateway mqttGateway;
                  public void send(String topic, String payload) {
                      mqttGateway.sendToMqtt(topic, payload);
                  }
                  public void send(String topic, int qos, String payload) {
                      mqttGateway.sendToMqtt(topic, qos, payload);
                  }
              }

              测试验证

              订阅消息验证

              启动项目

              MQTT SpringBoot整合实战教程

              发送消息

              • 主题为配置文件中配置的订阅主题 lq/iot/demo/
              • 发送时间: 2025-05-25 21:29:26:439

              MQTT SpringBoot整合实战教程

              订阅收到消息

              • 接收到消息的时间:Sun May 25 21:29:26 GMT+08:00 2025
              • ​​​​​​​接收到的主题:lq/iot/demo/
              • ​​​​​​​接收到的内容:{ "msg":"spring boot mqtt demo" }

              MQTT SpringBoot整合实战教程

              发送消息验证

              • 编写测试类
                • 发送主题:sb/mqtt/test
                • ​​​​​​​发送内容:hello world !=> 当前时间
              package com.study.mqtt.demo;
              import com.study.mqtt.demo.service.MqttMsgSenderService;
              import org.junit.jupiter.api.Test;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.boot.test.context.SpringBootTest;
              import java.util.Date;
              @SpringBootTest(classes = MqttDemoApplication.class)
              class MqttDemoApplicationTests {
                  @Autowired
                  private MqttMsgSenderService mqttMsgSenderService;
                  @Test
                  void contextLoads() {
                  }
                  @Test
                  void sendMsg(){
                      mqttMsgSenderService.send("sb/mqtt/test", "hello world ! => " + new Date());
                  }
              }

              创建订阅者

              订阅主题: sb/mqtt/test

              MQTT SpringBoot整合实战教程

              运行测试类

              MQTT SpringBoot整合实战教程

              订阅者接收消息

              主题:sb/mqtt/test

              MQTT SpringBoot整合实战教程

              到此这篇关于MQTT SpringBoot整合的文章就介绍到这了,更多相关MQTT SpringBoot整合内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

              0

              上一篇:

              下一篇:

              精彩评论

              暂无评论...
              验证码 换一张
              取 消

              最新开发

              开发排行榜