RabbitMQ实现延迟通知的两种方案
目录
- 一、延迟通知概述
- 二、RabbitMQ 实现延迟通知的两种方案
- 方案对比
- 三、方案一:基于TTL和死信队列实现
- 1. 原理
- 2. 代码实现
- 2.1 配置类
- 2.2 生产者 - 发送延迟消息
- 2.3 消费者 - 接收延迟消息
- 四、方案二:基于延迟插件实现
- 1. 安装延迟插件
- 1.1 docker环境安装
- 1.2 验证安装
- 2. 代码实现
- 2.1 配置类
- 2.2 生产者 - 发送延迟消息
- 2.3 消费者 - 接收延迟消息
- 五、Controller层实现
- 六、application.yml配置
- 七、完整的通知场景实现示例
- 订单超时通知场景
- 八、两种方案对比与选择建议
- 1. 性能对比
- 2. 灵活性对比
- 3. 选择建议
一、延迟通知概述
延迟通知是指消息在发送后不会立即被消费,而是在指定的时间延迟后才被处理的消息传递机制。常见应用场景包括:
- 订单超时自动取消
- 定时任务调度
- 会议/活动前提醒
- 账单到期通知
二、RabbitMQ 实现延迟通知的两种方案
方案对比
实现方式 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
TTL + 死信队列 | 无需安装插件,原生支持 | 1. 队列级TTL不支持动态延迟2. 消息级TTL存在性能问题 | 延迟时间固定或较少变化的场景 |
延迟插件 | 1. 支持每条消息单独设置延迟时间2. 性能更好3. 配置简单 | 需要安装额外插件 | 延迟时间不固定,需要灵活设置的场景 |
三、方案一:基于TTL和死信队列实现
1. 原理
- 利用消息或队列的TTL(Time-To-Live)特性使消息过期
- 配置死信交换机(DLX)接收过期消息
- 将死信消息路由到实际处理队列
2. 代码实现
2.1 配置类
package com.example.delaynotify.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import Java.util.HashMap; import java.util.Map; @Configuration public class TtlDelayConfig { // 普通交换机 public static final String DELAY_EXCHANGE = "delay_exchange"; // 普通队列 public static final String DELAY_QUEUE = "delay_queue"; // 死信交换机 public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange"; // 死信队列 public static final String DEAD_LETTER_QUEUE = "dead_letter_queue"; // 路由键 public static final String DELAY_ROUTING_KEY = "delay.key"; public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.key"; // 声明死信交换机 @Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build(); } // 声明死信队列 @Bean public Queue deadLetterQueue() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } // 声明普通交换机 @Bean public Exchange delayExchange() { return ExchangeBuilder.directExchange(DELAY_EXCHANGE).durable(true).build(); } // 声明延迟队列并绑定死信交换机 @Bean public Queue delayQueue() { Map<String, Object> args = new HashMap<>(); // 设置死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 设置死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); // 队列级TTL (10秒) - 如果需要消息级TTL可以不设置此参数 args.put("x-message-ttl", 10000); return QueueBuilder.durable(DELAY_QUEUE) .withArguments(args) .build(); } // 绑定普通队列和普通交换机 @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with(DELAY_ROUTING_KEY) .noargs(); } // 绑定死信队列和死信交换机 @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deajsdLetterExchange()) .with(DEAD_LETTER_ROUTING_KEY) .noargs(); } }
2.2 生产者 - 发送延迟消息
package com.example.delaynotify.service; import com.example.delaynotify.config.TtlDelayConfig; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class TtlDelayMessageService { @Autowired private RabbitTemplate rabbitTemplate; // 发送固定延迟时间的消息(队列级TTL) public void sendFixedDelayMessage(String message) { System.out.println("发送固定延迟消息: " + message + ", 时间: " + System.currentTimeMillis()); rabbitTemplate.convertAndSend( TtlDelayConfig.DELAY_EXCHANGE, TtlDelayConfig.DELAY_ROUTING_KEY, message ); } // 发送自定义延迟时间的消息(消息级TTL) public void sendCustomDelayMessage(String message, long delayMillis) { System.out.println("发送自定义延迟消息: " + message + ", 延迟时间: " + delayMillis + "ms, 时间: " + System.currentTimeMillis()); rabbitTemplate.convertAndSend( TtlDelayConfig.DELAY_EXCHANGE, TtlDelayConfig.DELAY_ROUTING_KEY, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置消息级TTL message.getMessageProperties().setExpiration(String.valueOf(delayMillis)); return message; } } ); } }
2.3 消费者 - 接收延迟消息
package com.example.delaynotify.consumer; import com.example.delaynotify.config.TtlDelayConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class TtlDelayMessageConsumer { @RabbitListener(queues = TtlDelayConfig.DEAD_LETTER_QUEUE) public void receiveDelayMessage(String message, Channel channel, Message msg) throws IOException { try { System.out.println("接收到延迟消息: " + message + ", 时间: " + System.currentTimeMillis()); // 处理业务逻辑 - 例如发送通知、更新状态等 processDelayMessage(message); // 手动确认消息 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { System.out.println("消息处理失败: " + e.getMessage()); // 拒绝消息并丢弃 channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false); } } private void processDelayMessage(String message) { // 模拟发送通知的业务逻辑 System.out.println("执行通知业务: " + message); // 这里可以调用邮件、短信、推送等服务 } }
四、方案二:基于延迟插件实现
1. 安装延迟插件
1.1 Docker环境安装
# 下载插件(根据RabbitMQ版本选择对应版本) wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez # 复制插件到容器 docker cp rabbitmq_delayed_message_exchange-3.11.1.ez rabbitmq:/plugins # 进入容器启用插件 docker exec -it rabbitmq bash rabbitmq-plugins enable rabbitmq_delayed_message_exchange exit # 重启RabbitMQ容器 docker restart rabbitmq
1.2 验证安装
在RabbitMQ管理界面新建交换机时,如果能看到x-delayed-message
类型,则表示插件安装成功。
2. 代码实现
2.1 配置类
package com.example.delaynotify.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class PluginDelayConfig { // 延迟交换机 public static final String DELAY_PLUGIN_EXCHANGE = "delay_plugin_exchange"; // 延迟队列 public static final String DELAY_PLUGIN_QUEUE = "delay_plugin_queue"; // 路由键 public static final String DELAY_PLUGIN_ROUTING_KEY = "delay.plugin.key"; // 声明延迟交换机(类型为x-delayed-message) @Bean public CustomExchange delayPluginExchange() { Map<String, Object> args = new HashMap<>(); // 设置底层路由模式为direct args.put("x-delayed-type", "direct"); return new CustomExchange( DELAY_PLUGIN_EXCHANGE, "x-delayed-message", true, // 持久化 false, // 非自动删除 args ); } // 声明延迟队列 @Bean public Queue delayPluginQueue() { return QueueBuilderpython.durable(DELAY_PLUGIN_QUEUE).build(); } // 绑定延迟交换机和延迟队列 @Bean public Binding delayPluginBinding() { return BindingBuilder.bind(delayPluginQueue()) .to(delayPluginExchange()) .with(DELAY_PLUGIN_ROUTING_KEY) .noargs(); } }
2.2 生产者 - 发送延迟消息
package com.example.delaynotify.service; import com.example.delaynotify.config.PluginDelayConfig; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class PluginDelayMessageService { @Autowired private RabbitTemplate rabbitTemplate; // 发送延迟消息 public void sendDelayMessage(String message, long delayMillis) { System.out.println("使用插件发送延迟消息: " + message + ", 延迟时间: " + delayMillis + "ms, 时间: " + System.currentTimeMillis()); rabbitTemplate.convertAndSend( PluginDelayConfig.DELAY_PLUGIN_EXCHANGE, PluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置延迟时间(毫秒) message.getMessageProperties().setDelay((int) delayMillis); return message; } } ); } }
2.3 消费者 - 接收延迟消息
package com.example.delaynotify.consumer; import com.example.delaynotify.config.PluginDelayConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class PluginDelayMessageConsumer { @RabbitListener(queues = PluginDelayConfig.DELAY_PLUGIN_QUEUE) public void receiveDelayMessage(String message, Channel channel, Message msg) throws IOException { try { System.out.println("接收到插件延迟消息: " + message + ", 时间: " + System.currentTimeMillis()); // 处理业务逻辑 - 例如发送通知、更新状态等 processDelayMessage(message); // 手动确认消息 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { System.out.println("插件延迟消息处理失败: " + e.getMessage()); // 拒绝消息并丢弃 channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false); } } private void processDelayMessage(String message) { // 模拟发送通知的业务逻辑 System.out.println("执行通知业务: " + message); // 这里可以调用邮件、短信、推送等服务 } }
五、Controller层实现
package com.example.delaynotify.controller; import com.example.delaynotify.service.PluginDelayMessageService; import com.example.delaynotify.service.TtlDelayMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class DelayNotifyController { @Autowired private TtlDelayMessageService ttlDelayMessageService; @Autowired private PluginDelayMessageService pluginDelayMessageService; // 基于TTL的固定延迟 @GetMapping("/ttl/fixed") public String sendFixedTtlDelay(@RequestParam String message) { ttlDelayMessageService.sendFixedDelayMessage(message); rejavascriptturn "固定延迟消息已发送 (10秒)"; GfgHmNdz } // 基于TTL的自定义延迟 @GetMapping("/ttl/custom") public String sendCustomTtlDelay(@RequestParam String message, @RequestParam long delayMillis) { ttlDelayMessageService.sendCustomDelayMessage(message, delayMillis); return "自定义延迟消息已发送 (" + delayMillis + "ms)"; } // 基于插件的延迟 @GetMapping("/plugin/delay") public String sendPluginDelay(@RequestParam String message, @RequestParam long delayMillis) { pluginDelayMessageService.sendDelayMessage(message, delayMillis); return "插件延迟消息已发送 (" + delayMillis + "ms)"; } }
六、application.yml配置
spring: rabbitmq: host: localhost port: 5672 username: admin password: admin virtual-host: / # 生产者确认配置 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true # 消费者配置 listener: simple: acknowledge-mode: manual prefetch: 1 concurrency: 1 max-concurrency: 5
七、完整的通知场景实现示例
订单超时通知场景
package com.example.delaynotify.service; import com.example.delaynotify.config.PluginDelayConfig; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @Service public class OrderNotifyService { @Autowired private RabbitTemplate rabbitTemplate; private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); /** * 创建订单并设置超时通知 * @param orderId 订单ID * @param notifyDelaySeconds 超时时间(秒) */ public void createOrderAndSetTimeout(String orderId, int notifyDelaySeconds) { // 1. 保存订单逻辑 System.out.println("创建订单: " + orderId + " 时间: " + LocalDateTime.now().format(formatter)); // 2. 设置延迟通知 String notifyMessage = "订单[" + orderId + "]已超时,需要取消处理"; 编程客栈 long delayMillis = notifyDelaySeconds * 1000L; System.out.println("设置订单超时通知,延迟: " + notifyDelaySeconds + "秒"); // 使用延迟插件发送通知消息 rabbitTemplate.convertAndSend( PluginDelayConfig.DELAY_PLUGIN_EXCHANGE, PluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY, notifyMessage, message -> { message.getMessageProperties().setDelay((int) delayMillis); return message; } ); } }
八、两种方案对比与选择建议
1. 性能对比
- TTL+死信队列:当使用消息级TTL时,RabbitMQ需要为每条消息设置过期时间,会造成额外的性能开销
- 延迟插件:插件内部使用优先队列实现,性能更优,特别适合大量不同延迟时间的消息场景
2. 灵活性对比
- TTL+死信队列:如果要支持不同的延迟时间,需要创建多个不同TTL的队列
- 延迟插件:每条消息都可以设置不同的延迟时间,更加灵活
3. 选择建议
- 如果延迟时间固定或种类较少,可以使用TTL+死信队列方案,无需安装插件
- 如果延迟时间不固定或种类较多,强烈建议使用延迟插件方案
- 对于生产环境,建议使用延迟插件方案,性能更好、配置更简洁
通过以上两种方案,您可以根据实际需求选择合适的方式实现RabbitMQ的延迟通知功能,满足订单超时、定时提醒等各种业务场景。
以上就是RabbitMQ实现延迟通知的两种方案的详细内容,更多关于RabbitMQ延迟通知的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论