天天看點

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

  • Consumer API,允許應用程式訂閱一個或多個 topics 并處理為其生成的記錄流
  • Streams API,它允許應用程式作為流處理器,從一個或多個主題中消費輸入流并為其生成輸出流,有效的将輸入流轉換為輸出流。
  • Connector API,它允許建構和運作将 Kafka 主題連接配接到現有應用程式或資料系統的可用生産者和消費者。例如,關系資料庫的連接配接器可能會捕獲對表的所有更改
位元組Java面試必問:真的,搞定kafka看這一篇就夠了

Kafka 基本概念

Kafka 作為一個高度可擴充可容錯的消息系統,它有很多基本概念,下面就來認識一下這些 Kafka 專屬的概念

topic

Topic 被稱為主題,在 kafka 中,使用一個類别屬性來劃分消息的所屬類,劃分消息的這個類稱為 topic。topic 相當于消息的配置設定标簽,是一個邏輯概念。主題好比是資料庫的表,或者檔案系統中的檔案夾。

partition

partition 譯為分區,topic 中的消息被分割為一個或多個的 partition,它是一個實體概念,對應到系統上的就是一個或若幹個目錄,一個分區就是一個

送出日志

。消息以追加的形式寫入分區,先後以順序的方式讀取。

位元組Java面試必問:真的,搞定kafka看這一篇就夠了
注意:由于一個主題包含無數個分區,是以無法保證在整個 topic 中有序,但是單個 Partition 分區可以保證有序。消息被迫加寫入每個分區的尾部。Kafka 通過分區來實作資料備援和伸縮性

分區可以分布在不同的伺服器上,也就是說,一個主題可以跨越多個伺服器,以此來提供比單個伺服器更強大的性能。

segment

Segment 被譯為段,将 Partition 進一步細分為若幹個 segment,每個 segment 檔案的大小相等。

broker

Kafka 叢集包含一個或多個伺服器,每個 Kafka 中伺服器被稱為 broker。broker 接收來自生産者的消息,為消息設定偏移量,并送出消息到磁盤儲存。broker 為消費者提供服務,對讀取分區的請求作出響應,傳回已經送出到磁盤上的消息。

broker 是叢集的組成部分,每個叢集中都會有一個 broker 同時充當了

叢集控制器(Leader)

的角色,它是由叢集中的活躍成員選舉出來的。每個叢集中的成員都有可能充當 Leader,Leader 負責管理工作,包括将分區配置設定給 broker 和監控 broker。叢集中,一個分區從屬于一個 Leader,但是一個分區可以配置設定給多個 broker(非Leader),這時候會發生分區複制。這種複制的機制為分區提供了消息備援,如果一個 broker 失效,那麼其他活躍使用者會重新選舉一個 Leader 接管。

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

producer

生産者,即消息的釋出者,其會将某 topic 的消息釋出到相應的 partition 中。生産者在預設情況下把消息均衡地分布到主題的所有分區上,而并不關心特定消息會被寫到哪個分區。不過,在某些情況下,生産者會把消息直接寫到指定的分區。

consumer

消費者,即消息的使用者,一個消費者可以消費多個 topic 的消息,對于某一個 topic 的消息,其隻會消費同一個 partition 中的消息

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

在了解完 Kafka 的基本概念之後,我們通過搭建 Kafka 叢集來進一步深刻認識一下 Kafka。

確定安裝環境

安裝 Java 環境

在安裝 Kafka 之前,先確定Linux 環境上是否有 Java 環境,使用

java -version

