天天看點

kafka學習筆記

這是之前學習時候寫的筆記,貼一下便于日常查閱。。。

官方文檔:http://kafka.apache.org/quickstart

https://engineering.linkedin.com/kafka/

搭建參考: https://www.cnblogs.com/luotianshuai/p/5206662.html

理論資料參考 http://www.jasongj.com/tags/Kafka

環境:

CentOS7.3

kafka_2.11-1.0.0

zookeeper-3.4.9

3台主機:192.168.5.71、192.168.5.、192.168.5.73

編輯3台主機vim /etc/hosts 加上如下的3行:

192.168.5.71  node71

192.168.5.72  node72

192.168.5.73  node73

tar xf zookeeper-3.4.9.tar.gz -C /opt/

cd /opt

ln -s zookeeper-3.4.9 zk

cd zk/conf

vim zoo.cfg  内容如下:

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/opt/zk

dataLogDir=/opt/zk

clientPort=2181

maxClientCnxns=60

minSessionTimeout=4000

maxSessionTimeout=60000

autopurge.purgeInterval=24

autopurge.snapRetainCount=5

quorum.auth.enableSasl=false

quorum.cnxn.threads.size=20

server.1=node71:3181:4181

server.2=node72:3181:4181

server.3=node73:3181:4181

在3個節點分别建立myid,并啟動zkserver:

node71上: echo '1' > /opt/zk/myid

node72上: echo '2' > /opt/zk/myid

node73上: echo '3' > /opt/zk/myid

在3個節點分别啟動zkserver:

/opt/zk/bin/zkServer.sh start /opt/zk/conf/zoo.cfg

在3個節點觀察叢集主機的狀态:

/opt/zk/bin/zkServer.sh status /opt/zk/conf/zoo.cfg

tar xf kafka_2.11-1.0.0.tgz -C /opt/

cd /opt/

ln -s kafka_2.11-1.0.0 kafka

每個主機的配置如下:

