天天看點

kafka重要概念與叢集重點配置詳解

文章目錄

    • 重要概念
      • broker
      • topic
      • partition
      • segment
      • offset
      • replica
      • producer
      • consumer
      • Consumer group
      • leader 與 follower
      • controller
      • coordinator
    • kafka的重要配置
      • boker相關
      • producer(生産者相關)
      • consumer(消費者相關)
      • replica(副本相關)
      • log(日志相關)
      • controller
    • 總結

重要概念

broker

一個broker就是一個kafka執行個體,負責接收、轉發、存儲消息,kafka叢集就是由多個broker組成。

topic

kafka的topic是一個邏輯概念,就是對消息分組、分類,便于區分處理不同業務邏輯的消息。topic和Elasticsearch中的索引概念比較像。

partition

kafka的partition是實體上的概念,對應的是檔案系統中的一個檔案夾,partition是針對topic的,主要是為了考慮到topic資料量很多的情況,通過對topic的資料拆分為partition可以并行處理,提高并發量。

kafka中的partition和Elasticsearch中的shard比較像,都是對統一邏輯分類的資料進行實體拆分。

segment

kafka的partition對應的是檔案夾,稍微思考就會發現,消息存儲應該是檔案,那麼kafka存儲消息的檔案叫什麼呢?

答案是segment,很多地方将其翻譯為段。

segment是對kafka的topic進一步實體拆分,通過根據實際的機器情況合理的配置segment大小,配合上kafka自己索引機制,可以更快的執行讀取和寫入操作。

offset

offset是消息偏移量,這個偏移量是消息條數,而不是位元組

replica

replica就是副本,基本所有分布式中間件都有副本這個概念

kafka的副本是針對partition的,而不是針對topic的

在kafka叢集中通過将partition的不同副本分布在不同的broker上來提高可用性,一個broker挂了,還有其他的副本可用。

副本都有兩個重要的屬性:LEO 和 HW

Log End Offset(LEO):log中的下一條消息的 offset

High Watermark(HW):所有副本中最小的LEO

為什麼所有副本中最小的LEO,反而叫高水位(HW)呢?

主要是因為Kafka不允許消費者消費大于所有副本中最小的LEO消息,是以就叫高水位(HW)

這樣做主要是為了資料出現不一緻的情況,例如Leader中的LEO比較大,然後挂了,其他副本成為了Leader

producer

消息生産者,釋出消息到kafka叢集的服務

consumer

消息消費者,從kafka叢集中消費消息的服務

Consumer group

Consumer group是high-level consumer API中的概念,每個consumer都屬于一個 consumer group,每條消息隻能被 consumer group 的一個 Consumer 消費,但可以被多個consumer group 消費。

通過設定Consumer group,可以實作一條消息被不同的組消費,非常實用,例如一個登入消息,可能資料統計業務和活動業務都需要,那麼隻需要設定不同的Consumer group,就都可以消費到同一個登入消息。

leader 與 follower

副本有2中角色,一種是leader,另一種是follower。

相同副本中隻有一個leader,其他副本為follower。

producer和consumer隻跟leader互動,然後由leader和follower互動。

例如producer發消息給leader,leader将消息轉發給follower,會根據producer的ack配置執行不同的響應,後面詳細介紹。

controller

controller是針對broker的,kafka叢集中的broker會選舉出一個leader用來控制partition的leader選舉、failover等操作,broker選舉出來的leader就是controller。

broker的選舉依賴于Zookeeper,Broker節點一起去Zookeeper上注冊一個臨時節點,因為隻有一個Broker會注冊成功,其他的都會失敗,成功在Zookeeper上注冊臨時節點的Broker會成為controller,其他的broker叫Broker follower。

controller會監聽其他的Broker的所有資訊,如果controller當機了,在zookeeper上面的那個臨時節點就會消失,此時所有的broker又會一起去Zookeeper上注冊一個臨時節點,因為隻有一個Broker會注冊成功,其他的都會失敗,是以這個成功在Zookeeper上注冊臨時節點的這個Broker會成為新的Controller。

