這是之前學習時候寫的筆記,貼一下便于日常查閱。。。
官方文檔:http://kafka.apache.org/quickstart
https://engineering.linkedin.com/kafka/
搭建參考: https://www.cnblogs.com/luotianshuai/p/5206662.html
理論資料參考 http://www.jasongj.com/tags/Kafka
環境:
CentOS7.3
kafka_2.11-1.0.0
zookeeper-3.4.9
3台主機:192.168.5.71、192.168.5.、192.168.5.73
編輯3台主機vim /etc/hosts 加上如下的3行:
192.168.5.71 node71
192.168.5.72 node72
192.168.5.73 node73
tar xf zookeeper-3.4.9.tar.gz -C /opt/
cd /opt
ln -s zookeeper-3.4.9 zk
cd zk/conf
vim zoo.cfg 内容如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zk
dataLogDir=/opt/zk
clientPort=2181
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=60000
autopurge.purgeInterval=24
autopurge.snapRetainCount=5
quorum.auth.enableSasl=false
quorum.cnxn.threads.size=20
server.1=node71:3181:4181
server.2=node72:3181:4181
server.3=node73:3181:4181
在3個節點分别建立myid,并啟動zkserver:
node71上: echo '1' > /opt/zk/myid
node72上: echo '2' > /opt/zk/myid
node73上: echo '3' > /opt/zk/myid
在3個節點分别啟動zkserver:
/opt/zk/bin/zkServer.sh start /opt/zk/conf/zoo.cfg
在3個節點觀察叢集主機的狀态:
/opt/zk/bin/zkServer.sh status /opt/zk/conf/zoo.cfg
tar xf kafka_2.11-1.0.0.tgz -C /opt/
cd /opt/
ln -s kafka_2.11-1.0.0 kafka
每個主機的配置如下:
[root@node71 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties
broker.id=0 #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣
listeners=PLAINTEXT://:9092 #目前kafka對外提供服務的端口預設是9092
num.network.threads=3 # 這個是borker進行網絡處理的線程數
num.io.threads=8 # 這個是borker進行I/O處理的線程數
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,資料不是一下子就發送的,先回存儲到緩沖區了到達一定的大小後在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當資料到達一定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
log.dirs=/tmp/kafka-logs
#消息存放的目錄,這個目錄可以配置為“,”逗号分割的表達式,上面的num.io.threads要大于這個目錄的個數這個目錄,如果配置多個目錄,新建立的topic他把消息持久化的地方是,目前以逗号分割的目錄中,那個分區數最少就放那一個
num.partitions=1 # 預設的分區數,一個topic預設1個分區數
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 # 預設消息的最大持久化時間,168小時,7天
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(預設168 ),到目錄檢視是否有過期的消息如果有,删除
message.max.byte=5242880 #消息儲存的最大值5M
default.replication.factor=2 #kafka儲存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
zookeeper.connect=node71:2181,node72:2181,node73:2181 #設定zookeeper的連接配接端口
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@node72 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
zookeeper.connect=node71:2181,node72:2181,node73:2181
[root@node73 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties
broker.id=2
在3台機器上都執行啟動Kafka叢集:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
1、建立Topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 3 --topic mysql-order
/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 2 --topic mysql-coupons
#參數含義
--replication-factor 2 #複制兩份
--partitions 2 #建立2個分區
--topic #主題
2、在一台伺服器上建立一個釋出者broker
/opt/kafka/bin/kafka-console-producer.sh --broker-list node72:9092 --topic mysql-order
然後,可以輸入一些文字内容
3、在一台伺服器上建立一個訂閱者consumer
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper node72:2181 --from-beginning --topic mysql-order
4、列出全部的topic
/opt/kafka/bin/kafka-topics.sh --list --zookeeper node72:2181
5、檢視指定的topic狀态
[root@node72 /opt/kafka/config ]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper node72:2181 --topic mysql-coupons
cd /etc/zk/bin
./zkCli.sh -server node71:2181
ls /brokers
get /brokers/ids/2
get /brokers/topics/mysql-coupons/partitions/0 # 這裡就不截圖了
官方提到的關閉方式就是kill -9
ps aux|grep kafka|grep -v grep|awk '{print $2}'|xargs kill -9
Broker
Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
Topic
每條釋出到Kafka叢集的消息都有一個類别,這個類别被稱為topic。(實體上不同topic的消息分開存儲,邏輯上一個topic的消息雖然儲存于一個或多個broker上但使用者隻需指定消息的topic即可生産或消費資料而不必關心資料存于何處)
Partition
parition是實體上的概念,每個topic包含一個或多個partition,建立topic時可指定parition數量。每個partition對應于一個檔案夾,該檔案夾下存儲該partition的資料和索引檔案
Producer
負責釋出消息到Kafka broker (push消息到broker)
Consumer
消費消息(從broker那裡poll消息)。每個consumer屬于一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬于預設的group)。使用consumer high level API時,同一topic的一條消息隻能被同一個consumer group内的一個consumer消費,但多個consumer group可同時消費這一消息。
如上圖所示,一個典型的kafka叢集中包含若幹producer(可以是web前端産生的page view,或者是伺服器日志,系統CPU、memory等),若幹broker(Kafka支援水準擴充,一般broker數量越多,叢集吞吐率越高),若幹consumer group,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式将消息釋出到broker,consumer使用pull模式從broker訂閱并消費消息。
作為一個messaging system,Kafka遵循了傳統的方式,選擇由producer向broker push消息并由consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事實上,push模式和pull模式各有優劣。
push模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。
而pull模式則可以根據consumer的消費能力以适當的速率消費消息。
本節所有描述都是基于consumer hight level API而非low level API
每一個consumer執行個體都屬于一個consumer group,每一條消息隻會被同一個consumer group裡的一個consumer執行個體消費(這是為了實作傳統message queue消息隻被消費一次的語義),但是不同consumer group可以同時消費同一條消息,這一特性可以為消息的多元化處理提供了支援。實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時線上處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時将資料實時備份到另一個資料中心,隻需要保證這三個操作所使用的consumer在不同的consumer group即可。
Linked的一種kafka部署方案:
對于傳統的message queue而言,一般會删除已經被消費的消息,而Kafka叢集會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有資料(實際上也沒必要),是以Kafka提供兩種政策去删除舊資料。一是基于時間,二是基于partition檔案大小。例如可以通過配置server.properties,讓Kafka删除一周前的資料,也可通過配置讓Kafka在partition檔案超過1GB時删除舊資料
log.segment.bytes=1073741824 # partition大小超過1G時候,清理舊資料
這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與檔案大小無關,是以這裡删除檔案與Kafka性能無關,選擇怎樣的删除政策隻與磁盤以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata資訊—目前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息後線性增加這個offset。當然,consumer也可将offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,是以Kafka broker是無狀态的,它不需要标記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group隻有一個consumer能消費某一條消息,是以也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
本節所講述内容均基于Kafka consumer high level API
具體參考:http://developer.51cto.com/art/201501/464491.htm
Kafka從0.8開始提供partition級别的replication,replication的數量可在server.properties中配置。
kafka預設default.replication.factor = 2
該Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有一定影響的,但極大的增強了可用性。每個partition都有一個唯一的leader,所有的讀寫操作都在leader上完成,consumer批量從leader上pull資料。一般情況下partition的數量大于等于broker的數量,并且所有partition的leader均勻分布在broker上。follower上的日志和其leader上的完全一樣。
和大部分分布式系統一樣,Kakfa處理失敗需要明确定義一個broker是否alive。
對于Kafka而言,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個通過Zookeeper的heartbeat機制來實作)。二是follower必須能夠及時将leader的writing複制過來,不能落後太多。
leader會track "in sync"的node list。如果一個follower當機,或者落後太多,leader将把它從"in sync" list中移除。這裡所描述的"落後太多" 指follower複制的消息落後于leader後的條數超過預定值,該值可在server.properties中配置。
replica.lag.max.messages=4000
replica.lag.time.max.ms=10000
需要說明的是,Kafka隻解決”fail/recover”,不處理“Byzantine”(“拜占庭”)問題。
個人覺得,"in sync" list 機制,類似于MySQL中的semi-sync半同步。
一條消息隻有被"in sync" list裡的所有follower都從leader複制過去才會被認為已送出。這樣就避免了部分資料被寫進了leader,還沒來得及被任何follower複制就當機了,而造成資料丢失(consumer無法消費這些資料)。而對于producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設定。這種機制確定了隻要"in sync" list有一個或以上的flollower,一條被commit的消息就不會丢失。
這裡的複制機制即不是同步複制,也不是單純的異步複制。事實上,同步複制要求“活着的”follower都複制完,這條消息才會被認為commit,這種複制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步複制方式下,follower異步的從leader複制資料,資料隻要被leader寫入log就被認為已經commit,這種情況下如果follwer都落後于leader,而leader突然當機,則會丢失資料。而Kafka的這種使用"in sync" list的方式則很好的均衡了確定資料不丢失以及吞吐率。follower可以批量的從leader複制資料,這樣極大的提高複制性能(批量寫磁盤),極大減少了follower與leader的差距(前文有說到,隻要follower落後leader不太遠,則被認為在"in sync" list裡)。
首先Kafka會将接收到的消息分區(partition),每個主題(topic)的消息有不同的分區。這樣一方面消息的存儲就不會受到單一伺服器存儲空間大小的限制,另一方面消息的處理也可以在多個伺服器上并行。
其次為了保證高可用,每個分區都會有一定數量的副本(replica)。這樣如果有部分伺服器不可用,副本所在的伺服器就會接替上來,保證應用的持續性。
但是,為了保證較高的處理效率,消息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader,而其他副本則是Follower。而Follower則會定期地到Leader上同步資料。
Leader選舉
如果某個分區所在的伺服器除了問題,不可用,kafka會從該分區的其他的副本中選擇一個作為新的Leader。之後所有的讀寫就會轉移到這個新的Leader上。現在的問題是應當選擇哪個作為新的Leader。顯然,隻有那些跟Leader保持同步的Follower才應該被選作新的Leader。
Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica,已同步的副本)的集合,該集合中是一些分區的副本。隻有當這些副本都跟Leader中的副本同步了之後,kafka才會認為消息已送出,并回報給消息的生産者。如果這個集合有增減,kafka會更新zookeeper上的記錄。
如果某個分區的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader。
顯然通過ISR,kafka需要的備援度較低,可以容忍的失敗數比較高。假設某個topic有f+1個副本,kafka可以容忍f個伺服器不可用。
為什麼不用少數服從多數的方法:
少數服從多數是一種比較常見的一緻性算法和Leader選舉法。它的含義是隻有超過半數的副本同步了,系統才會認為資料已同步;選擇Leader時也是從超過半數的同步的副本中選擇。這種算法需要較高的備援度。譬如隻允許一台機器失敗,需要有三個副本;而如果隻容忍兩台機器失敗,則需要五個副本。而kafka的ISR集合方法,分别隻需要兩個和三個副本。
如果所有的ISR副本都失敗了怎麼辦:
此時有兩種方法可選,一種是等待ISR集合中的副本複活,一種是選擇任何一個立即可用的副本,而這個副本不一定是在ISR集合中。這兩種方法各有利弊,實際生産中按需選擇。
如果要等待ISR副本複活,雖然可以保證一緻性,但可能需要很長時間。而如果選擇立即可用的副本,則很可能該副本并不一緻。
例如下圖:
1、zabbix監控
2、prometheus監控
本文轉自 lirulei90 51CTO部落格,原文連結:http://blog.51cto.com/lee90/2067533,如需轉載請自行聯系原作者