开发者

SpringBoot整合MQTT协议实现消息订阅与发布功能

目录
  • 1、相关依赖 pom.XML文件
  • 2、配置文件 application.yml
  • 3、MQTT配置类
  • 4、发布连接类
  • 5、订阅类
  • 6、回调类
  • 7、启动后,进入EMQX管理页面
  • 8、通过接口给主题发送消息

1、相关依赖 pom.xml文件

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2、配置文件 application.yml

这里的订阅主题可不要,我这里用于启动的时候就订阅固定主题。适用主题固定的场景。

# MQTT服务地址,端口默认1883
mqtt-broker-url: tcp://127.0.0.1:1883
# 用户名
mqtt-username: admin
# 密码
mqtt-password: public
# 订阅主题(可以多个)
mqtt-default-topic: mqtt/topic_test
# 客户端Id
mqtt-clientId: can

3、MQTT配置类

用于配置项目启动时就连接MQTT。

@Component
public class MqttConfig {
    @Resource
    private MqttPushClient mqttPushClient;
    @Resource
    private MqttSubClient mqttSubClient;
    /**
     * 用户名
     */
    @Value("${mqtt-username}")
    private String username;
    /**
     * 密码
     */
    @Value("${mqtt-password}")
    private String password;
    /**
     * 连接地址
     */
    @Value("${mqtt-broker-url}")
    private String hostUrl;
    /**
     * 客户Id
     */
    @Value("${mqtt-clientId}")
    private String clientId;
    /**
     * 默认连接话题,多个的话用逗号隔开
     */
    @Value("${mqtt-default-topic}")
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout = 100;
    /**
     * 保持连接数
     */
    private int keepalive = 60;
    /**
     * 连接至mqtt服务器,获取mqtt连接
     *
     * @return MqttPushClient
     */
    @Bean
    public MqttPushClient getMqttPushClient() {
        // 连接至mqtt服务器,获取mqtt连接
        mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
        // 订阅默认主题
        mqttSubClient.subScribeDataPublishTopic(defaultTopic);
        return mqttPushClient;
    }
}

4、发布连接类

连接MQTT的方法、发布消息的方法。

@Slf4j
@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
    @Autowired
    private PushCallback pushCallback;
    @Getter
    private static MqttClient client;
    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }
    /**
     * 连接
     * @param host  mqtt://127.0.0.1:1883
     * @param clientId  can
     * @param username admin
     * @param password  password
     * @param timeout   100
     * @param keepalive 60
     */
    public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            // automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅
            options.setAutomaticReconnect(true);
            MqttPushClient.setClient(client);
            try {
                //设置回调类
                client.setCallback(pushCallback);
                IMqttToken iMqttToken = client.connectWithResult(options);
                boolean complete = iMqttToken.isComplete();
                log.error("MQTT连接{}", complete ? "成功" : "失败");
            } catch (Exception e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            }
        } catch (Exception e) {
            logger.error(e.getMessage());
            e.printhttp://www.devze.comStackTrace();
        }
    }
    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        client.disconnect();
        client.close();
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic       主题名
     * @param pushMessage 消息
     */
    public void publish(String topic, String pushMessage) {
        publish(0, false, topic, pushMessage);
    }
    /**
     * 发布
     * QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
     * QoS 1:消息至少传送一次。
     * QoS 2:消息只传送一次。
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        // MQTT主题不存在
        if (null == mTopic) return;
        try {
            mTopic.publish(message);
        } catch (Exception e) {
            log.error("MQTT发送消息异常:", e);
            e.printStackTrace();
        }
    }
}

5、订阅类

用于订阅某个或多个主题、取消订阅某个或者多个主题。

@Slf4j
@Component
public class MqttSubClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttSubClient.class);
    // 订阅多个主题以逗号分开
    public void subScribeDataPublishTopic(String defaultTopic) {
      php  //订阅test_queue主题
        String[] mqttTopic = defaultTopic.split(",");
        for (String s : mqttTopic) {
            //订阅主题
            subscribe(s, 0);
        }
    }
    /**
     * 订阅某个主题,qos默认为0
     *
     * @param topic 主题
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }
    /**
     * 订阅某个主题
     *
     * @param topic 主题名
     * @param qos   qos
     */
    public void subscribe(String topic, int qos) {
        try {
            MqttClient client = MqttPushClient.getClient();
            if (client == null) {
                return;
            }
            client.subscribe(topic, qos);
            log.error("MQTT订阅主题:{}", topic);
        } catch (MqttException e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 取消订阅某个主题
     * @param topic 要取消订阅的主题名
     */
    public void unsubscribe(String topic) {
        try {
            MqttClient client = MqttPushClient.getClient();
            if (client == null || !client.isConnected()) {
                return;
            }
            client.unsubscribe(topic); // 取消订阅
            log.error("MQTT取消订阅主题: {}", topic);
        } catch (MqttException e) {
            log.error("取消订阅失败: {}", e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 批量取消订阅多个主题
     * @param topics 主题数组
     */
    public void unsubscribe(String[] topics) {
        try {
            MqttClient client = MqttPushClient.getClient();
            if (client == null || !client.isConnected()) {
                return;
            }
            client.unsubscribe(topics); // 取消订阅多个主题
            log.error("MQTT取消订阅主题: {}", Arrays.toString(topics));
        } catch (MqttException e) {
            log.error("取消订阅失败: {}", e.getMessage());
            e.printStackTrace();
        }
    }
}
 

6、回调类

处理MQTT连接断开重连、订阅主题接收的消息处理。

@Slf4j
@Component
public class PushCallback implements MqttCallback {
    @Resource
    @Lazy
    private MqttPushClient mqttPushClient;
    /**
     * 连接丢失后,一般在这里面进行重连(重连的逻辑需要自己处理)
     * @param cause .
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.error("MQTT连接断开,正在重连:" + cause);
    }
    /**
     * 发送消息,消息到达后处理方法
     * @param token .
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.error("deliveryComplete---------{}", token.isComplete());
    }
    /**
     * 订阅主题接收到消息处理方法
     * @param topic 主题
     * @param message   消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // 订阅主题后得到的消息会执行到这里面,这里在控制台有输出
        log.error("MQTT接收消息主题 : {}", topic);
        log.error("MQTT接收消息phpQos : {}", message.getQos());
        log.error("MQTT接收消息内容 : {}", message);
    }
}

7、启动后,进入EMQX管理页面

程序允许打印连接成功,去EMQX管理页面查看。

SpringBoot整合MQTT协议实现消息订阅与发布功能

EMQX管理页面这里有所有的主题列表。

SpringBoot整合MQTT协议实现消息订阅与发布功能

包括客户端订阅的主题。

SpringBoot整合MQTT协议实现消息订阅与发布功能

8、通过接口给主题发送消息

@RestController
@Slf4j
@RequestMapping("/api")
public class ApiController {
    @Resource
    private MqttPushClient mqttPushClient;
    @GetMapping("/test")
    public String getVersions(@RequestParam String topic, @RequestParam String message) {
        mqttPushClandroidient.publish(topic, message);
        return "ojavascriptk";
    }
}

浏览器直接调用,topic:配置文件里面订阅的主题。message:你想发送给主题的消息。

SpringBoot整合MQTT协议实现消息订阅与发布功能

控制台日志打印:发送成功,并且接收到了主题发送的消息。

SpringBoot整合MQTT协议实现消息订阅与发布功能

到此这篇关于SpringBoot整合MQTT协议实现消息订阅与发布功能的文章就介绍到这了,更多相关SpringBoot整合MQTT订阅与发布内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