开发者

RabbitMQ消费端单线程与多线程案例讲解

目录
  • 一、基础概念
    • 详细解释:
    • 举个例子:
  • ✅ 单消费者 + 单线程消费
    • ❌ 单消费者 + 多线程消费
      • ❌ 多消费者 + 单线程消费
        • ❌ 多消费者 + 多线程消费
          • 补充说明

            一、基础概念

            模型消费者数量每个消费者内部线程数顺序性场景说明
            单消费者单线程11✅ 保序处理逻辑简单,保证顺序的常见场景
            单消费者多线程1>1❌ 不保序提升处理能力,放弃顺序要求
            多消费者单线程>11❌ 不保序多个队列/分区消费,提升并发
            多消费者多线程>1>1❌ 不保序高并发场景下批量处理,放弃顺序
            concurrenphpcy# 初始消费者线程数
            max-concurrency# 最大消费者线程数
            prefetch# 每个消费者预取的消息数
            • concurrency: 2
              • 表示初始创建的消费者线程数量
              • 系统启动时会立即创建 2 个消费者线程
              • 这些线程会持续监听消息队列
            • max-concurrency: 2
              • 表示允许的最大消费者线程数量
              • 这里设置为 2(与 concurrency 相同),表示线程数不会动态扩展
              • 如果设置 max-concurrency > concurrency,系统会在负载高时动态增加消费者

            详细解释:

                   concurrency和max-concurrency不会影响每个消费者是否是多线程执行,只会导致有多个消费者线程,只有用线程池才会导致每个消费者多线程消费

                    而没有用线程池,也设置prefetch是因为消息被大量预取,单线程处理不过来时堆积等待,单线程并不会影响消息的顺序性,只有使用了线程池才会影响

                    使用了线程池一定会导致消息顺序性问题这与设不设置prefetch无关,因为使用线程池后,任务交个线程池就返回了属于异步

            举个例子:

                            1. RabbitMQ 给消费者推送消息1,消费者收到,提交给线程池任务A(耗时长)。

                            2. 消费者马上ACK消息1(因为业务交给线程池了,自己处理完毕的感觉

                            3.  RabbitMQ 再给消费者推送消息2,消费者收到,提交给线程池任务B(耗时短)。

                            4. R线程池调度先跑完任务B,后跑任务A。

            ✅ 单消费者 + 单线程消费

            • 保证顺序:消费者内部串行执行。
            • 配置关键
            spring:
              rabbitmq:
                listener:
                  simple:
                    concurrency: 1
                    max-concurrency: 1
                    prefetch: 1

            消费者代码

            @Component
            public class MultiConsumerSingleThread {
                @RabbitListener(queues = "order_queue", concurrency = "2")
                public void receive(String message) {
                    System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {javascript
                        e.printStackTrace();
                    }
                }
            }

            ❌ 单消费者 + 多线程消费

            • 不保顺序:一个消费者使用线程池异步处理消息。
            • 配置关键:默认配置 + 手动异js步处理
            • 消费者代码
            @Component
            public class MultiThreadConsumer {
                private final ExecutorService executor = Executors.newFixedThreadPool(5);
                @RabbitListener(queues = "order_queue")
                public void receive(String message) {
                    executor.subhttp://www.devze.commit(() -> {
                        System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
                        try {
                            Thread.sleep(500); // 模拟耗时
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                }
            }

            说明:消息提交到线程池,先到的不一定先处理完成,顺序可能乱。

            ❌ 多消费者 + 单线程消费

            • 不保顺序:多个消费者实例轮询分配消息,各自顺序保留,但整体顺序错乱。
            • 配置关键
            spring:
              rabbitmq:
                listener:
                  simple:
                    concurrency: 2
                    max-concurrency: 2
                    prefetch: 1

            消费者代码(共享类,也可拆成多个类模拟多实例)

            @Component
            public class MultiConsumerSingleThread {
                //concurrency = "2":它和配置文件中的 concurrency: 2 作用一致,但优先级更高。
                @RabbitListener(queues = "order_queue", concurrency = "2")
                public void receive(String message) {
                    System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

            ❌ 多消费者 + 多线程消费

            • 不保顺序:每个消费者又使用线程池异步处理消息,最大吞吐量模式。
            • 适合场景:数据导入、日志收集、发送通知等对顺序无要求的批量处理。
            • 配置关键
            spring:
              rabbitmq:
                listener:
                  simple:
                    concurrency: 3
                    max-concurrency: 3
                    prefetch: 10

            消费者代码

            @Component
            public class MultiConsumerMultiThread {
                private final ExecutorService executor = Executors.newFixedThreadPool(10);
                @RabbitListener(queues = "order_queue", concurrency = "3")
                public void receive(String message) {
                    executor.submit(() -> {
                        System.out.println(www.devze.com" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                }
            }

            补充说明

            • concurrency: 控制并发消费者数量,等于消费者数。
            • prefetch: 控制每个消费者本地最多拉取多少条消息(如 1 表示严格串行处理)。
            • 每个 @RabbitListener 本质上是一个容器,可以通过 concurrency 配置“实例个数”。

            到此这篇关于RabbitMQ消费端单线程与多线程的文章就介绍到这了,更多相关RabbitMQ单线程与多线程内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

            0

            上一篇:

            下一篇:

            精彩评论

            暂无评论...
            验证码 换一张
            取 消

            最新开发

            开发排行榜