一旦有一個broker當機了,controller會讀取該當機broker上所有的partition在zookeeper上的狀态,并選取ISR清單中的一個replica作為partition leader。

如果ISR清單中的replica全挂,選一個幸存的replica作為leader;

如果該partition的所有的replica都當機了,則将新的leader設定為-1,等待恢複,等待ISR中的任一個Replica恢複,并且選它作為Leader;或選擇第一個活過來的Replica,不一定是ISR中的作為Leader。

broker當機的事情,controller也會通知zookeeper,zookeeper會通知其他的broker。

broker的腦裂問題:

controller在Zookeeper上注冊成功後,它和Zookeeper通信的timeout預設值是6s,也就是如果controller如果有6s中沒有和Zookeeper做心跳,那麼Zookeeper就認為controller已經死了。

就會在Zookeeper上把這個臨時節點删掉,那麼其他broker就會認為controller已經沒了,就會再次搶着注冊臨時節點,注冊成功的broker成為controller。

然後,之前的controller就需要各種shut down去關閉各種節點和事件的監聽。但是當kafka的讀寫流量都非常巨大的時候,這個時候producer進來的message由于Kafka叢集中存在兩個controller而無法落地,導緻資料淤積。

coordinator

Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務。

Group Coordinator的作用是用來存儲Group的相關Meta資訊,并将對應Partition的Offset資訊記錄到Kafka的__consumer_offsets這個topic中。

Kafka在0.9之前是基于Zookeeper來存儲Partition的Offset資訊(consumers/{group}/offsets/{topic}/{partition}),因為ZK并不适用于頻繁的寫操作,是以在0.9之後通過内置Topic的方式來記錄對應Partition的Offset。

kafka的重要配置

boker相關

#在叢集中的唯一辨別broker,非負數
broker.id=1

#broker server服務端口
port=9091

#kafka資料的存放位址,多個位址的話用逗号分割D:\\data11,D:\\data12
log.dirs=D:\\kafkas\\datas\\data1

#ZK叢集的位址,可以是多個,多個之間用逗号分割hostname1:port1,hostname2:port2
zookeeper.connect=localhost:2181

#ZK連接配接逾時時間
zookeeper.connection.timeout.ms=6000

#ZK會話逾時時間
zookeeper.session.timeout.ms=6000

#segment日志的索引檔案大小限制,會被topic建立時的指定參數覆寫
log.index.size.max.bytes =10*1024*1024

#segment的大小,達到指定大小會新建立一個segment檔案,會被topic建立時的指定參數覆寫
log.segment.bytes =1024*1024*1024

# broker接受的消息體的最大大小
message.max.bytes =	1000012

#broker處理消息的最大線程數
num.network.threads=3

#broker處理磁盤IO的線程數
num.io.threads=8

#socket的發送緩沖區
socket.send.buffer.bytes=102400
#socket的接受緩沖區
socket.receive.buffer.bytes=102400
#socket請求的最大數值,message.max.bytes必然要小于socket.request.max.bytes
socket.request.max.bytes=104857600

#topic預設分區個數,會被topic建立時的指定參數覆寫
num.partitions=1

#partition副本數量配置,預設1,表示沒有副本,2表示除了leader還有一個follower
default.replication.factor =1

#是否允許自動建立topic,若是false,就需要手動建立topic
auto.create.topics.enable =true
           

producer(生産者相關)

# 0不管消息是否寫入成功,1隻需要leader寫入消息成功,all需要leader和ISR中的follower都寫入成功
acks = 1

#設定生産者記憶體緩沖區的大小,生産者用它緩沖要發送到伺服器的消息。
#如果應用程式發送消息的速度超過發送到伺服器的速度,會導緻生産者空間不足。這個時候,send()方法調用要麼被阻塞,要麼抛出異常
buffer.memory = 10240

