开发者

RabbitMQ实现延迟通知的两种方案

目录
  • 一、延迟通知概述
  • 二、RabbitMQ 实现延迟通知的两种方案
    • 方案对比
  • 三、方案一:基于TTL和死信队列实现
    • 1. 原理
    • 2. 代码实现
      • 2.1 配置类
      • 2.2 生产者 - 发送延迟消息
      • 2.3 消费者 - 接收延迟消息
  • 四、方案二:基于延迟插件实现
    • 1. 安装延迟插件
      • 1.1 docker环境安装
      • 1.2 验证安装
    • 2. 代码实现
      • 2.1 配置类
      • 2.2 生产者 - 发送延迟消息
      • 2.3 消费者 - 接收延迟消息
  • 五、Controller层实现
    • 六、application.yml配置
      • 七、完整的通知场景实现示例
        • 订单超时通知场景
        • 八、两种方案对比与选择建议
          • 1. 性能对比
            • 2. 灵活性对比
              • 3. 选择建议

              一、延迟通知概述

              延迟通知是指消息在发送后不会立即被消费,而是在指定的时间延迟后才被处理的消息传递机制。常见应用场景包括:

              • 订单超时自动取消
              • 定时任务调度
              • 会议/活动前提醒
              • 账单到期通知

              二、RabbitMQ 实现延迟通知的两种方案

              方案对比

              实现方式优点缺点适用场景
              TTL + 死信队列无需安装插件,原生支持1. 队列级TTL不支持动态延迟

              2. 消息级TTL存在性能问题

              延迟时间固定或较少变化的场景
              延迟插件1. 支持每条消息单独设置延迟时间

              2. 性能更好

              3. 配置简单

              需要安装额外插件延迟时间不固定,需要灵活设置的场景

              三、方案一:基于TTL和死信队列实现

              1. 原理

              • 利用消息或队列的TTL(Time-To-Live)特性使消息过期
              • 配置死信交换机(DLX)接收过期消息
              • 将死信消息路由到实际处理队列

              2. 代码实现

              2.1 配置类

              package com.example.delaynotify.config;
              
              import org.springframework.amqp.core.*;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              
              import Java.util.HashMap;
              import java.util.Map;
              
              @Configuration
              public class TtlDelayConfig {
                  // 普通交换机
                  public static final String DELAY_EXCHANGE = "delay_exchange";
                  // 普通队列
                  public static final String DELAY_QUEUE = "delay_queue";
                  // 死信交换机
                  public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
                  // 死信队列
                  public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
                  // 路由键
                  public static final String DELAY_ROUTING_KEY = "delay.key";
                  public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.key";
              
                  // 声明死信交换机
                  @Bean
                  public Exchange deadLetterExchange() {
                      return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();
                  }
              
                  // 声明死信队列
                  @Bean
                  public Queue deadLetterQueue() {
                      return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
                  }
              
                  // 声明普通交换机
                  @Bean
                  public Exchange delayExchange() {
                      return ExchangeBuilder.directExchange(DELAY_EXCHANGE).durable(true).build();
                  }
              
                  // 声明延迟队列并绑定死信交换机
                  @Bean
                  public Queue delayQueue() {
                      Map<String, Object> args = new HashMap<>();
                      // 设置死信交换机
                      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
                      // 设置死信路由键
                      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
                      // 队列级TTL (10秒) - 如果需要消息级TTL可以不设置此参数
                      args.put("x-message-ttl", 10000);
                      
                      return QueueBuilder.durable(DELAY_QUEUE)
                              .withArguments(args)
                              .build();
                  }
              
                  // 绑定普通队列和普通交换机
                  @Bean
                  public Binding delayBinding() {
                      return BindingBuilder.bind(delayQueue())
                              .to(delayExchange())
                              .with(DELAY_ROUTING_KEY)
                              .noargs();
                  }
              
                  // 绑定死信队列和死信交换机
                  @Bean
                  public Binding deadLetterBinding() {
                      return BindingBuilder.bind(deadLetterQueue())
                              .to(deajsdLetterExchange())
                              .with(DEAD_LETTER_ROUTING_KEY)
                              .noargs();
                  }
              }
              

              2.2 生产者 - 发送延迟消息

              package com.example.delaynotify.service;
              
              import com.example.delaynotify.config.TtlDelayConfig;
              import org.springframework.amqp.AmqpException;
              import org.springframework.amqp.core.Message;
              import org.springframework.amqp.core.MessagePostProcessor;
              import org.springframework.amqp.rabbit.core.RabbitTemplate;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.stereotype.Service;
              
              @Service
              public class TtlDelayMessageService {
              
                  @Autowired
                  private RabbitTemplate rabbitTemplate;
              
                  // 发送固定延迟时间的消息(队列级TTL)
                  public void sendFixedDelayMessage(String message) {
                      System.out.println("发送固定延迟消息: " + message + ", 时间: " + System.currentTimeMillis());
                      rabbitTemplate.convertAndSend(
                              TtlDelayConfig.DELAY_EXCHANGE,
                              TtlDelayConfig.DELAY_ROUTING_KEY,
                              message
                      );
                  }
              
                  // 发送自定义延迟时间的消息(消息级TTL)
                  public void sendCustomDelayMessage(String message, long delayMillis) {
                      System.out.println("发送自定义延迟消息: " + message + ", 延迟时间: " + delayMillis + "ms, 时间: " + System.currentTimeMillis());
                      
                      rabbitTemplate.convertAndSend(
                              TtlDelayConfig.DELAY_EXCHANGE,
                              TtlDelayConfig.DELAY_ROUTING_KEY,
                              message,
                              new MessagePostProcessor() {
                                  @Override
                                  public Message postProcessMessage(Message message) throws AmqpException {
                                      // 设置消息级TTL
                                      message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
                                      return message;
                                  }
                              }
                      );
                  }
              }
              

              2.3 消费者 - 接收延迟消息

              package com.example.delaynotify.consumer;
              
              import com.example.delaynotify.config.TtlDelayConfig;
              import com.rabbitmq.client.Channel;
              import org.springframework.amqp.core.Message;
              import org.springframework.amqp.rabbit.annotation.RabbitListener;
              import org.springframework.stereotype.Component;
              
              import java.io.IOException;
              
              @Component
              public class TtlDelayMessageConsumer {
              
                  @RabbitListener(queues = TtlDelayConfig.DEAD_LETTER_QUEUE)
                  public void receiveDelayMessage(String message, Channel channel, Message msg) throws IOException {
                      try {
                          System.out.println("接收到延迟消息: " + message + ", 时间: " + System.currentTimeMillis());
                          
                          // 处理业务逻辑 - 例如发送通知、更新状态等
                          processDelayMessage(message);
                          
                          // 手动确认消息
                          channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                      } catch (Exception e) {
                          System.out.println("消息处理失败: " + e.getMessage());
                          // 拒绝消息并丢弃
                          channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
                      }
                  }
                  
                  private void processDelayMessage(String message) {
                      // 模拟发送通知的业务逻辑
                      System.out.println("执行通知业务: " + message);
                      // 这里可以调用邮件、短信、推送等服务
                  }
              }
              

              四、方案二:基于延迟插件实现

              1. 安装延迟插件

              1.1 Docker环境安装

              # 下载插件(根据RabbitMQ版本选择对应版本)
              wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
              
              # 复制插件到容器
              docker cp rabbitmq_delayed_message_exchange-3.11.1.ez rabbitmq:/plugins
              
              # 进入容器启用插件
              docker exec -it rabbitmq bash
              rabbitmq-plugins enable rabbitmq_delayed_message_exchange
              exit
              
              # 重启RabbitMQ容器
              docker restart rabbitmq
              

              1.2 验证安装

              在RabbitMQ管理界面新建交换机时,如果能看到x-delayed-message类型,则表示插件安装成功。

              2. 代码实现

              2.1 配置类

              package com.example.delaynotify.config;
              
              import org.springframework.amqp.core.*;
              import org.springframework.context.annotation.Bean;
              import org.springframework.context.annotation.Configuration;
              
              import java.util.HashMap;
              import java.util.Map;
              
              @Configuration
              public class PluginDelayConfig {
                  // 延迟交换机
                  public static final String DELAY_PLUGIN_EXCHANGE = "delay_plugin_exchange";
                  // 延迟队列
                  public static final String DELAY_PLUGIN_QUEUE = "delay_plugin_queue";
                  // 路由键
                  public static final String DELAY_PLUGIN_ROUTING_KEY = "delay.plugin.key";
              
                  // 声明延迟交换机(类型为x-delayed-message)
                  @Bean
                  public CustomExchange delayPluginExchange() {
                      Map<String, Object> args = new HashMap<>();
                      // 设置底层路由模式为direct
                      args.put("x-delayed-type", "direct");
                      return new CustomExchange(
                              DELAY_PLUGIN_EXCHANGE,
                              "x-delayed-message",
                              true,  // 持久化
                              false, // 非自动删除
                              args
                      );
                  }
              
                  // 声明延迟队列
                  @Bean
                  public Queue delayPluginQueue() {
                      return QueueBuilderpython.durable(DELAY_PLUGIN_QUEUE).build();
                  }
              
                  // 绑定延迟交换机和延迟队列
                  @Bean
                  public Binding delayPluginBinding() {
                      return BindingBuilder.bind(delayPluginQueue())
                              .to(delayPluginExchange())
                              .with(DELAY_PLUGIN_ROUTING_KEY)
                              .noargs();
                  }
              }
              

              2.2 生产者 - 发送延迟消息

              package com.example.delaynotify.service;
              
              import com.example.delaynotify.config.PluginDelayConfig;
              import org.springframework.amqp.AmqpException;
              import org.springframework.amqp.core.Message;
              import org.springframework.amqp.core.MessagePostProcessor;
              import org.springframework.amqp.rabbit.core.RabbitTemplate;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.stereotype.Service;
              
              @Service
              public class PluginDelayMessageService {
              
                  @Autowired
                  private RabbitTemplate rabbitTemplate;
              
                  // 发送延迟消息
                  public void sendDelayMessage(String message, long delayMillis) {
                      System.out.println("使用插件发送延迟消息: " + message + ", 延迟时间: " + delayMillis + "ms, 时间: " + System.currentTimeMillis());
                      
                      rabbitTemplate.convertAndSend(
                              PluginDelayConfig.DELAY_PLUGIN_EXCHANGE,
                              PluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY,
                              message,
                              new MessagePostProcessor() {
                                  @Override
                                  public Message postProcessMessage(Message message) throws AmqpException {
                                      // 设置延迟时间(毫秒)
                                      message.getMessageProperties().setDelay((int) delayMillis);
                                      return message;
                                  }
                              }
                      );
                  }
              }
              

              2.3 消费者 - 接收延迟消息

              package com.example.delaynotify.consumer;
              
              import com.example.delaynotify.config.PluginDelayConfig;
              import com.rabbitmq.client.Channel;
              import org.springframework.amqp.core.Message;
              import org.springframework.amqp.rabbit.annotation.RabbitListener;
              import org.springframework.stereotype.Component;
              
              import java.io.IOException;
              
              @Component
              public class PluginDelayMessageConsumer {
              
                  @RabbitListener(queues = PluginDelayConfig.DELAY_PLUGIN_QUEUE)
                  public void receiveDelayMessage(String message, Channel channel, Message msg) throws IOException {
                      try {
                          System.out.println("接收到插件延迟消息: " + message + ", 时间: " + System.currentTimeMillis());
                          
                          // 处理业务逻辑 - 例如发送通知、更新状态等
                          processDelayMessage(message);
                          
                          // 手动确认消息
                          channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                      } catch (Exception e) {
                          System.out.println("插件延迟消息处理失败: " + e.getMessage());
                          // 拒绝消息并丢弃
                          channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
                      }
                  }
                  
                  private void processDelayMessage(String message) {
                      // 模拟发送通知的业务逻辑
                      System.out.println("执行通知业务: " + message);
                      // 这里可以调用邮件、短信、推送等服务
                  }
              }
              

              五、Controller层实现

              package com.example.delaynotify.controller;
              
              import com.example.delaynotify.service.PluginDelayMessageService;
              import com.example.delaynotify.service.TtlDelayMessageService;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.web.bind.annotation.GetMapping;
              import org.springframework.web.bind.annotation.RequestParam;
              import org.springframework.web.bind.annotation.RestController;
              
              @RestController
              public class DelayNotifyController {
              
                  @Autowired
                  private TtlDelayMessageService ttlDelayMessageService;
                  
                  @Autowired
                  private PluginDelayMessageService pluginDelayMessageService;
              
                  // 基于TTL的固定延迟
                  @GetMapping("/ttl/fixed")
                  public String sendFixedTtlDelay(@RequestParam String message) {
                      ttlDelayMessageService.sendFixedDelayMessage(message);
                      rejavascriptturn "固定延迟消息已发送 (10秒)";
              GfgHmNdz    }
              
                  // 基于TTL的自定义延迟
                  @GetMapping("/ttl/custom")
                  public String sendCustomTtlDelay(@RequestParam String message, @RequestParam long delayMillis) {
                      ttlDelayMessageService.sendCustomDelayMessage(message, delayMillis);
                      return "自定义延迟消息已发送 (" + delayMillis + "ms)";
                  }
              
                  // 基于插件的延迟
                  @GetMapping("/plugin/delay")
                  public String sendPluginDelay(@RequestParam String message, @RequestParam long delayMillis) {
                      pluginDelayMessageService.sendDelayMessage(message, delayMillis);
                      return "插件延迟消息已发送 (" + delayMillis + "ms)";
                  }
              }
              

              六、application.yml配置

              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: admin
                  password: admin
                  virtual-host: /
                  # 生产者确认配置
                  publisher-confirm-type: correlated
                  publisher-returns: true
                  template:
                    mandatory: true
                  # 消费者配置
                  listener:
                    simple:
                      acknowledge-mode: manual
                      prefetch: 1
                      concurrency: 1
                      max-concurrency: 5
              

              七、完整的通知场景实现示例

              订单超时通知场景

              package com.example.delaynotify.service;
              
              import com.example.delaynotify.config.PluginDelayConfig;
              import org.springframework.amqp.core.MessagePostProcessor;
              import org.springframework.amqp.rabbit.core.RabbitTemplate;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.stereotype.Service;
              
              import java.time.LocalDateTime;
              import java.time.format.DateTimeFormatter;
              
              @Service
              public class OrderNotifyService {
              
                  @Autowired
                  private RabbitTemplate rabbitTemplate;
                  
                  private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
              
                  /**
                   * 创建订单并设置超时通知
                   * @param orderId 订单ID
                   * @param notifyDelaySeconds 超时时间(秒)
                   */
                  public void createOrderAndSetTimeout(String orderId, int notifyDelaySeconds) {
                      // 1. 保存订单逻辑
                      System.out.println("创建订单: " + orderId + " 时间: " + LocalDateTime.now().format(formatter));
                      
                      // 2. 设置延迟通知
                      String notifyMessage = "订单[" + orderId + "]已超时,需要取消处理";
                   编程客栈   long delayMillis = notifyDelaySeconds * 1000L;
                      
                      System.out.println("设置订单超时通知,延迟: " + notifyDelaySeconds + "秒");
                      
                      // 使用延迟插件发送通知消息
                      rabbitTemplate.convertAndSend(
                              PluginDelayConfig.DELAY_PLUGIN_EXCHANGE,
                              PluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY,
                              notifyMessage,
                              message -> {
                                  message.getMessageProperties().setDelay((int) delayMillis);
                                  return message;
                              }
                      );
                  }
              }
              

              八、两种方案对比与选择建议

              1. 性能对比

              • TTL+死信队列:当使用消息级TTL时,RabbitMQ需要为每条消息设置过期时间,会造成额外的性能开销
              • 延迟插件:插件内部使用优先队列实现,性能更优,特别适合大量不同延迟时间的消息场景

              2. 灵活性对比

              • TTL+死信队列:如果要支持不同的延迟时间,需要创建多个不同TTL的队列
              • 延迟插件:每条消息都可以设置不同的延迟时间,更加灵活

              3. 选择建议

              • 如果延迟时间固定或种类较少,可以使用TTL+死信队列方案,无需安装插件
              • 如果延迟时间不固定或种类较多,强烈建议使用延迟插件方案
              • 对于生产环境,建议使用延迟插件方案,性能更好、配置更简洁

              通过以上两种方案,您可以根据实际需求选择合适的方式实现RabbitMQ的延迟通知功能,满足订单超时、定时提醒等各种业务场景。

              以上就是RabbitMQ实现延迟通知的两种方案的详细内容,更多关于RabbitMQ延迟通知的资料请关注编程客栈(www.devze.com)其它相关文章!

              0

              上一篇:

              下一篇:

              精彩评论

              暂无评论...
              验证码 换一张
              取 消

              最新开发

              开发排行榜