一、我的伺服器環境:CentOS7,kafka依賴JVM環境,要求安裝有JDK
使用CentOS7系統,先删除自帶的JDK,自帶的不好用(例如jps終端指令不能用等)。下載下傳官網完整的JDK,并設定環境變量JAVA_HOME。詳情見:
https://blog.csdn.net/libaineu2004/article/details/80060812[root@localhost ~]# java -version
openjdk version "1.8.0_141"
OpenJDK Runtime Environment (build 1.8.0_141-b16)
OpenJDK 64-Bit Server VM (build 25.141-b16, mixed mode)
二、下載下傳
Kafka(
http://kafka.apache.org/ )依賴ZooKeeper http://mirrors.shuosc.org/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz --- 穩定版 http://mirrors.shuosc.org/apache/kafka/1.1.0/kafka-1.1.0-src.tgz http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz由于kafka_2.12-1.1.0.tgz是可執行程式,裡面內建了zookeeper簡易版,如果隻是單機測試那麼僅僅下載下傳這個就可以了。但是簡易版功能有限。在生産環境下,建議還是直接下載下傳官方zookeeper軟體。
三、運作與測試,以下都是終端指令
1、cd kafka_2.12-1.1.0
2、啟動Zookeeper,如果無報錯則說明啟動成功。daemon以及nohup &是實作在背景啟動,即守護程序。
方法1(推薦) ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
方法2 nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties&
3、啟動Kafka,如果無報錯則說明啟動成功。daemon以及nohup &是實作在背景啟動,即守護程序。
方法1 (推薦)./bin/kafka-server-start.sh -daemon config/server.properties
方法2 nohup ./bin/kafka-server-start.sh config/server.properties&
4、檢視程序是否正常運作,通過檢測2181與9092端口,2181是zookeeper,9092是kafka
netstat -tunlp|egrep "(2181|9092)"
tcp 0 0 :::2181 :::* LISTEN 19787/java
tcp 0 0 :::9092 :::* LISTEN 28094/java
我們也可以使用jps終端指令檢視是否啟動:
[root@bogon kafka_2.12-1.1.0]# jps
4384 Kafka
3921 QuorumPeerMain
4412 Jps
5、建立主題,例如test
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic emqtest
--zookeeper : zookeeper叢集清單,用英文逗号分隔。可以不用指定zookeeper整個叢集内的節點清單,隻指定某個或某幾個zookeeper節點清單也是可以的
replication-factor : 複制數目,提供failover機制;1代表隻在一個broker上有資料記錄,一般值都大于1,代表一份資料會自動同步到其他的多個broker,防止某個broker當機後資料丢失。
partitions : 一個topic可以被切分成多個partitions,一個消費者可以消費多個partitions,但一個partitions隻能被一個消費者消費,是以增加partitions可以增加消費者的吞吐量。kafka隻保證一個partitions内的消息是有序的,多個一個partitions之間的資料是無序的。
注意,
(1)使用叢集時,必須手動先建立topic,指定zookeeper節點清單;單機也應該要建立。否則用戶端會報錯。
(2)主題名不建議使用"."和"_"字元。
[root@localhost kafka_2.12-1.0.0copy1]# ./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --
partitions 3 --topic emq_broker_message1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "emq_broker_message1".
partitions 3 --topic emqtest
Created topic "emqtest".
(3)建議建立主題時,指定分區數為kafka broker節點數目的整數倍,例如broker有3台,那麼分區就設定為3個或6個或9個--partitions 3。建立完主題,可以在路徑log.dirs看到主題和分區的檔案夾。詳情參見《Kafka入門與實踐.牟大恩》3.6.1節描述
(4)--replication-factor 3,《kafka權威指南》6.3.1節建議複制系數為3.但是前提是數值不能大于broker的節點數。
如果broker節點數為1,複制系數設定為3,則會報錯:
Error while executing topic command : Replication factor: 3 larger than available brokers: 1.
(5)單機時,zookeeper數目是1;叢集時,zookeeper的數目建議為奇數個,至少3個。
(6)server.properties檔案設定預設值
num.partitions=3
default.replication.factor=1
6、檢視主題
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
./bin/kafka-topics.sh --list --zookeeper localhost:2181
例如:broker叢集,有3個節點。同時給主題mynewt配置設定了3個分區。每個分區都有leader和follow。
[root@bogon kafka_2.12-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mynewt
Topic:mynewt PartitionCount:3 ReplicationFactor:3 Configs:
Topic: mynewt Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: mynewt Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: mynewt Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
7、開啟一個終端,發送消息,生産者的消息要發往kafka
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
8、另起一個終端,消費消息,消費者的消息來自zookeeper(協調轉發)
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
将來新版本需要寫成./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic test --from-beginning
9、結束程序
./bin/kafka-server-stop.sh
./bin/zookeeper-server-stop.sh
kafka啟動時先啟動zookeeper,再啟動kafka;關閉時相反,先關閉kafka,再關閉zookeeper。
四、總結
1、kafka有什麼?
producer 消息的生成者,即釋出消息
consumer 消息的消費者,即訂閱消息
broker Kafka以叢集的方式運作,可以由一個或多個服務組成,服務即broker
zookeeper 協調轉發
2、配置檔案與消息持久化路徑
/config/server.properties
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
例如用戶端發送test為主題的消息,會持久化在這個檔案:/tmp/kafka-logs/test-0/00000000000000000000.log
主題清單會記錄在/tmp/zookeeper
/config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
建議修改kafka的日志目錄和zookeeper資料目錄,因為這兩項預設放在tmp目錄,而tmp目錄中内容會随重新開機而丢失:
server.properties:
log.dirs=/tmp/kafka-logs
修改為
log.dirs=/usr/local/kafka/logs
zookeeper.properties
dataDir=/tmp/zookeeper
dataDir=/usr/local/zookeeper/data
3、配置檔案bootstrap.servers 和 broker.list差別:
本以為是兩個參數,其實是實作一個功能,檢視源代碼後發現broker.list是舊版本指令
4、自己寫的shell啟動和關閉kafka腳本,kafka.sh -- 這個腳本有問題,不穩定
#!/bin/bash
kafka_home=/root/Downloads/kafka_2.12-1.0.0/
case $1 in
start) # 服務啟動需要做的步驟
echo "zookeeper start"
$kafka_home/bin/zookeeper-server-start.sh -daemon $kafka_home/config/zookeeper.properties
#sleep 1
echo "kafka start"
$kafka_home/bin/kafka-server-start.sh -daemon $kafka_home/config/server.properties
#sleep 1
;;
stop) # 服務停止需要做的步驟
echo "kafka stop"
$kafka_home/bin/kafka-server-stop.sh
#sleep 1
echo "zookeeper stop"
$kafka_home/bin/zookeeper-server-stop.sh
#sleep 1
;;
restart) # 重新開機服務需要做的步驟
...
;;
status) # 檢視狀态需要做的步驟
...
;;
*) echo "$0 {start|stop|restart|status}"
exit 4
;;
esac
5、zookeeper叢集
kafka是通過zookeeper來管理叢集。 kafka軟體包内雖然包括了一個簡版的zookeeper,但是感覺功能有限。在生産環境下,建議還是直接下載下傳官方zookeeper軟體。 下載下傳最新版的zookeeper軟體:
http://zookeeper.apache.org/ http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.11.tar.gz
歡迎通路姊妹篇:我個人的kafka broker和zookeeper叢集實踐(★firecat推薦★)
---
參考文獻
物聯網架構成長之路(8)-EMQ-Hook了解、連接配接Kafka發送消息
centos7 安裝 kafka_2.11-1.0.0
zookeeper安裝及部署
Kafka安裝及部署
CentOS下ZooKeeper單機模式、叢集模式安裝
Kafka單機、叢集模式安裝詳解(一)
kafka叢集監控之KafkaOffsetMonitor 0.4.1版本 以及用sbt編譯KafkaOffsetMonitor
https://github.com/quantifind/KafkaOffsetMonitor