kafka是通過zookeeper來管理叢集。 kafka軟體包内雖然包括了一個簡版的zookeeper,但是簡易版功能有限。在生産環境下,建議還是直接下載下傳官方zookeeper軟體。
http://zookeeper.apache.org/ http://mirror.bit.edu.cn/apache/zookeeper/建議使用穩定版,/stable/zookeeper-3.4.10.tar.gz
kafka使用版本是kafka_2.12-1.1.0
1、zookeeper環境部署
chmod -R 777 zookeeper-3.4.10
./zkServer.sh start #啟動zookeeper伺服器
./zkServer.sh status#檢視狀态
./zkCli.sh -server 127.0.0.1:2181 #啟動用戶端
[zk: 127.0.0.1:2181(CONNECTED) 0] ls / #檢視zookeeper目錄結構
[zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /brokers/ids #檢視已啟動的代理節點
Node does not exist: /brokers/ids
我們可以通過jps指令來檢視zk是否啟動
[root@bogon bin]# jps
3952 Jps
3921 QuorumPeerMain
2、zookeeper配置檔案(本人電腦有限,僅使用一台虛拟機來實作分布式叢集)
(1)把zookeeper檔案夾複制三份,我們僅修改\conf\zoo.cfg
檔案夾zookeeper-0
dataDir=/tmp/zookeeper/data00
clientPort=2181
server.0=172.16.6.170:20881:30881
server.1=172.16.6.170:20882:30882
server.2=172.16.6.170:20883:30883
檔案夾zookeeper-1
dataDir=/tmp/zookeeper/data01
clientPort=2182
檔案夾zookeeper-3
dataDir=/tmp/zookeeper/data02
clientPort=2183
server.2=172.16.6.170:20883:30883
(2)分别在檔案夾/tmp/zookeeper/data00,data01和data02下面建立myid檔案,檔案内容隻有一個數字,代表zookeeper節點的唯一id,即要確定此id在叢集内唯一,且要跟配置檔案中的server.0、server.1、server.2 對應上。
終端指令:
cd data00
echo 0 > myid
cd ../data01
echo 1 > myid
cd ../data02
echo 2 > myid
3、kafka broker配置檔案(本人電腦有限,僅使用一台虛拟機來實作分布式叢集)
我們僅僅修改server.properties檔案,其他檔案(例如producer.properties和consumer.properties)不動。
把\config\server.properties檔案複制2份,分别是server.properties,server_1.properties,server_2.properties,
檔案server.properties
broker.id=0
listeners=PLAINTEXT://172.16.6.170:9092 ##不要寫成PLAINTEXT://0.0.0.0:9092
port=9092
host.name=172.16.6.170
log.dirs=/tmp/kafka-logs-0
num.partitions=3
zookeeper.connect=172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183
檔案server_1.properties
broker.id=1
listeners=PLAINTEXT://172.16.6.170:9093
port=9093
log.dirs=/tmp/kafka-logs-1
檔案server_2.properties
broker.id=2
listeners=PLAINTEXT://172.16.6.170:9094
port=9094
log.dirs=/tmp/kafka-logs-2
4、幾點心得
(1)單純的kafka broker叢集沒有意義,一台當機照樣出錯,必須帶上zookeeper叢集一起。
(2)broker和zookeeper都至少要三台伺服器,奇數台。
(3)生産者,以下幾種方式,我個人實踐覺得沒有差別
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
./bin/kafka-console-producer.sh --broker-list 172.16.6.170:9092 --topic test
./bin/kafka-console-producer.sh --broker-list 172.16.6.170:9092,172.16.6.170:9093,172.16.6.170:9094 --topic test
(4)使用叢集時,生産者和消費者執行之前,使用者必須手動先建立topic,指定zookeeper節點清單;
(5)本人看了部落格,Kafka 單節點多Kafka Broker叢集
發現broker的server.properties可以不需要上述那麼複雜,簡單即可:
listeners=PLAINTEXT://:9092
省略的
這些參數是0.9之前的版本帶的參數,從1.0.0開始沒有這些參數了。
5、常用終端指令
檢視程序是否啟動
netstat -tunlp|egrep "(2181|2182|2183|9092|9093|9094)"
啟動zookeeper
./zkServer.sh start
啟動kafka broker
./bin/kafka-server-start.sh -daemon config/server.properties
./bin/kafka-server-start.sh -daemon config/server_1.properties
./bin/kafka-server-start.sh -daemon config/server_2.properties
建立主題
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic test
./bin/kafka-topics.sh --describe --zookeeper 172.16.6.170:2181 --topic test
生産
./bin/kafka-console-producer.sh --broker-list 172.16.6.170:9092,172.16.6.170:9093,,172.16.6.170:9094 --topic test
消費
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
./bin/kafka-console-consumer.sh --zookeeper 172.16.6.170:2181 --topic test --from-beginning
./bin/kafka-console-consumer.sh --zookeeper 172.16.6.170:2182 --topic test --from-beginning
zookeeper用戶端連接配接
./zkCli.sh -server 127.0.0.1:2181
6、參數解釋
(1)server.properties檔案
broker.id=0 #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣
port=19092 #目前kafka對外提供服務的端口預設是9092
host.name=192.168.7.100 #這個參數預設是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗号分割的表達式,上面的num.io.threads要大于這個目錄的個數這個目錄,如果配置多個目錄,新建立的topic他把消息持久化的地方是,目前以逗号分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,資料不是一下子就發送的,先回存儲到緩沖區了到達一定的大小後在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當資料到達一定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #預設的分區數,一個topic預設1個分區數
log.retention.hours=168 #預設消息的最大持久化時間,168小時,7天
message.max.byte=5242880 #消息儲存的最大值5M
default.replication.factor=2 #kafka儲存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到檔案,當超過這個值的時候,kafka會新起一個檔案
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄檢視是否有過期的消息如果有,删除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #設定zookeeper的連接配接端口