1. 下載下傳代碼
下載下傳 2.3.0 版本并解壓縮:
tar -zxvf kafka_2.12-2.3.0.tgz -C .
複制
建立軟連接配接便于更新:
ln -s kafka_2.12-2.3.0/ kafka
複制
配置環境變量:
export KAFKA_HOME=/Users/smartsi/opt/kafka
export PATH=${KAFKA_HOME}/bin:$PATH
複制
2. 安裝ZooKeeper
Kafka 依賴 ZooKeeper,如果你還沒有 ZooKeeper 伺服器,你需要先啟動一個 ZooKeeper 伺服器。可以先參考ZooKeeper 安裝與啟動來安裝 ZooKeeper。ZooKeeper 配置如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/smartsi/opt/zookeeper/data
clientPort=2181
server.1=localhost:2888:3888
複制
你也可以通過與 kafka 打包在一起的便捷腳本來快速簡單地建立一個單節點 ZooKeeper 執行個體:
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
複制
3. 配置Kafka
第一個 broker 配置
server-9092.properties
如下:
broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000
複制
運作起來至少要配置四項。上面的前四項。
第二個 broker 配置
server-9093.properties
如下:
broker.id=1
listeners=PLAINTEXT://127.0.0.1:9093
log.dirs=/Users/smartsi/opt/kafka/logs/log-9093
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000
複制
第三個 broker 配置
server-9094.properties
如下:
broker.id=2
listeners=PLAINTEXT://127.0.0.1:9094
log.dirs=/Users/smartsi/opt/kafka/logs/log-9094
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000
複制
我們必須重寫端口和日志目錄,因為我們在同一台機器上運作這些,我們不希望所有都在同一個端口注冊,或者覆寫彼此的資料。是以用端口号9092、9093、9094分别代表三個 broker。
下面具體解釋一下我們的配置項:
(1) Broker相關:
broker.id=0
複制
broker 的 Id。每一個 broker 在叢集中的唯一标示,要求是正數。每個 broker 都不相同。
(2) Socket服務設定:
listeners=PLAINTEXT://127.0.0.1:9092
複制
Socket伺服器監聽的位址,如果沒有設定,則監聽
java.net.InetAddress.getCanonicalHostName()
傳回的位址。
(3) ZooKeeper相關:
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000
複制
zookeeper.connect
是一個逗号分隔的
host:port
鍵值對,每個對應一個 zk 伺服器。例如
127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
。你還可以将可選的用戶端命名空間 Chroot 字元串追加到 URL 上以指定所有 kafka 的 Znode 的根目錄。另外這個
kafka-2.3.0
這個節點需要你提前建立。讓 Kafka 把他需要的資料結構都建立在這個節點下,否則會建立在根節點
/
節點下。
(3) 日志相關:
log.dirs=/Users/smartsi/opt/kafka/logs/log-9092
複制
Kafka存儲Log的目錄。
4. 啟動Kafka伺服器
有兩種方式可以啟動 Kafka 伺服器:
# 第一種方式(推薦)
bin/kafka-server-start.sh -daemon config/server.properties
# 第二種方式
nohup bin/kafka-server-start.sh config/server.properties &
複制
我們以第一種方式啟動 Kafka 伺服器:
bin/kafka-server-start.sh -daemon config/server-9092.properties
bin/kafka-server-start.sh -daemon config/server-9093.properties
bin/kafka-server-start.sh -daemon config/server-9094.properties
複制
檢視程序和端口:
smartsi:kafka smartsi$ jps
8914 DataNode
42802 Jps
9252 NodeManager
41253 Kafka
41541 Kafka
42790 Kafka
41670 ZooKeeperMain
16731
9164 ResourceManager
1997
複制
我們現在看一下 Kafka 在 ZooKeeper 上建立的節點:
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
複制
看一下我們在ZooKeeper上注冊的兩個 broker:
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids
[0, 1, 2]
[zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4}
cZxid = 0x92
ctime = Mon Sep 02 10:08:41 CST 2019
mZxid = 0x92
mtime = Mon Sep 02 10:08:41 CST 2019
pZxid = 0x92
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560012
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4}
cZxid = 0xa7
ctime = Mon Sep 02 10:08:48 CST 2019
mZxid = 0xa7
mtime = Mon Sep 02 10:08:48 CST 2019
pZxid = 0xa7
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560014
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4}
cZxid = 0xbd
ctime = Mon Sep 02 10:19:09 CST 2019
mZxid = 0xbd
mtime = Mon Sep 02 10:19:09 CST 2019
pZxid = 0xbd
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560018
dataLength = 188
numChildren = 0
複制
5. 測試Kafka
5.1 建立Topic
讓我們建立一個名為
test
的 Topic,它有一個分區和一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test
複制
現在我們可以運作
list
指令來檢視這個 Topic:
smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0
test
複制
或者,你也可将代理配置為:在釋出的topic不存在時,自動建立topic,而不是手動建立。
5.2 啟動生産者
Kafka 自帶一個指令行用戶端,它從檔案或标準輸入中擷取輸入,并将其作為消息發送到 Kafka 叢集。預設情況下,每行将作為單獨的消息發送。
運作 Producer (生産者),然後在控制台輸入一些消息以發送到伺服器:
smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is my first message
>this is my second message
複制
5.3 啟動消費者
Kafka 還有一個指令行 Consumer(消費者),将消息轉儲到标準輸出:
smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is my first message
this is my second message
複制
如果你将上述指令在不同的終端中運作,那麼現在就可以将消息輸入到生産者終端中,并将它們在消費終端中顯示出來。
原文:Quickstart