天天看點

Kafka用戶端Producer與Consumer

一、pom.xml

二、相關配置檔案

producer.properties

log4j.properties

base.properties

三、producer用戶端

在叢集上啟動zookeeper

zkserver.sh start

檢視zookeeper的狀态

zkserver.sh status

啟動kafka叢集:

kafka-server-start.sh config/server.properties &

建立新的topic

kafka-topics.sh --create --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic data_server

檢視topic副本資訊

kafka-topics.sh --describe alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic data_server

檢視已經建立的topic資訊

kafka-topics.sh --list --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0

測試生産者發送消息

bin/kafka-console-producer.sh --broker-list alary001:9092,alary002:9092,alary003:9092 --topic data_server

測試消費者消費消息

kafka-console-consumer.sh --bootstrap-server alary001:9092,alary002:9092,alary003:9092 --from-beginning --topic data_server

删除topic

bin/kafka-topics.sh --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --delete --topic data_server 需要server.properties中設定delete.topic.enable=true否則隻是标記删除或者直接重新開機。

停止kafka服務

kafka-server-stop.sh stop

停止zookeeper叢集

zkserver.sh stop