指令檢視 Java 版本,推薦使用Jdk 1.8 ,如果沒有安裝 Java 環境的話,可以按照這篇文章進行安裝(https://www.cnblogs.com/zs-notes/p/8535275.html)

安裝 Zookeeper 環境

Kafka 的底層使用 Zookeeper 儲存中繼資料,確定一緻性,是以安裝 Kafka 前需要先安裝 Zookeeper,Kafka 的發行版自帶了 Zookeeper ,可以直接使用腳本來啟動,不過安裝一個 Zookeeper 也不費勁

Zookeeper 單機搭建

Zookeeper 單機搭建比較簡單,直接從 www.apache.org/dyn/closer.… 官網下載下傳一個穩定版本的 Zookeeper ,這裡我使用的是

3.4.10

,下載下傳完成後,在 Linux 系統中的

/usr/local

目錄下建立 zookeeper 檔案夾,使用

xftp

工具(xftp 和 xshell 工具都可以在官網 www.netsarang.com/zh/xshell/ 申請免費的家庭版)把下載下傳好的 zookeeper 壓縮包放到 /usr/local/zookeeper 目錄下。

如果下載下傳的是一個 tar.gz 包的話,直接使用

tar -zxvf zookeeper-3.4.10.tar.gz

解壓即可

如果下載下傳的是 zip 包的話,還要檢查一下 Linux 中是否有 unzip 工具,如果沒有的話,使用

yum install unzip

安裝 zip 解壓工具,完成後使用

unzip zookeeper-3.4.10.zip

解壓即可。

解壓完成後,cd 到

/usr/local/zookeeper/zookeeper-3.4.10

,建立一個 data 檔案夾,然後進入到 conf 檔案夾下,使用

mv zoo_sample.cfg zoo.cfg

進行重命名操作

然後使用 vi 打開 zoo.cfg ,更改一下

dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data

,儲存。

進入bin目錄,啟動服務輸入指令

./zkServer.sh start

輸出下面内容表示搭建成功

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

關閉服務輸入指令,

./zkServer.sh stop

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

使用

./zkServer.sh status

可以檢視狀态資訊。

Zookeeper 叢集搭建

準備條件

準備條件:需要三個伺服器,這裡我使用了CentOS7 并安裝了三個虛拟機,并為各自的虛拟機配置設定了

1GB

的記憶體,在每個

/usr/local/

下面建立 zookeeper 檔案夾,把 zookeeper 的壓縮包挪過來,解壓,完成後會有 zookeeper-3.4.10 檔案夾,進入到檔案夾,建立兩個檔案夾,分别是

data

log

檔案夾

注:上一節單機搭建中已經建立了一個data 檔案夾,就不需要重新建立了,直接建立一個 log 檔案夾,對另外兩個新增的服務需要建立這兩個檔案夾。

設定叢集

建立完成後,需要編輯 conf/zoo.cfg 檔案,三個檔案的内容如下

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort=12181
server.1=192.168.1.7:12888:13888
server.2=192.168.1.8:12888:13888
server.3=192.168.1.9:12888:13888

           

server.1 中的這個 1 表示的是伺服器的辨別也可以是其他數字,表示這是第幾号伺服器,這個辨別要和下面我們配置的

myid

的辨別一緻可以。

192.168.1.7:12888:13888

為叢集中的 ip 位址,第一個端口表示的是 master 與 slave 之間的通信接口,預設是 2888,第二個端口是leader選舉的端口,叢集剛啟動的時候選舉或者leader挂掉之後進行新的選舉的端口,預設是 3888

現在對上面的配置檔案進行解釋

tickTime

: 這個時間是作為 Zookeeper 伺服器之間或用戶端與伺服器之間

維持心跳

的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

initLimit

:這個配置項是用來配置 Zookeeper 接受用戶端(這裡所說的用戶端不是使用者連接配接 Zookeeper 伺服器的用戶端,而是 Zookeeper 伺服器叢集中連接配接到 Leader 的 Follower 伺服器)初始化連接配接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 伺服器還沒有收到用戶端的傳回資訊,那麼表明這個用戶端連接配接失敗。總的時間長度就是 5*2000=10 秒

syncLimit

: 這個配置項辨別 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒

dataDir

: 快照日志的存儲路徑

dataLogDir

: 事務日志的存儲路徑,如果不配置這個那麼事務日志會預設存儲到dataDir指定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,産生的事務日志、快照日志太多

clientPort

: 這個端口就是用戶端連接配接 Zookeeper 伺服器的端口,Zookeeper 會監聽這個端口,接受用戶端的通路請求。

建立 myid 檔案

在了解完其配置檔案後,現在來建立每個叢集節點的 myid ,我們上面說過,這個 myid 就是

server.1

的這個 1 ,類似的,需要為叢集中的每個服務都指定辨別,使用

echo

指令進行建立

# server.1
echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.2
echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.3
echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid

           

啟動服務并測試

配置完成,為每個 zk 服務啟動并測試,我在 windows 電腦的測試結果如下

啟動服務(每台都需要執行)

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

           

檢查服務狀态

使用

./zkServer.sh status

指令檢查服務狀态

192.168.1.7 — follower

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

192.168.1.8 — leader

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

192.168.1.9 — follower

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

zk叢集一般隻有一個leader,多個follower,主一般是相應用戶端的讀寫請求,而從主同步資料,當主挂掉之後就會從follower裡投票選舉一個leader出來。

Kafka 叢集搭建

準備條件

  • 搭建好的 Zookeeper 叢集
  • Kafka 壓縮包

/usr/local

下建立

kafka

檔案夾,然後把下載下傳完成的 tar.gz 包移到 /usr/local/kafka 目錄下,使用

tar -zxvf 壓縮包

進行解壓,解壓完成後,進入到 kafka_2.12-2.3.0 目錄下,建立 log 檔案夾,進入到 config 目錄下

我們可以看到有很多 properties 配置檔案,這裡主要關注

server.properties

這個檔案即可。

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

kafka 啟動方式有兩種,一種是使用 kafka 自帶的 zookeeper 配置檔案來啟動(可以按照官網來進行啟動,并使用單個服務多個節點來模拟叢集http://kafka.apache.org/quickstart#quickstart_multibroker),一種是通過使用獨立的zk叢集來啟動,這裡推薦使用第二種方式,使用 zk 叢集來啟動

修改配置項

需要為

每個服務

都修改一下配置項,也就是

server.properties

, 需要更新和添加的内容有

broker.id=0 //初始是0,每個 server 的broker.id 都應該設定為不一樣的,就和 myid 一樣 我的三個服務分别設定的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設定zookeeper的連接配接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

           

配置項的含義

broker.id=0  #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣
port=9092 #目前kafka對外提供服務的端口預設是9092
host.name=192.168.1.7 #這個參數預設是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目錄,這個目錄可以配置為“,”逗号分割的表達式,上面的num.io.threads要大于這個目錄的個數這個目錄,如果配置多個目錄,新建立的topic他把消息持久化的地方是,目前以逗号分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,資料不是一下子就發送的,先回存儲到緩沖區了到達一定的大小後在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當資料到達一定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #預設的分區數,一個topic預設1個分區數
log.retention.hours=168 #預設消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息儲存的最大值5M
default.replication.factor=2  #kafka儲存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到檔案,當超過這個值的時候,kafka會新起一個檔案
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄檢視是否有過期的消息如果有,删除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #設定zookeeper的連接配接端口

           

啟動 Kafka 叢集并測試

  • 啟動服務,進入到

    /usr/local/kafka/kafka_2.12-2.3.0/bin

    目錄下
# 啟動背景程序
./kafka-server-start.sh -daemon ../config/server.properties

           
  • 檢查服務是否啟動
# 執行指令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka

           
  • kafka 已經啟動
  • 建立 Topic 來驗證是否建立成功
# cd .. 往回退一層 到 /usr/local/kafka/kafka_2.12-2.3.0 目錄下
bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan

           

對上面的解釋

–replication-factor 2 複制兩份

–partitions 1 建立1個分區

–topic 建立主題

檢視我們的主題是否出建立成功

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

           
位元組Java面試必問:真的,搞定kafka看這一篇就夠了
啟動一個服務就能把叢集啟動起來

在一台機器上建立一個釋出者

# 建立一個broker,釋出者
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

           

在一台伺服器上建立一個訂閱者

# 建立一個consumer, 消費者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

           
注意:這裡使用 --zookeeper 的話可能出現

zookeeper is not a recognized option

的錯誤,這是因為 kafka 版本太高,需要使用

--bootstrap-server

指令

測試結果

釋出

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

消費

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

其他指令

顯示 topic

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

# 顯示
cxuantopic

           

檢視 topic 狀态

bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic

# 下面是顯示的詳細資訊
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

# 分區為為1  複制因子為2   主題 cxuantopic 的分區為0 
# Replicas: 0,1   複制的為1,2

           

Leader

負責給定分區的所有讀取和寫入的節點,每個節點都會通過随機選擇成為 leader。

Replicas

是為該分區複制日志的節點清單,無論它們是 Leader 還是目前處于活動狀态。

Isr

是同步副本的集合。它是副本清單的子集,目前仍處于活動狀态并追随Leader。

至此,kafka 叢集搭建完畢。

驗證多節點接收資料

剛剛我們都使用的是 相同的ip 服務,下面使用其他叢集中的節點,驗證是否能夠接受到服務

在另外兩個節點上使用

最後

這份文檔從建構一個鍵值資料庫的關鍵架構入手,不僅帶你建立起全局觀,還幫你迅速抓住核心主線。除此之外,還會具體講解資料結構、線程模型、網絡架構、持久化、主從同步和切片叢集等,幫你搞懂底層原理。相信這對于所有層次的Redis使用者都是一份非常完美的教程了。

位元組Java面試必問:真的,搞定kafka看這一篇就夠了

快速入手通道:(戳這裡,免費下載下傳)誠意滿滿!!!

整理不易,覺得有幫助的朋友可以幫忙點贊分享支援一下小編~

你的支援,我的動力;祝各位前程似錦,offer不斷!!!

驗證是否能夠接受到服務

在另外兩個節點上使用

最後

這份文檔從建構一個鍵值資料庫的關鍵架構入手,不僅帶你建立起全局觀,還幫你迅速抓住核心主線。除此之外,還會具體講解資料結構、線程模型、網絡架構、持久化、主從同步和切片叢集等,幫你搞懂底層原理。相信這對于所有層次的Redis使用者都是一份非常完美的教程了。

[外鍊圖檔轉存中…(img-UPyHExPu-1628148746992)]

快速入手通道:(戳這裡,免費下載下傳)誠意滿滿!!!

整理不易,覺得有幫助的朋友可以幫忙點贊分享支援一下小編~

你的支援,我的動力;祝各位前程似錦,offer不斷!!!