Kafka群集中的每個主機都運作一個稱為代理的伺服器,該伺服器存儲發送到主題的消息并服務于消費者請求。
首先先看伺服器安裝kafka的執行個體資訊:
注意:
然後正常kafka的指令是 : ./bin/kafka-topics.sh --zookeeper cluster2-4:2181 .......
但是使用CDH安裝的kafka則不需要全寫出此 ./bin/kafka-topics.sh 部分。隻許直接寫 kafka-topics 即可,這是很重要的一個差別,使用CDH安裝的kafka時候要特别注意一下。
具體有哪些指令可以看此路徑下:
接下來測試topic指令,這裡我們要先看CDH中配置的這個ZooKeeper Root的Kafka服務範圍為: " /kafka "。
是以我們使用topic的指令格式應該都類似:
kafka-topics --zookeeper cluster2-4:2181/kafka ......
A.建立一個名為 test 的主題(Topic):
Or
若是上述中的 ZooKeeper Root 的Kafka服務範圍為: " / "。則這裡的建立主題指令改為:
B.查詢現在已經存在的topic:
C.删除建立的topic:
拓展:
這裡如果直接删除,則會輸出 Topic *** is marked for deletion 如上圖,如果我們topic中消息堆積的太多,或者kafka所在磁盤空間滿了等等,則會需要徹底清理一下kafka topic。
方法一:修改kafaka配置檔案server.properties, 添加 delete.topic.enable=true,重新開機kafka,之後通過kafka指令行就可以直接删除topic。
方法二:通過指令行删除topic: ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}
因為kafaka配置檔案中server.properties沒有配置delete.topic.enable=true,此時的删除并不是真正的删除,隻是把topic标記為:marked for deletion 你可以通過指令:./bin/kafka-topics --zookeeper {zookeeper server} --list 來檢視所有topic
方法三:若需要真正删除它,需要登入zookeeper用戶端:
找到topic所在的目錄:
執行指令,即可,此時topic被徹底删除:
D.修改topic的分區數:
F.我們還可以在這裡測試分布式是否連接配接正常:
可以看到在2-4這台伺服器中,我們後面輸入 cluster2-4:2181/kafka 與 cluster2-3:2181/kafka 均可得到統一的資訊。
topic指令參數:
三、測試producer産生資料、consumer消費資料
之前我們建立好topic以後,這裡測試一下如何使用kafka中的kafka-console-producer與kafka-console-consumer來生産資料、另一端消費資料。
還需先了解這裡 釋出-訂閱系統中的代理結構:
producer産生資料到Topic中,然後consumer從要消費的Topic中消費資料。
首先啟動producer:
在這裡輸入資料,這些資料會上傳到zookeeper中的 /kafka/broker/test 主題中。
kafka-console-producer 生産者的指令參數:
接着啟動消費者:
後面的 --from-beginning 表示從指定主題中有效的起始位移位置開始消費所有分區的消息。
消費者消費到topic的資料:
kafka-console-consumer 消費者的指令參數:
檢視消費資料後的偏移量 kafka-run-class
擷取topic消費組的偏移量