天天看點

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

Kafka群集中的每個主機都運作一個稱為代理的伺服器,該伺服器存儲發送到主題的消息并服務于消費者請求。

首先先看伺服器安裝kafka的執行個體資訊:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

注意:

然後正常kafka的指令是 :  ./bin/kafka-topics.sh --zookeeper cluster2-4:2181 .......

但是使用CDH安裝的kafka則不需要全寫出此  ./bin/kafka-topics.sh  部分。隻許直接寫 kafka-topics 即可,這是很重要的一個差別,使用CDH安裝的kafka時候要特别注意一下。

具體有哪些指令可以看此路徑下:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

接下來測試topic指令,這裡我們要先看CDH中配置的這個ZooKeeper Root的Kafka服務範圍為: " /kafka "。

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

是以我們使用topic的指令格式應該都類似:

kafka-topics --zookeeper cluster2-4:2181/kafka ......

A.建立一個名為 test 的主題(Topic):

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

Or

若是上述中的 ZooKeeper Root 的Kafka服務範圍為: " / "。則這裡的建立主題指令改為:

B.查詢現在已經存在的topic:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

 C.删除建立的topic:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

 拓展:

這裡如果直接删除,則會輸出    Topic *** is marked for deletion   如上圖,如果我們topic中消息堆積的太多,或者kafka所在磁盤空間滿了等等,則會需要徹底清理一下kafka topic。

方法一:修改kafaka配置檔案server.properties, 添加 delete.topic.enable=true,重新開機kafka,之後通過kafka指令行就可以直接删除topic。

方法二:通過指令行删除topic:  ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}

  因為kafaka配置檔案中server.properties沒有配置delete.topic.enable=true,此時的删除并不是真正的删除,隻是把topic标記為:marked for deletion     你可以通過指令:./bin/kafka-topics --zookeeper {zookeeper server} --list 來檢視所有topic

方法三:若需要真正删除它,需要登入zookeeper用戶端:

找到topic所在的目錄:

執行指令,即可,此時topic被徹底删除:

D.修改topic的分區數:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

 F.我們還可以在這裡測試分布式是否連接配接正常:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

可以看到在2-4這台伺服器中,我們後面輸入  cluster2-4:2181/kafka  與  cluster2-3:2181/kafka  均可得到統一的資訊。

topic指令參數:

三、測試producer産生資料、consumer消費資料

之前我們建立好topic以後,這裡測試一下如何使用kafka中的kafka-console-producer與kafka-console-consumer來生産資料、另一端消費資料。

還需先了解這裡  釋出-訂閱系統中的代理結構:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

producer産生資料到Topic中,然後consumer從要消費的Topic中消費資料。

首先啟動producer:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

 在這裡輸入資料,這些資料會上傳到zookeeper中的 /kafka/broker/test 主題中。

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

 kafka-console-producer 生産者的指令參數:

接着啟動消費者:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

後面的   --from-beginning   表示從指定主題中有效的起始位移位置開始消費所有分區的消息。

消費者消費到topic的資料:

CDH中Kafka基本指令總結——topic使用與測試producer産生資料、consumer消費資料

 kafka-console-consumer 消費者的指令參數:

 檢視消費資料後的偏移量  kafka-run-class

擷取topic消費組的偏移量

上一篇: C# 環境
下一篇: Serv-U 提權