开发者

SpringCloudStream中的消息分区数详解

目录
  • 一、前言
  • 二、影响因素
    • 2.1 Kafka服务端
    • 2.2 生产者端
    • 2.3 消费者端
    • 2.4 其他因素
  • 总结

    一、前言

    本文仅针对 Kafka 来聊消www.devze.com息分区数相关的话题。

    SpringCloudStream 中的消息分区数如何配置?

    或者说消息分区数会受到哪些配置的影响。

    • SpringCloudStream:Greenwich.SR2
    • Kafka:kafka_2.12-2.3.0

    二、影响因素

    2.1 Kafka服务端

    首先应该想到的,Kafka 配置文件 server.properties 中默认每一个 topic 的分区数 num.partitions=1

    # The default number of log partitions per topic. More partitions allow greater
    num.partitions=1

    2.2 生产者端

    从SpringCloudStream的配置中可以看到,生产者可以指定分区数,默认1:

    spring.cloud.stream.bindings.<channelName>.partitionCohttp://www.devze.comunt.producer=n

    【说明】:当分区功能开启时,使用该参数来配置消息数据的分区数。

    如果消息生产者已经配置了分区键的生成策略,那么它的值必须大于1。

    2.3 消费者端

    SpringCloudStream 允许通过配置,使得消费android者能够自动创建分区。

    #输入通道消费者的并发数,默认1
    spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2

    若想以上配置生效,还需添加如下通用配置:

    #Kafka绑定器允许在需要的时候自动创建分区。默认false
    spring.cloud.stream.kafka.binder.autoAddPartitions=true

    消费者端如此配置以后,将表现为一个消费开发者_Js入门者服务或进程中,会有2个线程各自消费1个分区,即2个消费者线程同时消费。

    以下是该配置的效果验证步骤:

    消费者代码:

    1个 @StreamListener 消费自己的 topic 或自己的输出channel:

    @EnableBinding(SpiderSink.class)
    @Slf4j
    public class SpiderSinkReceiver {
     
        @Autowired
        private SpiderMessageService spiderMessageService;
     
        @StreamListener(SpiderSink.INPUT)
        public void receive(Object payload) {
            log.info("SPIDER-SINK received: {}", payload);
        }
    }

    方式一:通过日志验证:

    通过在 log4j 日志中,打印线程名称的方式,验证 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置确确实实会新增1个消费者线程。

    [INFO ] 2020-05-09 01:19:34,700 [thread: [LJava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
    [INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)

    方式二:直接查看分区数来验证:

    另外,也可在启动一个生产者服务时,等待自动创建一个新 topic 后(此时默认分区数为1),比如我们创建的 topic 为“topic-spider-dev”,此时通过kafka命令查看分区数,此时分区数为1:

    [root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh编程客栈 --zookeeper localhost:2181 --describe --topic topic-spider-dev
    Topic:topic-spider-dev PartitionCount:1    ReplicationFactor:1   Configs:
        Topic: topic-spider-dev Partition: 0  Leader: 1    Replicas: 1   Isr: 1

    然后,配置消费者服务的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,启动一个消费者服务,再次查看分区数,已经变为2了:

    [root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
    Topic:topic-spider-dev PartitionCount:2    ReplicationFactor:1   Configs:
        Topic: topic-spider-dev Partition: 0  Leader: 1    Replicas: 1   Isr: 1
        Topic: topic-spider-dev Partition: 1  Leader: 2    Replicas: 2   Isr: 2

    同时查编程客栈看消费者端的应用日志,看到2个消费者线程各自分配了一个分区:

    [INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
    partitions assigned: [topic-spider-dev-0]
    [INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
    partitions assigned: [topic-spider-dev-1]

    最终,确确实实地验证了 concurrency 配置对消费者线程数和分区数的影响。

    2.4 其他因素

    比如,SpringCloudStream 中 Kafka 绑定器的配置中,也有一个相关的影响因素:

    #最小分区数,默认1
    spring.cloud.stream.kafka.binder.minPartitionCount=n

    【说明】:该参数仅在设置了 autoCreateTopics 和 autoAddPartitions 时生效,用来设置该绑定器所使用主题的全局分区最小数量。

    如果当生产者的 partitionCount 参数或 instanceCount * concurrency 设置大于该参数配置时,该参数值将被覆盖。

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