主題操作
建立主題
使用以下指令建立一個叫作 my-topic 的主題,主題包含 8 個分區,每個分區擁有兩個副本。
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
增加分區
将 my-topic 主題的分區數量增加到 16。
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
删除分區
删除 my-topic 主題
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete -- topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
列出叢集裡的所有主題
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
列出主題詳細資訊
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr
消費者群組
列出并描述群組
kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
删除群組
kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
偏移量管理
導出
将群組 testgroup 的偏移量導出到 offsets 檔案裡。
kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
導入
從 offsets 檔案裡将偏移量導入到消費者群組 testgroup。
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
動态配置變更
為了滿足不同的使用場景,主題的很多參數都可以進行單獨的設定。它們大部分都有 broker 級别的預設值,在沒有被覆寫的情況下使用預設值。
更改主題配置的指令格式如下。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
可用的主題配置參數(鍵)如表 9-2 所示。
将主題 my-topic 的消息保留時間設為 1 個小時(3 600 000ms)。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
覆寫用戶端的預設配置
更改用戶端配置的指令格式如下:
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
可用的用戶端配置參數(鍵)如表 9-3 所示。
列出被覆寫的配置
列出主題 my-topic 所有被覆寫的配置。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe -- entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
移除被覆寫的配置
删除主題 my-topic 的 retention.ms 覆寫配置。
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
分區管理
首選的首領選舉
在一個包含了 1 個主題和 8 個分區的叢集裡啟動首選的副本選舉。
kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
修改分區副本
略
修改複制系數
轉儲日志片段
解碼日志片段 00000000000052368601.log,顯示消息的概要資訊。
kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid:true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize:895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize:665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize:932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359
...
副本驗證
對 broker 1 和 broker 2 上以 my- 開頭的主題副本進行驗證。
kafka-replica-verification.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions
消費和生産
控制台消費者
使用舊版消費者讀取單個主題 my-topic。
kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster -- topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
控制台生産者
向主題 my-topic 生成兩個消息。
kafka-console-producer.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
用戶端ACL
指令行工具 kafka-acls.sh 可以用于處理與用戶端通路控制相關的問題,它的文檔可以在 Apache Kafka 官方網站上找到。
不安全的操作