Rabbit消息重试机制问题记录
目录
- 消息重试机制
- 概述
- 实现方式一:基于消息手动确认机制,返回 nack 实现
- 配置文件
- 交换机、队列、绑定
- 生产者接口
- 消费者
- 演示和结论
- 实现方式二:基于重试配置实现 配置文件
- 交换机、队列、绑定
- 生产者接口
- 消费者
- 演示和结论
消息重试机制
概述
消息重试机制就是在消息处理失败之后重新发送,主要时为了解决消息发送过程可能会出现的问题,例如 网络故障、服务临时不可用 等.
Ps:如果时程序逻辑引起的错误,那么即使重试多少次都是没有用的,但是可以通过配置重试次数来解决.
实现方式一:基于消息手动确认机制,返回 nack 实现
配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: manual # 手动确认
交换机、队列、绑定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") que编程客栈ue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生产者接口
@RestController @RequestMapping("/mq3") class MQ3Api( val rabbitTemplate: RabbitTemplate ) { @RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" } }
消费者
import com.cyk.rabbitmq.constants.MQConst import com.rabbipythontmq.client.Channel //注意这里的依赖 import org.springframework.amqp.core.Message //注意这里的依赖 import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import Java.ni编程客栈o.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: 编程客栈Channel, ) { val deliveryTag = message.messageProperties.deliveryTag try { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag") val a = 1 / 0 channel.basicAck(deliveryTag, false) } catch (e: Exception) { //通过返回 nack,并设置 requeue 为 ture 实现消息重新入队,并进行重试 channel.basicNack(deliveryTag, false, true) } } }
演示和结论
deliverTag 自增的原因: 引发异常后,会返回 nack,并且参数 requeue = true,表示重新入队,然后进行重试,将队列中的消息再次发送给生产者,因此 deliverTag 会自增.
缺点: 如果是由于程序逻辑异常引起的重试
,那么无论重试多少次都没用,并且不断重试会导致负载飙升,性能下降
.
实现方式二:基于重试配置实现 配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: php simple: acknowledge-mode: auto # 开启重试机制,这里必须是 auto,否则不生效! retry: enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 失败等待时常 max-attempts: 5 # 最大重试次数(包括第一次消费)
Ps:开启重试机制,acknowledge-mode 必须指定为 auto,否则不生效!
交换机、队列、绑定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生产者接口
@RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" }
消费者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意这里的依赖 import org.springframework.amqp.core.Message //注意这里的依赖 import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}") val a = 1 / 0 } }
演示和结论
deliverTag 不自增的原因: 因为是消息已经发出去了,即使失败了也不会重回队列,而是直接重新发一遍消息.
好处: 不仅可以控制重试次数(防止类似于上面讲到的确认应答引起的无限重试),还可以控制每次重试的间隔时间(防止负载飙升).
到此这篇关于Rabbit高级特性 - 消息重试机制的文章就介绍到这了,更多相关Rabbit消息重试机制内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论