# 當buffer.memory不足,阻塞多久抛出異常
max.block.ms = 3000

# 預設消息發送時不會被壓縮。可設定為snappy、gzip、lz4
compression.type = snappy

# 重試次數
retries = 0

# 重試時間間隔
retry.backoff.ms = 100

# 發向相同partition每個批次的大小,預設16384
batch.size = 10240

# batch.size要産生消息比發送消息快才會出現
# linger.ms可以控制讓發送等n毫秒再發送,以達到批量發送的目的
linger.ms = 0

# 控制生産者每次發送的請求大小,預設1M
max.request.size = 	1048576

# 指定了生産者在收到伺服器響應之前可以發送多少個消息
max.in.flight.requests.per.connection = 1

# tcp緩沖區大小
receive.buffer.bytes = 4096
send.buffer.bytes = 4096
           

snappy壓縮算法占用較少的CPU,有較好的性能和壓縮比

gzip壓縮算法占用較多CPU,但會提供更高的壓縮比

max.in.flight.requests.per.connection導緻消息順序問題,如果:retries>0 && max.in.flight.requests.per.connection >1:

那麼,如果第一個批次消息寫入失敗,而第二個批次寫入成功,會重試寫入第一個批次,如果此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。

max.in.flight.requests.per.connection=1,即使發生了重試,也可以保證消息是按照發送的順序寫入。

consumer(消費者相關)

# broker伺服器清單
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094

# 消費者每次poll資料時的最大數量
max.poll.records = 500

# 為true則自動送出偏移量
enable.auto.commit = true

# 自動送出偏移量周期(時間間隔)
auto.commit.interval.ms = 5000

# 如果該配置時間内consumer沒有響應Coordinator的心跳檢測,就認為consumer挂了,rebalance
session.timeout.ms = 10000

# Coordinator的心跳檢測周期
heartbeat.interval.ms = 2000

# 當沒有初始偏移量時,怎麼辦,預設latest
# earliest: 自動重置為最早的offset
# latest: 自動重置為最後的offset
# none: 如果在消費者組中沒有前置的offset,抛異常
auto.offset.reset=latest

# 一次最小拉取多少位元組,預設1位元組
fetch.min.bytes=1

# 一次最多拉取多少位元組資料,預設50M
fetch.max.bytes=52428800

# 一次拉取最多等待多少毫秒,預設500
fetch.max.wait.ms=500
           

replica(副本相關)

#leader等待follower的最常時間,超過就將follower移除ISR(in-sync replicas)
replica.lag.time.max.ms =10000

#follower最大落後leader多少條消息,把此replicas遷移到其他follower中,在broker數量較少,或者網絡不足的環境中,建議提高此值
replica.lag.max.messages =4000

#follower與leader之間的socket逾時時間
replica.socket.timeout.ms=30*1000

#leader複制時候的socket緩存大小
replica.socket.receive.buffer.bytes=64*1024

#replicas每次擷取資料的最大大小
replica.fetch.max.bytes =1024*1024

#replicas同leader之間通信的最大等待時間,失敗了會重試
replica.fetch.wait.max.ms =500

#fetch的最小資料尺寸,如果leader中尚未同步的資料小于該值,将會阻塞,直到滿足條件
replica.fetch.min.bytes =1

#leader進行複制的線程數,增大這個數值會增加follower的IO
num.replica.fetchers=1
           

log(日志相關)

#segment檔案大小,會被topic建立時的指定參數覆寫
log.segment.bytes =1024*1024*1024

#segment滾動時間,沒有達到log.segment.bytes也會強制建立一個segment,topic參數覆寫
log.roll.hours =24*7

#日志清理政策選擇有:delete和compact主要針對過期資料的處理
log.cleanup.policy = delete

#資料存儲的最大時間超過這個時間會根據log.cleanup.policy設定的政策處理資料
log.retention.minutes=6000

