开发者

Kafka之kafka-topics.sh的使用解读

目录
  • 一、kafka的基本操作
    • 1.1、创建topic
    • 1.2、查看topic
    • 1.3、查看topic属性
    • 1.4、发送消息
    • 1.5、消费消息
  • 二、kafka-topics.sh 使用方式
    • 2.1、查看帮助
    • 2.2、副本数量规则
    • 2.3、创建主题
    • 2.4、查看broker上所有的主题
    • 2.5、查看指定主题 topic 的详细信息
    • 2.6、修改主题信息之增加主题分区数量
    • 2.7、删除主题
  • 总结

    一、kafka的基本操作

    1.1、创建topic

    sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    参数说明:

    • –create 是创建主题的的动作指令。
    • –zookeeper 指定kafka所连接的zookeeper服务地址。
    • –replicator-factor 指定了副本因子(即副本数量); 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数。
    • –partitions 指定分区个数;多通道,类似车道。
    • –topic 指定所要创建主题的名称,比如test。

    成功则显示:

    Created topic "test".

    1.2、查看topic

    sh kafka-topics.sh --list --zookeeper localhost:2181

    显示:

    test

    1.3、查看topic属性

    sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

    显示:

    Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
     Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

    1.4、发送消息

    sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

    发送端输入:

    >hello
    >where are you
    >let's go

    1.5、消费消息

    sh kafka-console-consumer.sh --boot编程客栈strap-server 127.0.0.1:9092 --topic test
    --from-beginning

    消费端显示:

    hello
    where are you
    let's go
    ^CProcessed a total of 3 messages

    二、kafka-topics.sh 使用方式

    创建、修改、删除以及查看等功能。

    2.1、查看帮助

    /bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。

    $ sh kafka-topics.sh --help
    Command must include exactly one action: --list, --describe, --create, --alter or --delete
    Option                  Description              
    ------                  -----------              
    --alter                 Alter the number of partitions,    
                         replica assignment, and/or     
                         configuration for the topic.    
    --config <String: name=value>      A topic configuration override for the
                         topic being created or altered.The 
                         following is a list of valid    
                         configurations:           
                         cleanup.policy            
                         compression.type           
                         delete.retention.ms         
                         file.delete.delay.ms         
                         flush.messages            
                         flush.ms               
                         follower.replication.throttled.   
                         replicas              
                         index.interval.bytes         
                         leader.replication.throttled.replicas
                         max.message.bytes          
                         message.downconversion.enable    
                         message.format.version        
                         message.timestamp.difference.max.ms 
                         message.timestamp.type        
                         min.cleanable.dirty.ratio      
                         min.compaction.lag.ms        
                         min.insync.replicas         
                         preallocate             
                         retention.bytes           
                         retention.ms             
                         segment.bytes            
                         segment.index.bytes         
        http://www.devze.com                 segment.jitter.ms          
                         segment.ms              
                         unclean.leader.election.enable    
                        See the Kafka documentation for full 
                         details on the topic configs.    
    --create                 Create a new topic.          
    --delete                 Delete a topic            
    --delete-config <String: name>      A topic configuration override to be 
                         removed for an existing topic (see 
                         the list of configurations under the
                         --config option).          
    --describe                List details for the given topics.  
    --disable-rack-aware           Disable rack aware replica assignment 
    --force                 Suppress console prompts       
    --help                  Print usage information.       
    --if-exists               if set when altering or deleting   
                         topics, the action will only execute
                         if the topic exists         
    --if-not-exists             if set when creating topics, the   
                         action will only execute if the   
                         topic does not already exist    
    --list                  List all available topics.      
    --partitions <Integer: # of partitions> The number of partitions for the topic
                         being created or altered (WARNING: 
                         If partitions are increased for a  
                         topic that has a key, the partition 
                         logic or ordering of the messages  
                         will be affected          
    --replica-assignment <String:      A list of manual partition-to-broker 
     broker_id_for_part1_replica1 :      assignments for the topic being   
     broker_id_for_part1_replica2 ,      created or altered.         
     broker_id_for_part2_replica1 :                        
     broker_id_for_part2_replica2 , ...>                     
    --replication-factor <Integer:      The replication factor for each    
     replication factor>           partition in the topic being created.
    --topic <String: topic>         The topic to be create, alter or   
                         describe. Can also accept a regular 
                         expression except for --create option
    --topics-with-overrides         if set when describing topics, only  
                         show topics thatpython have overridden  
                         configs               
    --unavailable-partitions         if set when describing topics, only  
                         show partitions whose leader is not 
                         available              
    --under-replicated-partitions      if set when describing topics, only  
                         show under replicated partitions  
    --zookeeper <String: hosts>       REQUIRED: The connection string for  
                         the zookeeper connection in the form
                         host:port. Multiple hosts can be  
                         given to allow fail-over.     

    2.2、副本数量规则

    副本数量不能大于broker的数量。

    kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。

    sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1
    开发者_C开发

    报错:

    Error while executing topic command : Replication factor: js2 larger than available brokers: 1.

    [2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

     (kafka.admin.TopicCommand$)

    注意:副本数量和分区数量的区别。

    2.3、创建主题

    创建主题时候,有3个参数是必填的:

    • –partitions(分区数量)、
    • –topic(主题名) 、
    • –replication-factor(复制系数),

    同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

    sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

    返回显示:

    Created topic "test1".

    另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

    2.4、查看broker上所有的主题

    –list。

    sh kafka-topics.sh --list --zookeeper localhost:2181

    结果显示:

    __consumer_offsets

    test

    test1

    2.5、查看指定主题 topic 的详细信息

    –describe。

    sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

    结果显示:

    Topic:test1    PartitionCount:1    ReplicationFactor:1    Configs:

        Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

    2.6、修改主题信息之增加主题分区数量

    –alter。

    sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2

    结果显示:

    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected

    Adding partitions succeeded!

    查看主题信息:

    sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

    可以看到已经成功的将主题的分区数量从1修改为了2。

    Topic:test1 PartitionCount:2 ReplicationFactor:1 Configs:
     Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
     Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0

    当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。

    sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2

    会报错:

    Error while executing topic commanjsd : Topic test2 does not exist on ZK path localhost:2181

    [2022-11-24 14:21:33,564] ERROR Java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181

        at kafka.admin.TopicCommand$.alterTopic(TopicCommand.Scala:123)

        at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)

        at kafka.admin.TopicCommand.main(TopicCommand.scala)

     (kafka.admin.TopicCommand$)

    注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。

    2.7、删除主题

    –delete。

    sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

    日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。

    Topic test1 is marked for deletion.

    Note: This will have no impact if delete.topic.enable is not set to true.

    可以测试一些:

    # 一个终端启动生产者:
    sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1
    
    # 另一个终端启动消费者:
    sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning

    发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

    然后就重启kafka:

    # 停止:
    sh kafka-server-stop.sh -daemon ../config/server.properties
    # 启动:
    sh kafka-server-start.sh -daemon ../config/server.properties

    再次删除就可以了。

    sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