天天看點

Kafka入門-基礎操作指令-常用指令

主題操作

建立主題

使用以下指令建立一個叫作 my-topic 的主題,主題包含 8 個分區,每個分區擁有兩個副本。

kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
           

增加分區

将 my-topic 主題的分區數量增加到 16。

kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
           

删除分區

删除 my-topic 主題

kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete -- topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
           

列出叢集裡的所有主題

kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
           

列出主題詳細資訊

kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic       PartitionCount:8        ReplicationFactor:2  Configs:
Topic:other-topic       Partition: 0      ...   Replicas: 1,0        Isr: 1,0
Topic:other-topic       Partition: 1      ...   Replicas: 0,1        Isr: 0,1
Topic:other-topic       Partition: 2      ...   Replicas: 1,0        Isr: 1,0
Topic:other-topic       Partition: 3      ...   Replicas: 0,1        Isr: 0,1
Topic:other-topic       Partition: 4      ...   Replicas: 1,0        Isr: 1,0
Topic:other-topic       Partition: 5      ...   Replicas: 0,1        Isr: 0,1
Topic:other-topic       Partition: 6      ...   Replicas: 1,0        Isr: 1,0
Topic:other-topic       Partition: 7      ...   Replicas: 0,1        Isr
           

消費者群組

列出并描述群組

kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
           

删除群組

kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
           

偏移量管理

導出

将群組 testgroup 的偏移量導出到 offsets 檔案裡。

kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
           

導入

從 offsets 檔案裡将偏移量導入到消費者群組 testgroup。

kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
           

動态配置變更

為了滿足不同的使用場景,主題的很多參數都可以進行單獨的設定。它們大部分都有 broker 級别的預設值,在沒有被覆寫的情況下使用預設值。

更改主題配置的指令格式如下。

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
           

可用的主題配置參數(鍵)如表 9-2 所示。

Kafka入門-基礎操作指令-常用指令
Kafka入門-基礎操作指令-常用指令
Kafka入門-基礎操作指令-常用指令
Kafka入門-基礎操作指令-常用指令

将主題 my-topic 的消息保留時間設為 1 個小時(3 600 000ms)。

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
           

覆寫用戶端的預設配置

更改用戶端配置的指令格式如下:

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
           

可用的用戶端配置參數(鍵)如表 9-3 所示。

Kafka入門-基礎操作指令-常用指令

列出被覆寫的配置

列出主題 my-topic 所有被覆寫的配置。

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe -- entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
           

移除被覆寫的配置

删除主題 my-topic 的 retention.ms 覆寫配置。

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
           

分區管理

首選的首領選舉

在一個包含了 1 個主題和 8 個分區的叢集裡啟動首選的副本選舉。

kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
           

修改分區副本

修改複制系數

轉儲日志片段

解碼日志片段 00000000000052368601.log,顯示消息的概要資訊。

kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid:true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize:895  magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize:665  magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize:932  magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359
...
           

副本驗證

對 broker 1 和 broker 2 上以 my- 開頭的主題副本進行驗證。

kafka-replica-verification.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions
           

消費和生産

控制台消費者

使用舊版消費者讀取單個主題 my-topic。

kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster -- topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
           

控制台生産者

向主題 my-topic 生成兩個消息。

kafka-console-producer.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
           

用戶端ACL

指令行工具 kafka-acls.sh 可以用于處理與用戶端通路控制相關的問題,它的文檔可以在 Apache Kafka 官方網站上找到。

不安全的操作