天天看點

Kafka 安裝與啟動

1. 下載下傳代碼

下載下傳 2.3.0 版本并解壓縮:

tar -zxvf kafka_2.12-2.3.0.tgz -C .           

複制

建立軟連接配接便于更新:

ln -s kafka_2.12-2.3.0/ kafka           

複制

配置環境變量:

export KAFKA_HOME=/Users/smartsi/opt/kafka
export PATH=${KAFKA_HOME}/bin:$PATH           

複制

2. 安裝ZooKeeper

Kafka 依賴 ZooKeeper,如果你還沒有 ZooKeeper 伺服器,你需要先啟動一個 ZooKeeper 伺服器。可以先參考ZooKeeper 安裝與啟動來安裝 ZooKeeper。ZooKeeper 配置如下:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/smartsi/opt/zookeeper/data
clientPort=2181
server.1=localhost:2888:3888           

複制

你也可以通過與 kafka 打包在一起的便捷腳本來快速簡單地建立一個單節點 ZooKeeper 執行個體:

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...           

複制

3. 配置Kafka

第一個 broker 配置

server-9092.properties

如下:

broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000           

複制

運作起來至少要配置四項。上面的前四項。

第二個 broker 配置

server-9093.properties

如下:

broker.id=1
listeners=PLAINTEXT://127.0.0.1:9093
log.dirs=/Users/smartsi/opt/kafka/logs/log-9093
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000           

複制

第三個 broker 配置

server-9094.properties

如下:

broker.id=2
listeners=PLAINTEXT://127.0.0.1:9094
log.dirs=/Users/smartsi/opt/kafka/logs/log-9094
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000           

複制

我們必須重寫端口和日志目錄,因為我們在同一台機器上運作這些,我們不希望所有都在同一個端口注冊,或者覆寫彼此的資料。是以用端口号9092、9093、9094分别代表三個 broker。

下面具體解釋一下我們的配置項:

(1) Broker相關:

broker.id=0           

複制

broker 的 Id。每一個 broker 在叢集中的唯一标示,要求是正數。每個 broker 都不相同。

(2) Socket服務設定:

listeners=PLAINTEXT://127.0.0.1:9092           

複制

Socket伺服器監聽的位址,如果沒有設定,則監聽

java.net.InetAddress.getCanonicalHostName()

傳回的位址。

(3) ZooKeeper相關:

zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000           

複制

zookeeper.connect

是一個逗号分隔的

host:port

鍵值對,每個對應一個 zk 伺服器。例如

127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

。你還可以将可選的用戶端命名空間 Chroot 字元串追加到 URL 上以指定所有 kafka 的 Znode 的根目錄。另外這個

kafka-2.3.0

這個節點需要你提前建立。讓 Kafka 把他需要的資料結構都建立在這個節點下,否則會建立在根節點

/

節點下。

(3) 日志相關:

log.dirs=/Users/smartsi/opt/kafka/logs/log-9092           

複制

Kafka存儲Log的目錄。

4. 啟動Kafka伺服器

有兩種方式可以啟動 Kafka 伺服器:

# 第一種方式(推薦)
bin/kafka-server-start.sh -daemon config/server.properties
# 第二種方式
nohup bin/kafka-server-start.sh config/server.properties &           

複制

我們以第一種方式啟動 Kafka 伺服器:

bin/kafka-server-start.sh -daemon config/server-9092.properties
bin/kafka-server-start.sh -daemon config/server-9093.properties
bin/kafka-server-start.sh -daemon config/server-9094.properties           

複制

檢視程序和端口:

smartsi:kafka smartsi$ jps
8914 DataNode
42802 Jps
9252 NodeManager
41253 Kafka
41541 Kafka
42790 Kafka
41670 ZooKeeperMain
16731
9164 ResourceManager
1997           

複制

我們現在看一下 Kafka 在 ZooKeeper 上建立的節點:

[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]           

複制

看一下我們在ZooKeeper上注冊的兩個 broker:

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids
[0, 1, 2]
[zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4}
cZxid = 0x92
ctime = Mon Sep 02 10:08:41 CST 2019
mZxid = 0x92
mtime = Mon Sep 02 10:08:41 CST 2019
pZxid = 0x92
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560012
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4}
cZxid = 0xa7
ctime = Mon Sep 02 10:08:48 CST 2019
mZxid = 0xa7
mtime = Mon Sep 02 10:08:48 CST 2019
pZxid = 0xa7
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560014
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4}
cZxid = 0xbd
ctime = Mon Sep 02 10:19:09 CST 2019
mZxid = 0xbd
mtime = Mon Sep 02 10:19:09 CST 2019
pZxid = 0xbd
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560018
dataLength = 188
numChildren = 0           

複制

5. 測試Kafka

5.1 建立Topic

讓我們建立一個名為

test

的 Topic,它有一個分區和一個副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test           

複制

現在我們可以運作

list

指令來檢視這個 Topic:

smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0
test           

複制

或者,你也可将代理配置為:在釋出的topic不存在時,自動建立topic,而不是手動建立。

5.2 啟動生産者

Kafka 自帶一個指令行用戶端,它從檔案或标準輸入中擷取輸入,并将其作為消息發送到 Kafka 叢集。預設情況下,每行将作為單獨的消息發送。

運作 Producer (生産者),然後在控制台輸入一些消息以發送到伺服器:

smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is my first message
>this is my second message           

複制

5.3 啟動消費者

Kafka 還有一個指令行 Consumer(消費者),将消息轉儲到标準輸出:

smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is my first message
this is my second message           

複制

如果你将上述指令在不同的終端中運作,那麼現在就可以将消息輸入到生産者終端中,并将它們在消費終端中顯示出來。

原文:Quickstart