开发者

Java连接Emqx实现订阅发布消息的步骤记录

目录
  • 一:前提
  • 二:订阅发布实现步骤
    • 1.引入依赖
    • 2.编辑配置文件
    • 3.读取配置文件
    • 4.创建Mqtt客户端
    • 5.controller层
    • 6.service层
    • 7.dao层
  • 三:测试
    • 1.PostMan直接调用测试
    • 2、下载MQTTX客户端进行测试
  • 总结 

    一:前提

    安装了Emqx开源版、MQTTX客户端

    二:订阅发布实现步骤

    1.引入依赖

    <!--MQTT客户端-->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.2</version>
    </dependency>

    2.编辑配置文件

    mqtt:
      broker:
        uri: tcp://127.0.0.1:31883
      client:
        id: mqtt-am-client-${random.uuid}
      # 订阅主题配置(支持多个)
      inTopics:
      ucPWZeq  - topic: test/topic1
          qos: 0
        - topic: test/topic2
          qos: 1
        - topic: test/topic3
          qos: 2
      # 发布主题配置(支持多个)
      outTopics:
        - topic: out/topic1
          qos:js 0
      username: am
      password: LGyPtuAB4th5p
      keepAliveInterval: 60

    3.读取配置文件

    package com.wtzn.web.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    import Java.util.List;
    
    @Conwww.devze.comfiguration
    @ConfigurationProperties(prefix = "mqtt")
    @Data
    public class MqttProperties {
        private Broker broker;
        private Client client;
        private List<TopicConfig> inTopics;
        private List<TopicConfig> outTopics;
        private String userName;
        private String password;
        private int KeepAliveInterval;
    
        @Data
        public static class Broker {
            private String uri;
        }
    
        @Data
        public static class Client {
            private String id;
        }
        @Data
        public static class TopicConfig {
            private String topic;
            private int qos;
        }
    
    }

    4.创建Mqtt客户端

    package com.wtzn.web.config;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MqttConfig {
    
        @Autowired
        private MqttProperties mqttProperties;
    
        @Bean
        public MqttClient mqttClient() throws MqttException {
            MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            // 此客户端的用户名和密码
            options.setUserName(mqttProperties.getUserName());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setCleanSession(true);
            // 设置遗嘱消息
          //  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这http://www.devze.com是我的遗嘱".getBytes(), 2, true);
            // 连接超时重试
            options.setConnectionTimeout(5000); //毫秒
            options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
            options.setAutomaticReconnect(true);//网络中断重连
            client.connect(options);
            retuandroidrn client;
        }
    }

    5.controller层

    package com.wtzn.web.controller;
    
    import cn.dev33.satoken.annotation.SaIgnore;
    import com.wtzn.common.json.utils.JsonUtils;
    import com.wtzn.web.domain.bo.Payload;
    import com.wtzn.web.service.MqttService;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.util.LinkedList;
    
    
    @RestController
    @Slf4j
    @RequestMapping("/mqtt")
    public class MqttController {
    
        @Autowired
        private MqttService mqttService;
    
        @SaIgnore
        @PostMapping("/mqtt")
        public void publish() {
            try {
              //  LinkedList<Payload> payloadLinkedList=new LinkedList<>();
                for(int i=1; i<=10000; i++){
                    Payload payload=new Payload();
                    payload.setTemperature(i);
                  //  payloadLinkedList.add(payload);
                    mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
                }
    
            } catch (MqttException e) {
                log.error("发布消息失败{}", e.getMessage());
            }
            log.info("发布消息成功");
        }
    
    
    }

    6.service层

    package com.wtzn.web.service;
    
    import com.wtzn.common.json.utils.JsonUtils;
    import com.wtzn.web.config.MqttProperties;
    import com.wtzn.web.domain.bo.Payload;
    import jakarta.annotation.PostConstruct;
    import jakarta.annotation.PreDestroy;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.Arrays;
    
    
    @Service
    @Slf4j
    public class MqttService implements MqttCallbackExtended {
    
        @Autowired
        private MqttClient mqttClient;
    
        @Autowired
        private MqttProperties mqttProperties;
        
        @PostConstruct
        public void init() throws MqttException {
            mqttClient.setCallback(this);
     /*       mqttClient.subscribe(mqttProperties.getInTopic());
            log.info("订阅主题{}", mqttProperties.getInTopic());
    */
            mqttProperties.getInTopics().forEach(x -> {
                try {
                    mqttClient.subscribe(x.getTopic(), x.getQos());
                    log.info("订阅主题{}", x.getTopic());
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
            });
    
        }
    
        @PreDestroy
        public void destroy() throws MqttException {
            mqttClient.disconnect();
            log.info("与服务器断开连接");
        }
    
        /**
         * @description: 发送消息
         * @param: [message]
         * @return: void
         **/
        public void publish(String topic,int qos,String message) throws MqttException {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(qos);
            mqttClient.publish(topic, mqttMessage);
            log.info("向主题【{}】发布消息:【{}】", topic, message);
        }
    
    
        /**
         * @description: 接收消息
         * @param: [topic, message]
         * @return: void
         **/
        @Override
        public void messageArrived(String topic, MqttMessage message) throws MqttException {
            Payload payload = JsonUtils.parseobject(new String(message.getPayload()), Payload.class);
            log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());
          /*  if (payload.getTemperature() > 37) {
                publish("发烧");
            }*/
    
    
        }
    
    
        @Override
        public void connectionLost(Throwable cause) {
            log.error("连接丢失:{}", cause.getMessage());
        }
    
        @SneakyThrows
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            if( token!=null ){
                MqttMessage message = null;
                try {
                    message = token.getMessage();
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
                String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
                String str = message==null ? null : new String(message.getPayload());
                log.info("deliveryComplete: topic={}, message={}", topic, str);
            } else {
                log.info("deliveryComplete: null");
            }
    
            log.info("消息已送达");
        }
    
        @Override
        public void connectComplete(boolean b, String s) {
    
                mqttProperties.getInTopics().forEach(x -> {
                    try {
                        mqttClient.subscribe(x.getTopic(), x.getQos());
                        log.info("订阅主题{}", x.getTopic());
                    } catch (MqttException e) {
                        throw new RuntimeException(e);
                    }
                });
        }
    }

    7.dao层

    package com.wtzn.web.domain.bo;
    
    import lombok.Data;
    
    @Data
    public class Payload {
        private Integer temperature;
    }

    三:测试

    1.PostMan直接调用测试

    Java连接Emqx实现订阅发布消息的步骤记录

    Java连接Emqx实现订阅发布消息的步骤记录

    Java连接Emqx实现订阅发布消息的步骤记录

    2、下载MQTTX客户端进行测试

    Java连接Emqx实现订阅发布消息的步骤记录

    Java连接Emqx实现订阅发布消息的步骤记录

    Java连接Emqx实现订阅发布消息的步骤记录

    总结 

    到此这篇关于Java连接Emqx实现订阅发布消息的文章就介绍到这了,更多相关Java Emqx订阅发布消息内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