主题操作
创建主题
使用以下命令创建一个叫作 my-topic 的主题,主题包含 8 个分区,每个分区拥有两个副本。
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
增加分区
将 my-topic 主题的分区数量增加到 16。
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
删除分区
删除 my-topic 主题
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete -- topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
列出集群里的所有主题
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
列出主题详细信息
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr
消费者群组
列出并描述群组
kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
删除群组
kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
偏移量管理
导出
将群组 testgroup 的偏移量导出到 offsets 文件里。
kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
导入
从 offsets 文件里将偏移量导入到消费者群组 testgroup。
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
动态配置变更
为了满足不同的使用场景,主题的很多参数都可以进行单独的设置。它们大部分都有 broker 级别的默认值,在没有被覆盖的情况下使用默认值。
更改主题配置的命令格式如下。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
可用的主题配置参数(键)如表 9-2 所示。
将主题 my-topic 的消息保留时间设为 1 个小时(3 600 000ms)。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
覆盖客户端的默认配置
更改客户端配置的命令格式如下:
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
可用的客户端配置参数(键)如表 9-3 所示。
列出被覆盖的配置
列出主题 my-topic 所有被覆盖的配置。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe -- entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
移除被覆盖的配置
删除主题 my-topic 的 retention.ms 覆盖配置。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
分区管理
首选的首领选举
在一个包含了 1 个主题和 8 个分区的集群里启动首选的副本选举。
kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
修改分区副本
略
修改复制系数
转储日志片段
解码日志片段 00000000000052368601.log,显示消息的概要信息。
kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid:true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize:895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize:665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize:932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359
...
副本验证
对 broker 1 和 broker 2 上以 my- 开头的主题副本进行验证。
kafka-replica-verification.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions
消费和生产
控制台消费者
使用旧版消费者读取单个主题 my-topic。
kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster -- topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
控制台生产者
向主题 my-topic 生成两个消息。
kafka-console-producer.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
客户端ACL
命令行工具 kafka-acls.sh 可以用于处理与客户端访问控制相关的问题,它的文档可以在 Apache Kafka 官方网站上找到。
不安全的操作