天天看點

「kafka原理」kafka Log存儲解析以及索引機制

作者:石臻臻的雜貨鋪

作者:石臻臻, CSDN部落格之星Top5、Kafka Contributor 、nacos Contributor、華為雲 MVP ,騰訊雲TVP, 滴滴Kafka技術專家 、 KnowStreaming。

KnowStreaming 是滴滴開源的Kafka運維管控平台, 有興趣一起參與參與開發的同學,但是怕自己能力不夠的同學,可以聯系我,當你導師帶你參與開源! 。

本文設定到的配置項有

名稱 描述 類型 預設
num.partitions topic的預設分區數 int 1
log.dirs 儲存日志資料的目錄。如果未設定,則使用log.dir中的值 string /tmp/kafka-logs
offsets.topic.replication.factor offset topic複制因子(ps:就是備份數,設定的越高來確定可用性)。為了確定offset topic有效的複制因子,第一次請求offset topic時,活的broker的數量必須最少最少是配置的複制因子數。 如果不是,offset topic将建立失敗或擷取最小的複制因子(活着的broker,複制因子的配置) short 3
log.index.interval.bytes 添加一個條目到offset的間隔 int 4096

首先啟動kafka叢集,叢集中有三台Broker; 設定3個分區,3個副本;

1發送topic消息

啟動之後kafka-client發送一個topic為消息szz-test-topic的消息

public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx1:9092,xxx2:9092,xxx3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++){
            producer.send(new ProducerRecord<String, String>("szz-test-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
           

發送了之後可以去log.dirs路徑下看看

「kafka原理」kafka Log存儲解析以及索引機制

這裡的3個檔案夾分别代表的是3個分區; 那是因為我們配置了這個topic的分區數num.partitions=3; 和備份數offsets.topic.replication.factor=3; 這3個檔案夾中的3個分區有Leader有Fllower; 那麼我們怎麼知道誰是誰的Leader呢?

2檢視topic的分區和副本 bin/kafka-topics.sh --describe --topic szz-test-topic --zookeeper localhost:2181

「kafka原理」kafka Log存儲解析以及索引機制

可以看到查詢出來顯示 分區Partition-0在broker.id=0中,其餘的是副本Replicas 2,1 分區Partition-1在broker.id=1中,其餘的是副本Replicas 0,2 ...

或者也可以通過zk來 檢視leader在哪個broker上

get /brokers/topics/src-test-topic/partitions/0/state
           
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/szz-test-topic/partitions/0/state
{"controller_epoch":5,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
cZxid = 0x1001995bf
           

3分區檔案都有啥

進入檔案夾看到如下檔案:

「kafka原理」kafka Log存儲解析以及索引機制
「kafka原理」kafka Log存儲解析以及索引機制

在這裡插入圖檔描述

名稱 描述 類型 預設 log.segment.bytes 單個日志檔案的最大大小 int 1073741824

我們試試多發送一些消息,看它會不會生成新的 segment

public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx1:9092,xxx2:9092,xxx3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 163840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 1200; i++){
            //将一個消息設定大一點
            byte[] log = new byte[904800];
            String slog = new String(log);
            producer.send(new ProducerRecord<String, String>("szz-test-topic",0, Integer.toString(i),  slog));
        }
        producer.close();
    }
           
「kafka原理」kafka Log存儲解析以及索引機制

在這裡插入圖檔描述

從圖中可以看到第一個segment檔案00000000000000000000.log快要滿log.segment.bytes 的時候就開始建立了00000000000000005084.log了; 并且.log和.index、.timeindex檔案是一起出現的; 并且名稱是以檔案第一個offset命名的

  • .log存儲消息檔案
  • .index存儲消息的索引
  • .timeIndex,時間索引檔案,通過時間戳做索引

消息檔案

上面的幾個檔案我們來使用kafka自帶工具bin/kafka-run-class.sh 來讀取一下都是些啥 bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log

「kafka原理」kafka Log存儲解析以及索引機制

最後一行:

baseoffset:5083 position: 1072592768 CreateTime: 1603703296169

.index 消息索引

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index

「kafka原理」kafka Log存儲解析以及索引機制

最後一行:

offset:5083  position:1072592768
           

.timeindex 時間索引檔案

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex

「kafka原理」kafka Log存儲解析以及索引機制

最後一行:

timestamp: 1603703296169 offset: 5083

Kafka如何查找指定offset的Message的

找了個部落客的圖 @lizhitao

「kafka原理」kafka Log存儲解析以及索引機制

比如:要查找絕對offset為7的Message:

  1. 首先是用二分查找确定它是在哪個LogSegment中,自然是在第一個Segment中。
  2. 打開這個Segment的index檔案,也是用二分查找找到offset小于或者等于指定offset的索引條目中最大的那個offset。自然offset為6的那個索引是我們要找的,通過索引檔案我們知道offset為6的Message在資料檔案中的位置為9807。
  3. 打開資料檔案,從位置為9807的那個地方開始順序掃描直到找到offset為7的那條Message。

Kafka 中的索引檔案,以稀疏索引(sparse index)的方式構造消息的索引,它并不保證每個消息在索引檔案中都有對應的索引項。每當寫入一定量(由 broker 端參數 log.index.interval.bytes 指定,預設值為 4096,即 4KB)的消息時,偏移量索引檔案 和 時間戳索引檔案 分别增加一個偏移量索引項和時間戳索引項,增大或減小 log.index.interval.bytes 的值,對應地可以縮小或增加索引項的密度。

稀疏索引通過 MappedByteBuffer 将索引檔案映射到記憶體中,以加快索引的查詢速度。

leader-epoch-checkpoint

leader-epoch-checkpoint 中儲存了每一任leader開始寫入消息時的offset; 會定時更新 follower被選為leader時會根據這個确定哪些消息可用

4參考文檔

kafka官方文檔

Kafka的Log存儲解析

Kafka-工作流程,檔案存儲機制,索引機制,如何通過offset找到對應的消息

Broker配置檔案詳解

日常運維、問題排查=> 滴滴開源LogiKM一站式Kafka監控與管控平台

在這裡插入圖檔描述