Kafka之kafka-topics.sh的使用解读
目录
- 一、kafka的基本操作
- 1.1、创建topic
- 1.2、查看topic
- 1.3、查看topic属性
- 1.4、发送消息
- 1.5、消费消息
- 二、kafka-topics.sh 使用方式
- 2.1、查看帮助
- 2.2、副本数量规则
- 2.3、创建主题
- 2.4、查看broker上所有的主题
- 2.5、查看指定主题 topic 的详细信息
- 2.6、修改主题信息之增加主题分区数量
- 2.7、删除主题
- 总结
一、kafka的基本操作
1.1、创建topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
参数说明:
–create
是创建主题的的动作指令。–zookeeper
指定kafka所连接的zookeeper服务地址。–replicator-factor
指定了副本因子(即副本数量); 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数。–partitions
指定分区个数;多通道,类似车道。–topic
指定所要创建主题的名称,比如test。
成功则显示:
Created topic "test".
1.2、查看topic
sh kafka-topics.sh --list --zookeeper localhost:2181
显示:
test
1.3、查看topic属性
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
显示:
Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
1.4、发送消息
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
发送端输入:
>hello >where are you >let's go
1.5、消费消息
sh kafka-console-consumer.sh --boot编程客栈strap-server 127.0.0.1:9092 --topic test --from-beginning
消费端显示:
hello where are you let's go ^CProcessed a total of 3 messages
二、kafka-topics.sh 使用方式
创建、修改、删除以及查看等功能。
2.1、查看帮助
/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。
$ sh kafka-topics.sh --help Command must include exactly one action: --list, --describe, --create, --alter or --delete Option Description ------ ----------- --alter Alter the number of partitions, replica assignment, and/or configuration for the topic. --config <String: name=value> A topic configuration override for the topic being created or altered.The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes http://www.devze.com segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. --create Create a new topic. --delete Delete a topic --delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). --describe List details for the given topics. --disable-rack-aware Disable rack aware replica assignment --force Suppress console prompts --help Print usage information. --if-exists if set when altering or deleting topics, the action will only execute if the topic exists --if-not-exists if set when creating topics, the action will only execute if the topic does not already exist --list List all available topics. --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected --replica-assignment <String: A list of manual partition-to-broker broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created. --topic <String: topic> The topic to be create, alter or describe. Can also accept a regular expression except for --create option --topics-with-overrides if set when describing topics, only show topics thatpython have overridden configs --unavailable-partitions if set when describing topics, only show partitions whose leader is not available --under-replicated-partitions if set when describing topics, only show under replicated partitions --zookeeper <String: hosts> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
2.2、副本数量规则
副本数量不能大于broker的数量。
kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1开发者_C开发
报错:
Error while executing topic command : Replication factor: js2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1. (kafka.admin.TopicCommand$)
注意:副本数量和分区数量的区别。
2.3、创建主题
创建主题时候,有3个参数是必填的:
–partitions
(分区数量)、–topic
(主题名) 、–replication-factor
(复制系数),
同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
返回显示:
Created topic "test1".
另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。
2.4、查看broker上所有的主题
–list。
sh kafka-topics.sh --list --zookeeper localhost:2181
结果显示:
__consumer_offsets
testtest1
2.5、查看指定主题 topic 的详细信息
–describe。
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
结果显示:
Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
2.6、修改主题信息之增加主题分区数量
–alter。
sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2
结果显示:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
查看主题信息:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
可以看到已经成功的将主题的分区数量从1修改为了2。
Topic:test1 PartitionCount:2 ReplicationFactor:1 Configs: Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。
sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2
会报错:
Error while executing topic commanjsd : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR Java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181 at kafka.admin.TopicCommand$.alterTopic(TopicCommand.Scala:123) at kafka.admin.TopicCommand$.main(TopicCommand.scala:65) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$)
注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。
2.7、删除主题
–delete。
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
可以测试一些:
# 一个终端启动生产者: sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1 # 另一个终端启动消费者: sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning
发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。
然后就重启kafka:
# 停止: sh kafka-server-stop.sh -daemon ../config/server.properties # 启动: sh kafka-server-start.sh -daemon ../config/server.properties
再次删除就可以了。
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。
精彩评论