天天看點

kafka的認識、安裝與配置

認識Kafka

花費越少的精力在資料移動上,就能越專注于核心業務 --- 《Kafka:The Definitive Guide》

認識 Kafka 之前,先了解一下釋出與訂閱消息系統:消息的發送者不會直接把消息發送給接收者、發送者以某種方式對消息進行分類,接收者訂閱它們,以便能接受特定類型的消息。釋出與訂閱系統一般會有一個 broker(n. 經紀人、中間商) 也就是釋出消息的中心點。

Kafka 是一款基于釋出與訂閱的消息系統,一般被稱為“分布式送出日志”或者“分布式流平台”。 Kafka 的資料單元被稱作消息,可以看作是資料庫中的一行資料,消息是由位元組數組組成,故對 kafka 來說消息沒有特别的意義,消息可以有一個可選的中繼資料,也就是鍵。鍵也是一個位元組數組,同樣對于 kafka 沒有什麼特殊意義。鍵可以用來将消息以一種可控的方式寫入分區。最簡單的例子就是為鍵生成一個一緻性散列值,然後使用散列值對主題分區數進行取模,為消息選擇分區。這樣可以保證具有相同鍵的消息總是被寫在相同的分區上。保證消息在一個主題中順序讀取。

為了提高效率,消息将被分批次寫入 Kafka 。批次就是一組消息,類似于 redis 中的流水線(Pipelined)操作。

主題和分區

kafka 的消息通過主題進行分類,主題就相當于資料庫中的表,主題可以被分成若幹個分區,一個分區就是一個送出日志,消息以追加的形式被寫入分區。然後按照先入先出的順序讀取。一個主題下的分區也可以在不同的伺服器上,以此提供比單個伺服器更加強大的性能

kafka的認識、安裝與配置

生産者和消費者

Kafka 的用戶端就是 Kafka 系統的使用者,一般情況下有兩種基本類型:生産者和消費者

Producer 生産者建立消息,一般情況下,一個消息會被釋出到一個特定的主題上。生産者在預設情況下将消息均分在主題的每個分區上

Consumer 消費者讀取消息,消費者訂閱一個或多個主題,并按照消息的生成順序讀取他們,消費者通過檢查消息的偏移量來區分已經讀過的消息。這個偏移量會被消費者在 zk 或者 kafka 上儲存,如果消費者關閉或者重新開機,他的讀取狀态不會消失

消費者是消費者群組 Consumer group的一部分,群組可以保證每個分區被一個消費者消費(是以消費者數量不能大于分區數量,會造成消費者伺服器的浪費),如果一個消費者失效,群組裡的其他消費者可以接管失效消費者的工作。

kafka的認識、安裝與配置

Kafka的優點

  1. 無縫支援多個生産者
  2. 支援多個消費者從一條消息流讀取資料、且各個消費者之間的偏移量不影響。也支援多個消費者共享一個消息流,并保證整個消費者群組對每個消息隻消費一次
  3. 可以對每個主題設定保留規則,根據保留規則持久化資料到磁盤
  4. 高性能,高伸縮性

安裝

Kafka 使用 Zookeeper(後面簡稱zk) 儲存叢集的中繼資料資訊和消費者資訊, Kafka 發行版本自帶 zk,可以直接從腳本啟動,不過安裝一個完整版的 zk 也不難

kafka的認識、安裝與配置

安裝單節點 zk

官方下載下傳位址:http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/

如果下載下傳速度不如意,可以使用我的藍奏雲:https://keats.lanzous.com/iMWi8dpi04f 提取碼: keats

安裝目錄: /usr/local/zookeeper

資料目錄: /var/lib/zookeeper

# tar -zxf zookeeper-3.4.6.tar.gz
# mv zookeeper-3.4.6 /usr/local/zookeeper
# mkdir -p /var/lib/zookeeper
# cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
> tickTime=2000
> dataDir=/var/lib/zookeeper
> clientPort=2181
> EOF
# 接着設定一下環境變量中的 JAVA_HOME,可以先使用 export 指令檢視是否已經設定
# export JAVA_HOME=/xxx
# 最後切換到 zk 安裝目錄,啟動 zk
# /usr/local/zookeeper/bin/zkServer.sh start
           

接着通過四字指令 srvr 驗證 zk 是否安裝正确

# telnet localhost 2181
Trying ::1...
Connected to localhost.
Escape character is '^]'.
srvr
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.
[root@linux-keats bin]# pwd
/usr/local/zookeeper/bin
           

安裝單節點 Kafka

下載下傳: https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

藍奏雲:下載下傳後将字尾名 zip 改為 tgz:https://keats.lanzous.com/iaZ9hdpj5bi

# tar -zxf kafka_2.11-0.9.0.1.tgz
# mv kafka_2.11-0.9.0.1 /usr/local/kafka
# mkdir /tmp/kafka-logs
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
           

其中 -dadmon 表示 kafka 以守護線程的形式啟動

配置 kafka

#broker 的全局唯一編号,叢集中不能重複。int類型
broker.id=0
#是否允許删除 topic 
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#處理磁盤 IO 的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka 運作日志(此日志非正常意義的日志)存放的路徑。用上一步建立的目錄。
log.dirs=/tmp/kafka-logs
#topic 建立時預設的分區數
num.partitions=1
#用來恢複和清理 data 下資料的線程數量
num.recovery.threads.per.data.dir=1
#segment 檔案保留的最長時間,逾時将被删除
log.retention.hours=168
#配置連接配接 Zookeeper 位址。如果是 zk 叢集,使用 , 隔開
zookeeper.connect=localhost:2181
           

叢集

zk 叢集的安裝請度娘 zk 叢集,kafka 可以按照末尾參考文獻安裝叢集。我這裡測試伺服器性能不行還跑了幾個 java 程式,就不裝叢集了

測試

主題相關操作

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
           
  • --create 建立操作 還有 --list 查詢,--describe 詳情
  • --zookeeper localhost:2181 配置 zk 的資訊
  • --partitions 1 分區數目 1
  • --replication-factor 1 副本數 1。副本數不能大于 kafka broker 節點的數目
  • --topic test 指定主題名稱

建立好主題後,logs 檔案夾内就會出現 主題名-分區名 的送出日志

kafka的認識、安裝與配置

往主題發送消息

# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test Message 1
Test Message 2
^D
           

從測試主題讀取消息

# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test Message 1
Test Message 2
^C
Processed a total of 2 messages
           

内外網通路 kafka

在 kafka 的 server.properties 中,下面的配置預設是 PLAINTEXT://:9092,表示預設情況下 kafka 隻監聽内網9092端口的請求。如果伺服器需要外網通路,需要更改配置:

# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# if you don't understand there see: https://blog.csdn.net/gk91620/article/details/103985844
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://内網IP:内網端口1,EXTERNAL://内網IP:内網端口2
advertised.listeners=INTERNAL://内網IP:内網端口1,EXTERNAL://外網IP:外網通路的端口
inter.broker.listener.name=INTERNAL
           

這麼配置下來,對于外網的生産者和消費者,通過 外網IP:外網通路的端口 來與 kafka 互動,而 内網IP:内網端口1 則被用于叢集中 kafka 之間的互動

參考

尚矽谷kafka教程

《kafka權威指南》 --- 美國人著