[root@node71 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties

broker.id=0      #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣

listeners=PLAINTEXT://:9092        #目前kafka對外提供服務的端口預設是9092

num.network.threads=3        # 這個是borker進行網絡處理的線程數

num.io.threads=8              # 這個是borker進行I/O處理的線程數

socket.send.buffer.bytes=102400    #發送緩沖區buffer大小,資料不是一下子就發送的,先回存儲到緩沖區了到達一定的大小後在發送,能提高性能

socket.receive.buffer.bytes=102400    #kafka接收緩沖區大小,當資料到達一定大小後在序列化到磁盤

socket.request.max.bytes=104857600   #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小

log.dirs=/tmp/kafka-logs

#消息存放的目錄,這個目錄可以配置為“,”逗号分割的表達式,上面的num.io.threads要大于這個目錄的個數這個目錄,如果配置多個目錄,新建立的topic他把消息持久化的地方是,目前以逗号分割的目錄中,那個分區數最少就放那一個

num.partitions=1      # 預設的分區數,一個topic預設1個分區數

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168        # 預設消息的最大持久化時間,168小時,7天

log.segment.bytes=1073741824  

log.retention.check.interval.ms=300000  #每隔300000毫秒去檢查上面配置的log失效時間(預設168 ),到目錄檢視是否有過期的消息如果有,删除

message.max.byte=5242880        #消息儲存的最大值5M

default.replication.factor=2    #kafka儲存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務

replica.fetch.max.bytes=5242880  #取消息的最大直接數

zookeeper.connect=node71:2181,node72:2181,node73:2181  #設定zookeeper的連接配接端口

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

[root@node72 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties

broker.id=1

listeners=PLAINTEXT://:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

num.partitions=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

message.max.byte=5242880

default.replication.factor=2

replica.fetch.max.bytes=5242880

zookeeper.connect=node71:2181,node72:2181,node73:2181

[root@node73 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties

broker.id=2

在3台機器上都執行啟動Kafka叢集:

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

1、建立Topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 3 --topic mysql-order

/opt/kafka/bin/kafka-topics.sh --create --zookeeper node71:2181 --replication-factor 2 --partitions 2 --topic mysql-coupons

#參數含義

--replication-factor 2   #複制兩份

--partitions 2    #建立2個分區

--topic #主題

2、在一台伺服器上建立一個釋出者broker

/opt/kafka/bin/kafka-console-producer.sh --broker-list node72:9092 --topic mysql-order

然後,可以輸入一些文字内容

3、在一台伺服器上建立一個訂閱者consumer

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper node72:2181 --from-beginning --topic mysql-order

4、列出全部的topic

/opt/kafka/bin/kafka-topics.sh --list --zookeeper node72:2181

5、檢視指定的topic狀态

[root@node72 /opt/kafka/config ]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper node72:2181 --topic mysql-coupons

cd /etc/zk/bin

./zkCli.sh -server node71:2181

ls /brokers

get /brokers/ids/2

get /brokers/topics/mysql-coupons/partitions/0    # 這裡就不截圖了

官方提到的關閉方式就是kill -9

ps aux|grep kafka|grep -v grep|awk '{print $2}'|xargs kill -9

Broker

Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker

Topic

每條釋出到Kafka叢集的消息都有一個類别,這個類别被稱為topic。(實體上不同topic的消息分開存儲,邏輯上一個topic的消息雖然儲存于一個或多個broker上但使用者隻需指定消息的topic即可生産或消費資料而不必關心資料存于何處)

Partition

parition是實體上的概念,每個topic包含一個或多個partition,建立topic時可指定parition數量。每個partition對應于一個檔案夾,該檔案夾下存儲該partition的資料和索引檔案

Producer

負責釋出消息到Kafka broker (push消息到broker)

Consumer

消費消息(從broker那裡poll消息)。每個consumer屬于一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬于預設的group)。使用consumer high level API時,同一topic的一條消息隻能被同一個consumer group内的一個consumer消費,但多個consumer group可同時消費這一消息。

如上圖所示,一個典型的kafka叢集中包含若幹producer(可以是web前端産生的page view,或者是伺服器日志,系統CPU、memory等),若幹broker(Kafka支援水準擴充,一般broker數量越多,叢集吞吐率越高),若幹consumer group,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式将消息釋出到broker,consumer使用pull模式從broker訂閱并消費消息。

作為一個messaging system,Kafka遵循了傳統的方式,選擇由producer向broker push消息并由consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式。事實上,push模式和pull模式各有優劣。

push模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。

而pull模式則可以根據consumer的消費能力以适當的速率消費消息。

本節所有描述都是基于consumer hight level API而非low level API

每一個consumer執行個體都屬于一個consumer group,每一條消息隻會被同一個consumer group裡的一個consumer執行個體消費(這是為了實作傳統message queue消息隻被消費一次的語義),但是不同consumer group可以同時消費同一條消息,這一特性可以為消息的多元化處理提供了支援。實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時線上處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時将資料實時備份到另一個資料中心,隻需要保證這三個操作所使用的consumer在不同的consumer group即可。

Linked的一種kafka部署方案:

對于傳統的message queue而言,一般會删除已經被消費的消息,而Kafka叢集會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有資料(實際上也沒必要),是以Kafka提供兩種政策去删除舊資料。一是基于時間,二是基于partition檔案大小。例如可以通過配置server.properties,讓Kafka删除一周前的資料,也可通過配置讓Kafka在partition檔案超過1GB時删除舊資料

log.segment.bytes=1073741824     # partition大小超過1G時候,清理舊資料

這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與檔案大小無關,是以這裡删除檔案與Kafka性能無關,選擇怎樣的删除政策隻與磁盤以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata資訊—目前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息後線性增加這個offset。當然,consumer也可将offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,是以Kafka broker是無狀态的,它不需要标記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group隻有一個consumer能消費某一條消息,是以也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。

本節所講述内容均基于Kafka consumer high level API

具體參考:http://developer.51cto.com/art/201501/464491.htm

Kafka從0.8開始提供partition級别的replication,replication的數量可在server.properties中配置。

kafka預設default.replication.factor = 2

該Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有一定影響的,但極大的增強了可用性。每個partition都有一個唯一的leader,所有的讀寫操作都在leader上完成,consumer批量從leader上pull資料。一般情況下partition的數量大于等于broker的數量,并且所有partition的leader均勻分布在broker上。follower上的日志和其leader上的完全一樣。

和大部分分布式系統一樣,Kakfa處理失敗需要明确定義一個broker是否alive。

對于Kafka而言,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個通過Zookeeper的heartbeat機制來實作)。二是follower必須能夠及時将leader的writing複制過來,不能落後太多。

leader會track "in sync"的node list。如果一個follower當機,或者落後太多,leader将把它從"in sync" list中移除。這裡所描述的"落後太多" 指follower複制的消息落後于leader後的條數超過預定值,該值可在server.properties中配置。

replica.lag.max.messages=4000

replica.lag.time.max.ms=10000

需要說明的是,Kafka隻解決”fail/recover”,不處理“Byzantine”(“拜占庭”)問題。

個人覺得,"in sync" list 機制,類似于MySQL中的semi-sync半同步。

一條消息隻有被"in sync" list裡的所有follower都從leader複制過去才會被認為已送出。這樣就避免了部分資料被寫進了leader,還沒來得及被任何follower複制就當機了,而造成資料丢失(consumer無法消費這些資料)。而對于producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設定。這種機制確定了隻要"in sync" list有一個或以上的flollower,一條被commit的消息就不會丢失。

這裡的複制機制即不是同步複制,也不是單純的異步複制。事實上,同步複制要求“活着的”follower都複制完,這條消息才會被認為commit,這種複制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步複制方式下,follower異步的從leader複制資料,資料隻要被leader寫入log就被認為已經commit,這種情況下如果follwer都落後于leader,而leader突然當機,則會丢失資料。而Kafka的這種使用"in sync" list的方式則很好的均衡了確定資料不丢失以及吞吐率。follower可以批量的從leader複制資料,這樣極大的提高複制性能(批量寫磁盤),極大減少了follower與leader的差距(前文有說到,隻要follower落後leader不太遠,則被認為在"in sync" list裡)。

首先Kafka會将接收到的消息分區(partition),每個主題(topic)的消息有不同的分區。這樣一方面消息的存儲就不會受到單一伺服器存儲空間大小的限制,另一方面消息的處理也可以在多個伺服器上并行。

其次為了保證高可用,每個分區都會有一定數量的副本(replica)。這樣如果有部分伺服器不可用,副本所在的伺服器就會接替上來,保證應用的持續性。 

但是,為了保證較高的處理效率,消息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader,而其他副本則是Follower。而Follower則會定期地到Leader上同步資料。

Leader選舉

如果某個分區所在的伺服器除了問題,不可用,kafka會從該分區的其他的副本中選擇一個作為新的Leader。之後所有的讀寫就會轉移到這個新的Leader上。現在的問題是應當選擇哪個作為新的Leader。顯然,隻有那些跟Leader保持同步的Follower才應該被選作新的Leader。

Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica,已同步的副本)的集合,該集合中是一些分區的副本。隻有當這些副本都跟Leader中的副本同步了之後,kafka才會認為消息已送出,并回報給消息的生産者。如果這個集合有增減,kafka會更新zookeeper上的記錄。

