Kafka是由LinkedIn開發的一個分布式的消息系統,使用Scala編寫,它以可水準擴充和高吞吐率而被廣泛使用。目前越來越多的開源分布式處理系統如Cloudera、Apache Storm、Spark都支援與Kafka內建。InfoQ一直在緊密關注Kafka的應用以及發展,“Kafka剖析”專欄将會從架構設計、實作、應用場景、性能等方面深度解析Kafka。
背景介紹
Kafka建立背景
Kafka是一個消息系統,原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和營運資料處理管道(Pipeline)的基礎。現在它已被多家不同類型的公司 作為多種類型的資料管道和消息系統使用。
活動流資料是幾乎所有站點在對其網站使用情況做報表時都要用到的資料中最正常的部分。活動資料包括頁面通路量(Page View)、被檢視内容方面的資訊以及搜尋情況等内容。這種資料通常的處理方式是先把各種活動以日志的形式寫入某種檔案,然後周期性地對這些檔案進行統計 分析。營運資料指的是伺服器的性能資料(CPU、IO使用率、請求時間、服務日志等等資料)。營運資料的統計方法種類繁多。
Kafka簡介
Kafka是一種分布式的,基于釋出/訂閱的消息系統。主要設計目标如下:
- 以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的通路性能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上消息的傳輸。
- 支援Kafka Server間的消息分區,及分布式消費,同時保證每個Partition内的消息順序傳輸。
- 同時支援離線資料處理和實時資料處理。
- Scale out:支援線上水準擴充。
為何使用消息系統
-
解耦
在項目啟動之初來預測将來項目會碰到什麼需求,是極其困難的。消息系統在處理過程中間插入了一個隐含的、基于資料的接口層,兩邊的處理過程都要實作這一接口。這允許你獨立的擴充或修改兩邊的處理過程,隻要確定它們遵守同樣的接口限制。
-
備援
有些情況下,處理資料的過程會失敗。除非資料被持久化,否則将造成丢失。消息隊列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了數 據丢失風險。許多消息隊列所采用的"插入-擷取-删除"範式中,在把一個消息從隊列中删除之前,需要你的處理系統明确的指出該消息已經被處理完畢,進而确 保你的資料被安全的儲存直到你使用完畢。
-
擴充性
因為消息隊列解耦了你的處理過程,是以增大消息入隊和處理的頻率是很容易的,隻要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴充就像調大電力按鈕一樣簡單。
-
靈活性 & 峰值處理能力
在通路量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值通路為标準來投入資源随時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵元件頂住突發的通路壓力,而不會因為突發的超負荷的請求而完全崩潰。
-
可恢複性
系統的一部分元件失效時,不會影響到整個系統。消息隊列降低了程序間的耦合度,是以即使一個處理消息的程序挂掉,加入隊列中的消息仍然可以在系統恢複後被處理。
-
順序保證
在大多使用場景下,資料處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證資料會按照特定的順序來處理。Kafka保證一個Partition内的消息的有序性。
-
緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖檔比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優化資料流經過系統的速度。
-
異步通信
很多時候,使用者不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許使用者把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。
常用Message Queue對比
-
RabbitMQ
RabbitMQ是使用Erlang編寫的一個開源的消息隊列,本身支援很多的協定:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更适合于企業級的開發。同時實作了Broker構架,這意味着消息在發送給用戶端時先在中心隊列排隊。對路由,負 載均衡或者資料持久化都有很好的支援。
-
Redis
Redis是一個基于Key-Value對的NoSQL資料庫,開發維護很活躍。雖然它是一個Key-Value資料庫存儲系統,但它本身支援 MQ功能,是以完全可以當做一個輕量級的隊列服務來使用。對于RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行 時間。測試資料分為128Bytes、512Bytes、1K和10K四個不同大小的資料。實驗表明:入隊時,當資料比較小時Redis的性能要高于 RabbitMQ,而如果資料大小超過了10K,Redis則慢的無法忍受;出隊時,無論資料大小,Redis都表現出非常好的性能,而RabbitMQ 的出隊性能則遠低于Redis。
-
ZeroMQ
ZeroMQ号稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。ZeroMQ能夠實作RabbitMQ不擅長的進階/複雜的隊列,但是開發人 員需要自己組合多種技術架構,技術上的複雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中間件的模式,你不需要安裝和運作一個消息服務 器或中間件,因為你的應用程式将扮演這個伺服器角色。你隻需要簡單的引用ZeroMQ程式庫,可以使用NuGet安裝,然後你就可以愉快的在應用程式之間 發送消息了。但是ZeroMQ僅提供非持久性的隊列,也就是說如果當機,資料将會丢失。其中,Twitter的Storm 0.9.0以前的版本中預設使用ZeroMQ作為資料流的傳輸(Storm從0.9版本開始同時支援ZeroMQ和Netty作為傳輸子產品)。
-
ActiveMQ
ActiveMQ是Apache下的一個子項目。 類似于ZeroMQ,它能夠以代理人和點對點的技術實作隊列。同時類似于RabbitMQ,它少量代碼就可以高效地實作進階應用場景。
-
Kafka/Jafka
Kafka是Apache下的一個子項目,是一個高性能跨語言分布式釋出/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即 Kafka的一個更新版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一台普通的伺服器上既可以達到10W/s的吞 吐速率;完全的分布式系統,Broker、Producer、Consumer都原生自動支援分布式,自動實作負載均衡;支援Hadoop資料并行加載, 對于像Hadoop的一樣的日志資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的并行加載機制統一了 線上和離線的消息處理。Apache Kafka相對于ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。
Kafka架構
Terminology
-
Broker
Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
-
Topic
每條釋出到Kafka叢集的消息都有一個類别,這個類别被稱為Topic。(實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存于一個或多個broker上但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處)
-
Partition
Parition是實體上的概念,每個Topic包含一個或多個Partition.
-
Producer
負責釋出消息到Kafka broker
-
Consumer
消息消費者,向Kafka broker讀取消息的用戶端。
-
Consumer Group
每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于預設的group)。
Kafka拓撲結構
如上圖所示,一個典型的Kafka叢集中包含若幹Producer(可以是web前端産生的Page View,或者是伺服器日志,系統CPU、Memory等),若幹broker(Kafka支援水準擴充,一般broker數量越多,叢集吞吐率越高), 若幹Consumer Group,以及一個Zookeeper集 群。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式将消息釋出到broker,Consumer使用pull模式從 broker訂閱并消費消息。
Topic & Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單了解為必須指明把這條消息放進哪個queue裡。為了 使得Kafka的吞吐率可以線性提高,實體上把Topic分成一個或多個Partition,每個Partition在實體上對應一個檔案夾,該檔案夾下 存儲這個Partition的所有消息和索引檔案。若建立topic1和topic2兩個topic,且分别有13個和19個分區,則整個叢集上會相應會 生成共32個檔案夾(本文所用叢集共8個節點,此處topic1和topic2 replication-factor均為1),如下圖所示。
每個日志檔案都是一個log entrie序列,每個log entrie包含一個4位元組整型數值(值為N+5),1個位元組的"magic value",4個位元組的CRC校驗碼,其後跟N個位元組的消息體。每條消息都有一個目前Partition下唯一的64位元組的offset,它指明了這條 消息的起始位置。磁盤上存儲的消息格式如下:
message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
crc : 4 bytes
payload : n bytes
這個log entries并非由一個檔案構成,而是分成多個segment,每個segment以該segment第一條消息的offset命名并以 “.kafka”為字尾。另外會有一個索引檔案,它标明了每個segment下包含的log entry的offset範圍,如下圖所示。
因為每條消息都被append到該Partition中,屬于順序寫磁盤,是以效率非常高(經驗證,順序寫磁盤效率比随機寫記憶體還要高,這是Kafka高吞吐率的一個很重要的保證)。
對于傳統的message queue而言,一般會删除已經被消費的消息,而Kafka叢集會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有資料(實際 上也沒必要),是以Kafka提供兩種政策删除舊資料。一是基于時間,二是基于Partition檔案大小。例如可以通過配 置$KAFKA_HOME/config/server.properties,讓Kafka删除一周前的資料,也可在Partition檔案超過1GB 時删除舊資料,配置如下所示。
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與檔案大小無關,是以這裡删除過期檔案與提高Kafka性能無關。選擇怎樣的删 除政策隻與磁盤以及具體的需求有關。另外,Kafka會為每一個Consumer Group保留一些metadata資訊——目前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下 Consumer會在消費完一條消息後遞增該offset。當然,Consumer也可将offset設成一個較小的值,重新消費一些消息。因為 offet由Consumer控制,是以Kafka broker是無狀态的,它不需要标記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group隻有一個Consumer能消費某一條消息,是以也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
Producer消息路由
Producer發送消息到broker時,會根據Paritition機制選擇将其存儲到哪一個Partition。如果Partition機制 設定合理,所有消息可以均勻分布到不同的Partition裡,這樣就實作了負載均衡。如果一個Topic對應一個檔案,那這個檔案所在的機器I/O将會 成為這個Topic的性能瓶頸,而有了Partition後,不同的消息可以并行寫入不同broker的不同Partition裡,極大的提高了吞吐率。 可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定建立Topic的預設 Partition數量,也可在建立Topic時通過參數指定,同時也可以在Topic建立之後通過Kafka提供的工具修改。
在發送一條消息時,可以指定這條消息的key,Producer根據這個key和Partition機制來判斷應該将這條消息發送到哪個 Parition。Paritition機制可以通過指定Producer的paritition. class這一參數來指定,該class必須實作kafka.producer.Partitioner接口。本例中如果key可以被解析為整數則将對應 的整數與Partition總數取餘,該消息會被發送到該數對應的Partition。(每個Parition都會有個序号,序号從0開始)
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
public JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
如果将上例中的類作為partition.class,并通過如下代碼發送20條消息(key分别為0,1,2,3)至topic3(包含4個Partition)。
public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
List messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 4; j++){
messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
}
producer.send(messageList);
}
producer.close();
}
則key相同的消息會被發送并存儲到同一個partition裡,而且key的序号正好和Partition序号相同。(Partition序号從0開始,本例中的key也從0開始)。下圖所示是通過Java程式調用Consumer後列印出的消息清單。
Consumer Group
(本節所有描述都是基于Consumer hight level API而非low level API)。
使用Consumer high level API時,同一Topic的一條消息隻能被同一個Consumer Group内的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
這是Kafka用來實作一個Topic消息的廣播(發給所有的Consumer)和單點傳播(發給某一個Consumer)的手段。一個Topic可以 對應多個Consumer Group。如果需要實作廣播,隻要每個Consumer有一個獨立的Group就可以了。要實作單點傳播隻要所有的Consumer在同一個Group裡。 用Consumer Group還可以将Consumer進行自由的分組而不需要多次發送消息到不同的Topic。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時線上處理, 同時使用Hadoop這種批處理系統進行離線處理,還可以同時将資料實時備份到另一個資料中心,隻需要保證這三個操作所使用的Consumer屬于不同的 Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。
下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先建立一個Topic (名為topic1,包含3個Partition),然後建立一個屬于group1的Consumer執行個體,并建立三個屬于group2的 Consumer執行個體,最後通過Producer向topic1發送key分别為1,2,3的消息。結果發現屬于group1的Consumer收到了所 有的這三條消息,同時group2中的3個Consumer分别收到了key為1,2,3的消息。如下圖所示。
Push vs. Pull
作為一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事實上,push模式和pull模式各有優劣。
push模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目标是盡可能以最快速度傳遞消息,但是這樣很 容易造成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據Consumer的消費能力以适當的速率消費消 息。
對于Kafka而言,pull模式更合适。pull模式可簡化broker的設計,Consumer可自主要制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的送出方式進而實作不同的傳輸語義。
Kafka delivery guarantee
有這麼幾種可能的delivery guarantee:
- At most once 消息可能會丢,但絕不會重複傳輸
- At least one 消息絕不會丢,但可能會重複傳輸
-
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是使用者所想要的。
當Producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丢。但是如果 Producer發送資料給broker後,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無 法确定網絡故障期間發生了什麼,但是Producer可以生成一種類似于主鍵的東西,發生故障時幂等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還并未實作,有希望在Kafka未來的版本中實作。(是以目前預設情況下一條消息從 Producer到broker是確定了At least once,可通過設定Producer異步發送實作At most once)。
接下來讨論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息後,可以選擇commit,該操作會在Zookeeper中儲存該Consumer在該 Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取 的開始位置會跟上一次commit之後的開始位置相同。當然可以将Consumer設定為autocommit,即Consumer一旦讀到資料立即自動 commit。如果隻讨論這一讀取消息的過程,那Kafka是確定了Exactly once。但實際使用中應用程式并非在Consumer讀取完資料就結束了,而是要進行進一步處理,而資料處理與commit的順序在很大程度上決定了消 息從broker和consumer的delivery guarantee semantic。
- 讀完消息先commit再處理消息。這種模式下,如果Consumer在commit後還沒來得及處理消息就crash了,下次重新開始工作後就無法讀到剛剛已送出而未處理的消息,這就對應于At most once
- 讀完消息先處理再commit。這種模式下,如果在處理完消息之後commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應于At least once。在很多使用場景下,消息都有一個主鍵,是以消息的處理往往具有幂等性,即多次處理這一條消息跟隻處理一次是等效的,那就可以認為是 Exactly once。(筆者認為這種說法比較牽強,畢竟它不是Kafka本身提供的機制,主鍵本身也并不能完全保證操作的幂等性。而且實際上我們說delivery guarantee 語義是讨論被處理多少次,而非處理結果怎樣,因為處理方式多種多樣,我們不應該把處理過程的特性——如是否幂等性,當成Kafka本身的Feature)
- 如果一定要做到Exactly once,就需要協調offset和實際操作的輸出。精典的做法是引入兩階段送出。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種 方式可能更好,因為許多輸出系統可能不支援兩階段送出。比如,Consumer拿到資料後可能把資料放到HDFS,如果把最新的offset和資料本身一 起寫到HDFS,那就可以保證資料的輸出和offset的更新要麼都完成,要麼都不完成,間接實作Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,無法存于HDFS,而low level API的offset是由自己去維護的,可以将之存于HDFS中)
總之,Kafka預設保證At least once,并且允許通過設定Producer異步送出來實作At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。
轉載于:https://www.cnblogs.com/yantz/articles/4618086.html