Kafka的實作細節
一、Topic和Partition
在Kafka中的每一條消息都有一個topic。一般來說在我們應用中産生不同類型的資料,都可以設定不同的主題。一個主題一般會有多個消息的訂閱者,當生産者釋出消息到某個主題時,訂閱了這個主題的消費者都可以接收到生産者寫入的新消息。
kafka為每個主題維護了分布式的分區(partition)日志檔案,每個partition在kafka存儲層面是append log。任何釋出到此partition的消息都會被追加到log檔案的尾部,在分區中的每條消息都會按照時間順序配置設定到一個單調遞增的順序編号,也就是我們的offset,offset是一個long型的數字,我們通過這個offset可以确定一條在該partition下的唯一消息。在partition下面是保證了有序性,但是在topic下面沒有保證有序性。
在上圖中在我們的生産者會決定發送到哪個Partition。
如果沒有Key值則進行輪詢發送。
如果有Key值,對Key值進行Hash,然後對分區數量取餘,保證了同一個Key值的會被路由到同一個分區,如果想隊列的強順序一緻性,可以讓所有的消息都設定為同一個Key。
二、消費模型
消息由生産者發送到kafka叢集後,會被消費者消費。一般來說我們的消費模型有兩種:推送模型(psuh)和拉取模型(pull)
基于推送模型的消息系統,由消息代理記錄消費狀态。消息代理将消息推送到消費者後,标記這條消息為已經被消費,但是這種方式無法很好地保證消費的處理語義。比如當我們把已經把消息發送給消費者之後,由于消費程序挂掉或者由于網絡原因沒有收到這條消息,如果我們在消費代理将其标記為已消費,這個消息就永久丢失了。如果我們利用生産者收到消息後回複這種方法,消息代理需要記錄消費狀态,這種不可取。如果采用push,消息消費的速率就完全由消費代理控制,一旦消費者發生阻塞,就會出現問題。
Kafka采取拉取模型(poll),由自己控制消費速度,以及消費的進度,消費者可以按照任意的偏移量進行消費。比如消費者可以消費已經消費過的消息進行重新處理,或者消費最近的消息等等。
三、網絡模型
3.1 KafkaClient --單線程Selector
單線程模式适用于并發連結數小,邏輯簡單,資料量小。
在kafka中,consumer和producer都是使用的上面的單線程模式。這種模式不适合kafka的服務端,在服務端中請求處理過程比較複雜,會造成線程阻塞,一旦出現後續請求就會無法處理,會造成大量請求逾時,引起雪崩。而在伺服器中應該充分利用多線程來處理執行邏輯。
3.2 Kafka--server -- 多線程Selector
在kafka服務端采用的是多線程的Selector模型,Acceptor運作在一個單獨的線程中,對于讀取操作的線程池中的線程都會在selector注冊read事件,負責服務端讀取請求的邏輯。成功讀取後,将請求放入message queue共享隊列中。然後在寫線程池中,取出這個請求,對其進行邏輯處理,即使某個請求線程阻塞了,還有後續的縣城從消息隊列中擷取請求并進行處理,在寫線程中處理完邏輯處理,由于注冊了OP_WIRTE事件,是以還需要對其發送響應。
四、高可靠分布式存儲模型
在Kafka中保證高可靠模型的依靠的是副本機制,有了副本機制之後,就算機器當機也不會發生資料丢失。
4.1高性能的日志存儲
kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節點上。同時在kafka的機器上,每個Partition其實都會對應一個日志目錄,在目錄下面會對應多個日志分段(LogSegment)。LogSegment檔案由兩部分組成,分别為“.index”檔案和“.log”檔案,分别表示為segment索引檔案和資料檔案。這兩個檔案的指令規則為:partition全局的第一個segment從0開始,後續每個segment檔案名為上一個segment檔案最後一條消息的offset值,數值大小為64位,20位數字字元長度,沒有數字用0填充,如下,假設有1000條消息,每個LogSegment大小為100,下面展現了900-1000的索引和Log:
由于kafka消息資料太大,如果全部建立索引,即占了空間又增加了耗時,是以kafka選擇了稀疏索引的方式,這樣的話索引可以直接進入記憶體,加快偏查詢速度。
簡單介紹一下如何讀取資料,如果我們要讀取第911條資料首先第一步,找到他是屬于哪一段的,根據二分法查找到他屬于的檔案,找到0000900.index和00000900.log之後,然後去index中去查找 (911-900) =11這個索引或者小于11最近的索引,在這裡通過二分法我們找到了索引是[10,1367]然後我們通過這條索引的實體位置1367,開始往後找,直到找到911條資料。
上面講的是如果要找某個offset的流程,但是我們大多數時候并不需要查找某個offset,隻需要按照順序讀即可,而在順序讀中,作業系統會對記憶體和磁盤之間添加page cahe,也就是我們平常見到的預讀操作,是以我們的順序讀操作時速度很快。但是kafka有個問題,如果分區過多,那麼日志分段也會很多,寫的時候由于是批量寫,其實就會變成随機寫了,随機I/O這個時候對性能影響很大。是以一般來說Kafka不能有太多的partition。針對這一點,RocketMQ把所有的日志都寫在一個檔案裡面,就能變成順序寫,通過一定優化,讀也能接近于順序讀。
★★★可以思考一下:1.為什麼需要分區,也就是說主題隻有一個分區,難道不行嗎?2.日志為什麼需要分段
日志政策
日志保留政策
無論消費者是否已經消費了消息,kafka都會一直儲存這些消息,但并不會像資料庫那樣長期儲存。為了避免磁盤被占滿,kafka會配置響應的保留政策(retention policy),以實作周期性地删除陳舊的消息 kafka有兩種“保留政策”:
- 根據消息保留的時間,當消息在kafka中儲存的時間超過了指定時間,就可以被删除;
- 根據topic存儲的資料大小,當topic所占的日志檔案大小大于一個閥值,則可以開始删除最舊的消息
日志壓縮政策
在很多場景中,消息的key與value的值之間的對應關系是不斷變化的,就像資料庫中的資料會不斷被修改一樣,消費者隻關心key對應的最新的value。我們可以開啟日志壓縮功能,kafka定期将相同key的消息進行合并,隻保留最新的value值
4.2 副本機制
Kafka的副本機制是多個服務端節點對其他節點的主題分區的日志進行複制。當叢集中的某個節點出現故障,通路故障節點的請求會被轉移到其他正常節點(這一過程通常叫Reblance),kafka每個主題的每個分區都有一個主副本以及0個或者多個副本,副本保持和主副本的資料同步,當主副本出故障時就會被替代。
在Kafka中并不是所有的副本都能被拿來替代主副本,是以在kafka的leader節點中維護着一個ISR(In sync Replicas)集合,翻譯過來也叫正在同步中集合,在這個集合中的需要滿足兩個條件:
節點必須和ZK保持連接配接
在同步的過程中這個副本不能落後主副本太多
另外還有個AR(Assigned Replicas)用來辨別副本的全集,OSR用來表示由于落後被剔除的副本集合,是以公式如下:ISR = leader + 沒有落後太多的副本; AR = OSR+ ISR;
這裡先要說下兩個名詞:HW(high watermark)是consumer能夠看到的此partition的位置,LEO( log end offset)是每個partition的log最後一條Message的位置。HW能保證leader所在的broker失效,該消息仍然可以從新選舉的leader中擷取,不會造成消息丢失。
當producer向leader發送資料時,可以通過request.required.acks參數來設定資料可靠性的級别:
1(預設):這意味着producer在ISR中的leader已成功收到的資料并得到确認後發送下一條message。如果leader當機了,則會丢失資料。
0:這意味着producer無需等待來自broker的确認而繼續發送下一批消息。這種情況下資料傳輸效率最高,但是資料可靠性确是最低的。
-1:producer需要等待ISR中的所有follower都确認接收到資料後才算一次發送完成,可靠性最高。但是這樣也不能保證資料不丢失,比如當ISR中隻有leader時(其他節點都和zk斷開連接配接,或者都沒追上),這樣就變成了acks=1的情況。
副本資料同步細節(HW和LEO)
4.3 資料操作
為避免broker挂後造成資料丢失,kafka實作了高可用方式。
- 基于partition實作Replica。并與zookeeper配合實作Leader的選舉。
- 通過算法,将partition的Leader與Fellowers分散于不同的broker。
replica實作
在“brokers的實體結構”中,replication有多個follewers,分散于不同的brokers。通過增量日志實作。
partition的log記錄是順序的,通過server.properties中log.retention.hours參數定義日志保留時長,過期則删除。新寫入的message append記錄在partition中。
為提升效率,
- follewers會在message未寫入log時,讀到message則将ACK發送給Leader,是以隻能保證存在Replica,不能保證資料一定持久化了。
- 批量複制
ISR(副本同步隊列)
ISR是In-Sync Replicate 記錄與Leader保持同步的清單。
維護的是有資格的follower節點
- 副本的所有節點都必須要和zookeeper保持連接配接狀态
- 副本的最後一條消息的offset和leader副本的最後一條消息的offset之間的內插補點不能超過指定的閥值,這個閥值是可以設定的(replica.lag.max.messages)
4.4 leader 選舉(Leader Election )
判斷Replica活着,(1)與zk有心跳通訊;(2)與Leader通訊及時。兩者有一不滿足,fellower都會從ISR中移除。
選舉算法
一般的leader選舉算法,有Majority Vote/Zab/Raft/PacificA。kafka采用的即PacificA,kafka維護多個ISR,但不不像Majorty Vote算法,限制最少的2N+1節點和N+1以上投票。
即使隻有1個follewer,也可完成Leader選舉。
選舉過程(詳解)
五、Kafka的高吐量的因素
- 順序寫的方式存儲資料 ;
- 批量發送: 在異步發送模式中。kafka允許進行批量發送,也就是先講消息緩存到記憶體中,然後一次請求批量發送出去。這樣減少了磁盤頻繁io以及網絡IO造成的性能瓶頸 batch.size 每批次發送的資料大小 linger.ms 間隔時間
- 零拷貝:消息從發送到落地儲存,broker維護的消息日志本身就是檔案目錄,每個檔案都是二進制儲存,生産者和消費者使用相同的格式來處理。在消費者擷取消息時,伺服器先從硬碟讀取資料到記憶體,然後把記憶體中的資料原封不懂的通過socket發送給消費者。雖然這個操作描述起來很簡單,但實際上經曆了很多步驟
1、作業系統将資料從磁盤讀入到核心空間的頁緩存 2、應用程式将資料從核心空間讀入到使用者空間緩存中 3、應用程式将資料寫回到核心空間到socket緩存中 4、作業系統将資料從socket緩沖區複制到網卡緩沖區,以便将資料經網絡發出
通過“零拷貝”技術可以去掉這些沒必要的資料複制操作,同時也會減少上下文切換次數
// 通過多種方式操作Kafka的消息讀取
https://blog.csdn.net/u011784767/article/details/78663168
六、檔案存儲機制
七、消息确認(确認offset)
自動送出
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
複制
手動送出
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@Override
public void doWork() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition=" + record.partition() + ",offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
this.msgList.add(record);
}
if (msgList.size() >= 5) {
System.out.println("Execute commit Message....");
// 手動送出offset
consumer.commitAsync(); // 異步送出
// consumer.commitSync(); // 同步送出
// 消費完成,送出offset (原子)
this.msgList.clear();
}
}
}
複制
八、 Kafka 消息可靠性(offset)
1、Kafka 消息的問題
Kafka就比較适合高吞吐量并且允許少量資料丢失的場景,如果非要保證“消息隻讀取一次”,可以使用JMS。
Kafka Producer 消息發送有兩種方式(配置參數 producer.type):
producer.type=sync(預設值): 背景線程中消息發送是同步方式,對應的類為 kafka.producer.SyncProducer; producer.type=async: 背景線程中消息發送是異步方式,對應的類為 kafka.producer.AyncProducer;優點是可批量發送消息(消息個數達到 batch.num.messages=200 或時間達到 “ 時發送)、吞吐量佳,缺點是發送不及時可能導緻丢失; 對于同步方式(producer.type=sync)?Kafka Producer 消息發送有三種确認方式(配置參數 acks):
acks=0: producer 不等待 Leader 确認,隻管發出即可;最可能丢失消息,适用于高吞吐可丢失的業務; acks=1(預設值): producer 等待 Leader 寫入本地日志後就确認;之後 Leader 向 Followers 同步時,如果 Leader 當機會導緻消息沒同步而丢失,producer 卻依舊認為成功; acks=all/-1: producer 等待 Leader 寫入本地日志、而且 Leader 向 Followers 同步完成後才會确認;最可靠。 Kafka Consumer 有兩個接口:
Low-level API: 消費者自己維護 offset 等值,可以完全控制; High-level API: 封裝了對 parition 和 offset 的管理,使用簡單;可能遇到 Consumer 取出消息并更新了 offset,但未處理消息即當機,進而相當于消息丢失; Kafka 支援 3 種消息傳遞語義:
最多一次 -消息可能會丢失,但永遠不會重新發送。consumer.poll(); consumer.commitOffset(); processMsg(messages); 至少一次 -消息永遠不會丢失,但可能會重新傳遞。consumer.poll(); processMsg(messages); consumer.commitOffset(); 恰恰一次 - 這就是人們真正想要的,每條資訊隻傳遞一次。以事務來保證。
2 消息重複
根本原因:已經消費了資料,但是 offset 沒送出。 外在原因:(1)消費資料後、送出 offset 前,線程被殺; (2)設定 offset 為自動送出,consumer.close() 之前 consumer.unsubscribe(); (3)consumer 取了一批資料,尚未處理完畢時,達到了 session.timeout.ms,導緻沒有接收心跳而挂掉,自動送出offset失敗,下次會重複消費本批消息; 解決辦法:(1)唯一 ID 儲存在外部媒體中,每次消費時根據它判斷是否已處理; (2)如果在統計用,丢失幾條關系不大,則無需理會; (3)如果消費者來不及處理,可以這樣優化:增加分區以提高并行能力;增加消費者線程;關閉自動送出 enable.auto.commit=false
3 消息丢失
根本原因:已經送出了 offset,但資料在記憶體中尚未處理,線程就被殺掉。
消息丢失解決方案:
同步模式下,确認機制設定為-1(不可為1),即讓消息寫入Leader和Follower之後再确認消息發送成功; 異步模式下,設定為不限制阻塞逾時時間(不可為acks=0),當緩沖區滿時不清空緩沖池,而是讓生産者一直處于阻塞狀态;
4 消息亂序 (如何保證kafka中消息按照順序消費)
傳統的隊列,在并行處理時,由于網絡故障或速度差異,盡管伺服器傳遞是有序的,但消費者接收的順序可能不一緻; Kafka 在主題内部有分區,并行處理時,每個分區僅由消費者組中的一個消費者使用,確定了消費者是該分區的唯一讀者,并按順序使用這些資料。
但是它也僅僅是保證Topic的一個分區順序處理,不能保證跨分區的消息先後處理順序,除非隻提供一個分區。
九、Kafka的分區配置設定政策
partition.assignmentStrategy 指定分區政策
Range 範圍分區(預設的)
假如有10個分區,3個消費者,把分區按照序号排列0,1,2,3,4,5,6,7,8,9;消費者為C1,C2,C3,那麼用分區數除以消費者數來決定每個Consumer消費幾個Partition,除不盡的前面幾個消費者将會多消費一個 最後配置設定結果如下
C1:0,1,2,3 C2:4,5,6 C3:7,8,9
如果有11個分區将會是:
C1:0,1,2,3 C2:4,5,6,7 C3:8,9,10
假如我們有兩個主題T1,T2,分别有10個分區,最後的配置設定結果将會是這樣:
C1:T1(0,1,2,3) T2(0,1,2,3) C2:T1(4,5,6) T2(4,5,6) C3:T1(7,8,9) T2(7,8,9)
在這種情況下,C1多消費了兩個分區
RoundRobin 輪詢分區
把所有的partition和consumer列出來,然後輪詢consumer和partition,盡可能的讓把partition均勻的配置設定給consumer
假如有3個Topic T0(三個分區P0-0,P0-1,P0-2),T1(兩個分區P1-0,P1-1),T2(四個分區P2-0,P2-1,P2-2,P2-3)
有三個消費者:C0(訂閱了T0,T1),C1(訂閱了T1,T2),C2(訂閱了T0,T2)!
那麼分區過程如下圖所示
分區将會按照一定的順序排列起來,消費者将會組成一個環狀的結構,然後開始輪詢。 P0-0配置設定給C0 P0-1配置設定給C1但是C1并沒訂閱T0,于是跳過C1把P0-1配置設定給C2, P0-2配置設定給C0 P1-0配置設定給C1, P1-1配置設定給C0, P2-0配置設定給C1, P2-1配置設定給C2, P2-2配置設定給C1, p2-3配置設定給C2
C0: P0-0,P0-2,P1-1 C1:P1-0,P2-0,P2-2 C2:P0-1,P2-1,P2-3
什麼時候觸發分區配置設定政策: 1.同一個Consumer Group内新增或減少Consumer
2.Topic分區發生變化
Rebalance的執行
kafka提供了一個角色Coordinator來執行。當Consumer Group的第一個Consumer啟動的時候,他會向kafka叢集中的任意一台broker發送GroupCoordinatorRequest請求,broker會傳回一個負載最小的broker設定為coordinator,之後該group的所有成員都會和coordinator進行協調通信
整個Rebalance分為兩個過程 jionGroup和sysncJion
joinGroup過程
在這一步中,所有的成員都會向coordinator發送JionGroup請求,請求内容包括group_id,member_id.protocol_metadata等,coordinator會從中選出一個consumer作為leader,并且把組成員資訊和訂閱消息,leader資訊,rebanlance的版本資訊發送給consumer
Synchronizing Group State階段
組成員向coordinator發送SysnGroupRequet請求,但是隻有leader會發送分區配置設定的方案(分區配置設定的方案其實是在消費者确定的),當coordinator收到leader發送的分區配置設定方案後,會通過SysnGroupResponse把方案同步到各個consumer中