RabbitMQ工作模式中的发布确认模式示例详解
目录
- 发布确认模式
- 概述
- 消息丢失问题
- 发布确认的三种模式
- 实现步骤
- 应用场景
- 代码案例
- 引入依赖
- 常量类
- 单条确认
- 运行代码
- 批量确认
- 异步确认
- 对比批量确认和异步确认模式
发布确认模式
概述
发布确认模式用于确保消息已经被正确地发送到RabbitMQ服务器,并被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。这一机制基于通道(Channel)级别,通过两个阶段的确认来保证消息的可靠性。
消息丢失问题
作为消息中间件, 都会⾯临消息丢失的问题.
消息丢失⼤概分为三种情况:1. ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.
2. 消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 编程客栈导致消息丢失.3. 消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费失败的消息从队列中删除了。
RabbitMQ也对上述问题给出了相应的解决⽅案. 问题2可以通过持久化机制. 问题3可以采⽤消息应答机制.
针对问题1, 可以采⽤发布确认(Publisher Confirms)机制实现.发布确认的三种模式
RabbitMQ的发布确认模式主要有三种形式:单条确认、批量确认和异步确认。
单条确认(Single Publisher Confirm)
特点:http://www.devze.com在发布一条消息后,等待服务器确认该消息是否成功接收。
优点:实现简单,每条消息的确认状态清晰。缺点:性能开销较大,特别是在高并发的场景下,因为每条消息都需要等待服务器的确认。
批量确认(BATch Publisher Confirm)
特点:允许在一次性确认多个消息是否成功被服务器接收。
优点:在大量消息的场景中可以提高效率,因为可以减少确认消息的数量。缺点:当一批消息中有一条消息发送失败时,整个批量确认失败。此时需要重新发送整批消息,但不知道是哪条消息发送失败,增加了调试和处理的难度。
异步确认(Asynchronous Confirm)
特点:通过回调函数处理消息的确认和未确认事件,更加灵活。
优点:在异步场景中能够更好地处理消息的状态,提高了系统的并发性能和响应速度。缺点:实现相对复杂,需要处理回调函数的逻辑和状态管理。
实现步骤
1.设置通道为发布确认模式:在生产者发送消息之前,需要将通道设置为发布确认模式。这可以通过调用channel.confirmSelect()方法来实现。
2.发送消息并等待确认:生产者发送消息时,每条消息都会分配一个唯一的、递增的整数ID(DeliveryTag)。生产者可以通过调用channel.waitForConfirms()方法来等待所有已发送消息的确认,或者通过其他方式处理确认回调。3.处理确认回调:为了处理确认回调,需要创建一个ConfirmCallback接口的实现。在实现的handleAck()方法中,可以处理成功接收到确认的消息的逻辑;在handleNack()方法中,可以处理未成功接收到确认的消息的逻辑。
应用场景
发布确认模式适用于对数据安全性要求较高的场景,如金融交易、订单处理等。在这些场景中,消息的丢失或重复都可能导致严重的业务问题。通过使用发布确认模式,可以确保消息被正确地发送到RabbitMQ服务器,并被成功接收和持久化,从而提高了系统的可靠性和稳定性。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量类
public class Constants { public static final String HOST = "47.98.109.138"; public static final int PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "aaa"; //publisher confirms public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1"; public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2"; public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3"; }
单条确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import Java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 100; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #1: Publishing Messages Individually //单独确认 publishingMessagesIndividually(); } /** * 单独确认 */ private static void publishingMessagesIndividually() throws Exception { try(Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null); //4. 发送消息, 并等待确认 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes()); //等待确认 channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } }
运行代码
我们可以看到,以发送消息条数为100条为例,单条确认模式是非常耗时的。
批量确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { php //Strategy #2:javascript Publishing Messages in Batches //批量确认 publishingMessagesInBatches(); } /** * 批量确认 * @throws Exception */ private static void publishingMessagesInBatches() throws Exception{ try(Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null); //4. 发送消息, 并进行确认 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes()); outstandingMessageCount++; if (outstandingMessageCount==batchSize){ channel.waitForConfirmsOrDie(5000); outstandingMessageCount = 0; } } if (outstandingMessageCount>0){ channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } }
运行代码
我们可以看到,以发送消息条数为10000条为例,单条确认模式是比较快的。
异步确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #3: Handling Publisher Confirms Asynchronously //异步确认 handlingPublisherConfirmsAsynchronously(); } /** * 异步确认 */ private static void handlingPublisherConfirmsAsynchronously() throws Exception{ try (Connection connection = createConnection()){ //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null); //4. 监听confirm //集合中存储的是未确认的消息ID long start = System.currentTimeMillis(); SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSetandroid<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } //业务需要根据实际场景进行处理, 比如重发, 此处代码省略 } }); //5. 发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes()); confirmSeqNo.add(seqNo); } while (!confirmSeqNo.isEmpty()){ Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } }
运行代码
我们可以看到,以发送消息条数为10000条为例,单条确认模式是非常快的。
对比批量确认和异步确认模式
我们可以看到,异步确认模式是比批量确认模式快很多的。
到此这篇关于RabbitMQ工作模式之发布确认模式的文章就介绍到这了,更多相关RabbitMQ发布确认模式内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论