开发者

如何利用rabbitMq的死信队列实现延时消息

目录
  • 前言
  • mq基本的消息模型
  • mq死信队列的消息模型
    • maven依赖
    • 配置普通队列和死信队列
    • 死信队列消费者
    • 发送消息测试
  • 测试成功
    • 总结

      前言

      使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期。

      mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信.

      mq基本的消息模型

      如何利用rabbitMq的死信队列实现延时消息

      mq死信队列的消息模型

      简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。

      然后监听这个死信队列的消费.

      如何利用rabbitMq的死信队列实现延时消息

      一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)

      通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.

      maven依赖

      <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
      

      配置普通队列和死信队列

      yml文件

      spring:
        application:
          name: ttl-queue
        rabbitmq:
          host: 110.40.181.73
          port: 35672
          username: root
          password: 10086
          virtual-host: /fchan
          connection-timeout: 15000
          # 发送确认
          #publisher-confirms: true
          # 路由失败回调
          publisher-returns: true
          template:
            # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
            mandatory: true
          listener:
            simple:
              # 每次从RabbitMQ获取的消息数量
              prefetch: 1
              default-requeue-rejected: false
              # 每个队列启动的消费者数量
              concurrency: 1
              # 每个队列最大的消费者数量
              max-concurrency: 1
              # 签收模式为手动签收-那么需要在代码中手动ACK
              acknowledge-mode: manual
      
      package com.fchanpython.mq.mqDelay.dlx;
      
      import org.springframework.amqp.core.*;
      import org.springframework.beans.编程factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class MyRabConfig {
      
          //定义普通任务队列(其实就是一个普通队列,只是没有消费者)
          @Bean("delayQue")
          public Queue delayQue(){
              //普通队列绑定死信交换机及路由key
      
              //delayQueue延时队列
              return QueueBuilder.durable("delayQueue")
                      //指定这个延时队列下的死信交换机
                      //如果消息过时则会被投递到当前对应的my-dlx-exchange
                      .withArgument("x-dead-letter-exchange", "my-dlx-exchange")
                      //死信交换机绑定的路由key
                      //如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列
                      //mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式
                      .withArgument("x-dead-letter-routing-key", "routing-key-delay")
                      .build();
          }
      
          //定义普通队列的交换机
          @Bean("delayExchange")
          public Exchange delayExchange(){
              return ExchangeBuilder.directExchange("delayExchange").build();
          }
      
          //绑定普通队列的交换机
          @Bean("delayBinding")
          public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
              return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
          }
      
      
      
          //定义死信队列
          @Bean("dlxQueue")
          public Queue dlxQueue(){
              return QueueBuilder.durable("my-dlx-queue").build();
          }
      
      
          //定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致
          @Bean("dlxExchange")
          public Exchange dlxExchange(){
              return ExchangeBuilder.directExchange("my-dlx-exchange").build();
          }
      
      
          //绑定交换机与编程客栈死信队列
          @Bean("dlxBinding")
          public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
              //这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致
              return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
          }
      
      }
      

      死信队列消费者

      package com.fchan.mq.mqDelay;
      
      import com.rabbitmq.client.Channel;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      import Java.util.Map;
      
      @Component
      public class MyRabbitConsume {
          http://www.devze.comLogger log = LoggerFactory.getLogger(MyRabbitConsume.class);
      
          @RabbitListener(queues = {"my-dlx-queue"})
          public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
              log.info("收到死信信息:{}",map);
              log.info("然后进行一系列逻辑处理 Thanks♪(・・)ノ");
      		//手动确认消息        
      		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
          }
      
      }
      

      发送消息测试

      /**
       * php发送消息到延时队列 delayQueue
       * 消息5秒后会被投递到死信队列
       * @return
       */
      @GetMapping("sendMsgT开发者_C开发oDelay")
      public String sendMsgToDelay(String msg){
          Map<String,Object> map = new HashMap<>();
          map.put("msg", msg);
          rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  //设置消息过期时间,5秒过期
                  message.getMessageProperties().setExpiration("5000");
                  return message;
              }
          });
          return msg;
      }
      

      测试成功

      如何利用rabbitMq的死信队列实现延时消息

      总结

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