#topic每個分區大小,一個topic的大小限制=分區數*log.retention.bytes,-1沒有大小
log.retention.bytes=-1
    
#檔案大小檢查的周期時間
log.retention.check.interval.ms=50000
    
#是否開啟日志清理,預設true
log.cleaner.enable=true

#日志清理的線程數
log.cleaner.threads = 2

#日志清理時候處理的最大大小
log.cleaner.io.max.bytes.per.second=None

#日志清理去重時候的緩存空間,在空間允許的情況下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024
    
#日志清理時候用到的IO塊大小一般不需要修改
log.cleaner.io.buffer.size=512*1024

#值越大一次清理越多,hash沖突也越嚴重
log.cleaner.io.buffer.load.factor=0.9

#檢查是否有需要清理的日志間隔
log.cleaner.backoff.ms =15000

#日志清理的頻率控制,越大意味着更高效的清理,同時會存在一些空間上的浪費,topic參數覆寫
log.cleaner.min.cleanable.ratio=0.5

#對于壓縮的日志保留的最長時間,會被topic建立時的指定參數覆寫
log.cleaner.delete.retention.ms =100000

#對于segment日志的索引檔案大小限制,會被topic建立時的指定參數覆寫
log.index.size.max.bytes =10*1024*1024

#索引的offset間隔,設定越大,掃描速度越快,但是也更吃記憶體
log.index.interval.bytes =4096

#多少條消息,執行一次重新整理到磁盤操作
log.flush.interval.messages=9223372036854775807

#多少毫秒之後重新整理到磁盤一次,沒有設定使用log.flush.scheduler.interval.ms
log.flush.interval.ms = null

#檢查是否需要重新整理到磁盤的時間間隔
log.flush.scheduler.interval.ms =3000

#檔案在索引中清除後保留的時間一般不需要去修改
log.delete.delay.ms =60000

#控制上次落盤的時間點,以便于資料恢複
log.flush.offset.checkpoint.interval.ms =60000
           

log.cleanup.policy參數控制日志清楚,預設是删除,可以将log.cleanup.policy參數設定為"delete,compact"

這裡的compact并不是壓縮,而是針對每個消息的key進行整合,對于有相同key的的不同value值,隻保留最後一個版本。

注意compact和compression的差別,compact更像是記憶體回收的标記整理,compression才是壓縮的意思,kafka中的壓縮是針對消息内容的。

kafka的删除可以基于3中:

  1. 基于時間
  2. 基于大小
  3. 基于偏移量

log.retention.hours、log.retention.minutes以及log.retention.ms來配置,其中

基于時間的配置,優先級從高到低:

  1. log.retention.ms
  2. log.retention.minutes
  3. log.retention.hours

預設情況下隻配置了log.retention.hours參數,其值為168,故預設情況下日志分段檔案的保留時間為7天。

基于大小的删除通過log.retention.bytes參數控制,預設是-1,沒有大小限制。

log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行删除,會被topic建立時的指定參數覆寫

kafka每次清理日志之後會對segment進行合并操作,合并之後大小不超過log.segments.bytes配置,預設1GB。

controller

#是否允許關閉broker,若是設定為true,會關閉所有在broker上的leader,并轉移到其他broker
controlled.shutdown.enable=false

#控制器關閉的嘗試次數
controlled.shutdown.max.retries=3

#每次關閉嘗試的時間間隔
controlled.shutdown.retry.backoff.ms=5000
    
#partition leader與replicas之間通訊時,socket的逾時時間
controller.socket.timeout.ms =30000

#partition leader與replicas資料同步時,消息的隊列尺寸
controller.message.queue.size=10
           

controlled.shutdown.enable=true主要是為了優雅的關閉:

  1. 可以加速重新啟動
  2. 讓leader更快切換、并且将每個partition不可用的時間降低到幾毫秒

總結

kafka重要概念與叢集重點配置詳解