开发者

SpringBoot集成MQTT实现交互服务通信

目录
  • 引言
  • 一、什么是MQTT
  • 二、发布/订阅模式
  • 三、Windows下安装MQTT消息服务器
  • 四、Windows安装MQTT消息代理客户端MQTTX
  • 五、新建MQTT集成项目
    • 5.1 yml配置
    • 5.2 Mqtt配置类
    • 5.3 MQTT客户端
    • 5.4 MQTT客户端管理类
    • 5.5 MQTT客户端创建
    • 5.6 MQTT回调抽象类
    • 5.7 MQTT订阅回调环境类
    • 5.8 默认回调类
  • 六、测试服务类
    • 七、启动springboot

      引言

      本文是springboot集成mqtt的一个实战案例。

      gitee代码库地址:源码地址

      一、什么是MQTT

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:

      网络受限:网络带宽较低且传输不可靠

      终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的

      MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。

      二、发布/订阅模式

      发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。

      主要组成部分

      • 发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。

      • 订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。

      • 消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。

      优点

      • 解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。

      • 灵活性:可以动态添加或删除订阅者,不影响其他组件。

      • 可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。

      缺点

      • 复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。

      • 性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。

      应用场景

      • 事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。

      • 数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。

      • 分布式系统:用于跨系统或跨服务的消息传递。

      发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。

      三、Windows下安装MQTT消息服务器

      非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本编程客栈的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。

      • 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
      • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public

      如果报错缺少Erlang环境,需要自行安装下该环境

      SpringBoot集成MQTT实现交互服务通信

      SpringBoot集成MQTT实现交互服务通信

      浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过

      SpringBoot集成MQTT实现交互服务通信

      四、Windows安装MQTT消息代理客户端MQTTX

      下载地址:MQTTX下载地址

      点击免费下载

      SpringBoot集成MQTT实现交互服务通信

      选择64位版本

      SpringBoot集成MQTT实现交互服务通信

      下好后点击安装,启动运行界面如下:

      SpringBoot集成MQTT实现交互服务通信

      语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。

      五、新建MQTT集成项目

      随便新建了一个springboot应用,用的是编程客栈JDK17,在pom文件中引入如下依赖:

              <!-- MQTT -->
              <dependency>
                  <groupId>org.eclipse.paho</groupId>
                  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                  <version>1.2.5</version>
              </dependency>
      

      5.1 yml配置

      server:
        port: 8081
      
      #允许循环依赖
      spring:
        main:
          allow-circular-references: true
      
      customer:
        mqtt:
          broker: tcp://localhost:1883
          clientList:
            #发布客户端ID
            - clientId: nays_service
              #监听主题 同时订阅多个主题 使用 - 分割开
              subscribeTopic: mqtt/publish
              #用户名
              userName: admin
              #密码
              password: public
            #接收客户端ID
            - clientId: receive_service
              #监听主题 同时订阅多个主题 使用 - 分割开
              subscribeTopic: mqtt/receive
              #用户名
              userName: admin
              #密码
              password: public
      
      
      
      

      5.2 Mqtt配置类

      package com.hulei.mqttproject.config;
      
      import lombok.Data;
      import org.springframework.boot.context.properties.ConfigurationProperties;
      import org.springframework.context.annotation.Configuration;
      
      import Java.util.List;
      
      /**
       * Mqtt配置类
       */
      @Data
      @Configuration
      @ConfigurationProperties(prefix = "customer.mqtt")
      public class MqttConfig {
          /**
           * mqtt broker地址
           */
          String broker;
          /**
           * 需要创建的MQTT客户端
           */
          List<MqttClient> clientList;
      }
      
      

      5.3 MQTT客户端

      package com.hulei.mqttproject.config;
      
      import lombok.Data;
      
      
      /**
       * MQTT客户端
       */
      @Data
      public class MqttClient {
          /**
           * 客户端ID
           */
          private String clientId;
          /**
           * 监听主题
           */
          private String subscribeTopic;
          /**
           * 用户名
           */
          private String userName;
          /**
           * 密码
           */
          private String password;
      }
      
      

      5.4 MQTT客户端管理类

      package com.hulei.mqttproject.config;
      
      import jakarta.annotation.Resource;
      import lombok.extern.slf4j.Slf4j;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.springframewpythonork.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;
      
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      
      /**
       * MQTT客户端管理类,如果客户端非常多后续可入Redis缓存
       */
      @Slf4j
      @Component
      public class MqttClientManager {
      
          @Value("${customer.mqtt.broker}")
          private String mqttBroker;
      
          @Resource
          private MqttCallBackContext mqttCallBackContext;
          /**
           * 存储MQTT客户端
           */
          public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
      
          public MqttClient getMqttClientById(String clientId) {
              return MQTT_CLIENT_MAP.get(clientId);
          }
      
          /**
           * 创建mqtt客户端
           *
           * @param clientId       客户端ID
           * @param subscribeTopic 订阅主题,可为空
           * @param userName       用户名,可为空
           * @param password       密码,可为空
           */
          public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
              MemoryPersistence persistence = new MemoryPersistence();
              try {
                  MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
                  MqttConnectOptions connOpts = new MqttConnectOptions();
                  if (null != userName && !userName.isEmpty()) {
                      connOpts.setUserName(userName);
                  }
      
                  if (null != password && !password.isEmpty()) {
                      connOpts.setPassword(password.toCharArray());
                  }
      
                  connOpts.setCleanSession(true);
      
                  if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                      AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
      
                      if (null == callBack) {
                          callBack = mqttCallBackContext.getCallBack("default");
                      }
      
                      callBack.setClientId(clientId);
                      callBack.setConnectOptions(connOpts);
                      client.setCallback(callBack);
                  }
      
                  //连接mqtt服务端broker
                  client.connect(connOpts);
                  // 订阅主题
                  if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                      if (subscribeTopic.contains("-"))
                          client.subscribe(subscribeTopic.split("-"));
                      else {
                          client.subscribe(subscribeTopic);
                      }
                  }
      
                  MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
              } catch (MqttException e) {
                  log.error("Create mqttClient failed!", e);
              }
          }
      }
      
      

      5.5 MQTT客户端创建

      package com.hulei.mqttproject.config;
      
      import jakarta.annotation.PostConstruct;
      import jakarta.annotation.Resource;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;
      
      import java.util.List;
      
      /**
       * MQTT客户端创建
       */
      @Component
      @Slf4j
      public class MqttClientCreate {
          @Resource
          private MqttClientManager mqttClientManager;
          @Resource
          private MqttConfig mqttConfig;
      
          /**
           * 创建MQTT客户端
           */
          @PostConstruct
          public void createMqttClient() {
              List<MqttClient> mqttClientList = mqttConfig.getClientList();
      
              for (MqttClient mqttClient : mqttClientList) {
                  log.info("{}", mqttClient);
                  //创建客户端,客户端ID:demo,回调类跟客户端ID一致
                  mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
              }
          }
      }
      
      

      5.6 MQTT回调抽象类

      package com.hulei.mqttproject.config;
      
      import jakarta.annotation.Resource;
      import lombok.Getter;
      import lombok.Setter;
      import lombok.extern.slf4j.Slf4j;
      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      
      /**
       * MQTT回调抽象类
       */
      @Setter
      @Getter
      @Slf4j
      public abstract class AbsMqttCallBack implements MqttCallback {
      
          private String clientId;
      
          private MqttConnectOptions connectOptions;
      
          @Resource
          Mq编程客栈ttClientManager mqttClientManager;
      
          /**
           * 失去连接操作,进行重连
           *
           * @param throwable 异常
           */
          @Override
          public void connectionLost(Throwable throwable) {
              try {
                  if (null != clientId) {
                编程客栈      if (null != connectOptions) {
                          mqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                      } else {
                          mqttClientManager.getMqttClientById(clientId).connect();
                      }
                  }
      
              } catch (Exception e) {
                  log.error("{} reconnect failed!", e.getMessage(), e);
              }
          }
      
          /**
           * 接收订阅消息
           * @param topic    主题
           * @param mqttMessage 接收消息
           * @throws Exception 异常
           */
          @Override
          public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
      		String content = new String(mqttMessage.getPayload());
           	handleReceiveMessage(topic, content);
          }
      
          /**
           * 消息发送成功
           *
           * @param iMqttDeliveryToken toke
           */
          @Override
          public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
              log.info("消息发送成功");
          }
      
      
          /**
           * 处理接收的消息
           * @param topic   主题
           * @param message 消息内容
           */
          protected abstract void handleReceiveMessage(String topic, String message);
      }
      
      
      

      5.7 MQTT订阅回调环境类

      package com.hulei.mqttproject.config;
      
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;
      
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      
      /**
       * MQTT订阅回调环境类
       */
      @Component
      @Slf4j
      public class MqttCallBackContext {
      
          private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
      
          /**
           * 默认构造函数
           *
           * @param callBackMap 回调集合
           */
          public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
              this.callBackMap.putAll(callBackMap);
          }
      
          /**
           * 获取MQTT回调类
           *
           * @param clientId 客户端ID
           * @return MQTT回调类
           */
          public AbsMqttCallBack getCallBack(String clientId) {
              return this.callBackMap.get(clientId);
          }
      }
      
      

      5.8 默认回调类

      package com.hulei.mqttproject.config;
      
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;
      
      /**
       * 默认回调
       */
      @Slf4j
      @Component("default")
      public class DefaultMqttCallBack extends AbsMqttCallBack {
      
          /**
           * @param topic   主题
           * @param message 消息内容
           */
          @Override
          protected void handleReceiveMessage(String topic, String message) {
              log.info("接收到主题---{}", topic);
              log.info("接收到消息---{}", message);
              // 自定义消息处理业务
      
          }
      }
      
      

      六、测试服务类

      package com.hulei.mqttproject.controller;
      
      import com.hulei.mqttproject.config.MqttClientManager;
      import jakarta.annotation.Resource;
      import lombok.extern.slf4j.Slf4j;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      
      @RestController
      @Slf4j
      public class SendController {
      
          @Resource
          private MqttClientManager mqttClientManager;
      
          @RequestMapping("/sendMessage")
          public String sendMessage(String topic){
              try {
                  MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
                  mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);
                  return "发送成功";
              } catch (Exception e) {
                  log.error("发送失败",e);
                  return "发送失败";
              }
          }
      }
      
      

      七、启动springboot

      启动日志可以看到,mqtt消息服务器连接成功

      SpringBoot集成MQTT实现交互服务通信

      EMQX工具显示发布客户端和接收客户端均已成功注册

      SpringBoot集成MQTT实现交互服务通信

      使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。

      SpringBoot集成MQTT实现交互服务通信

      SpringBoot集成MQTT实现交互服务通信

      到此这篇关于SpringBoot集成MQTT实现交互服务通信的文章就介绍到这了,更多相关SpringBoot MQTT交互服务通信内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)! 

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