如果某個分區的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader。

顯然通過ISR,kafka需要的備援度較低,可以容忍的失敗數比較高。假設某個topic有f+1個副本,kafka可以容忍f個伺服器不可用。

為什麼不用少數服從多數的方法:

  少數服從多數是一種比較常見的一緻性算法和Leader選舉法。它的含義是隻有超過半數的副本同步了,系統才會認為資料已同步;選擇Leader時也是從超過半數的同步的副本中選擇。這種算法需要較高的備援度。譬如隻允許一台機器失敗,需要有三個副本;而如果隻容忍兩台機器失敗,則需要五個副本。而kafka的ISR集合方法,分别隻需要兩個和三個副本。

如果所有的ISR副本都失敗了怎麼辦:

  此時有兩種方法可選,一種是等待ISR集合中的副本複活,一種是選擇任何一個立即可用的副本,而這個副本不一定是在ISR集合中。這兩種方法各有利弊,實際生産中按需選擇。

  如果要等待ISR副本複活,雖然可以保證一緻性,但可能需要很長時間。而如果選擇立即可用的副本,則很可能該副本并不一緻。

例如下圖:

1、zabbix監控

2、prometheus監控

本文轉自 lirulei90 51CTO部落格,原文連結:http://blog.51cto.com/lee90/2067533,如需轉載請自行聯系原作者