一. 使用
二. 關鍵配置
三. Storm-kafka使用
本文一、二部分内容主要來自官方文檔。
下載下傳代碼
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
啟動伺服器
kafka依賴zookeeper,是以需要首先安裝并啟動zookeeper。可以使用kafka自帶的zookeeper。
然後即可啟動kafka
建立topic
消息傳輸需要指定topic。是以首先要建立一個topic。
之後,可以看到已經建立的topic.其中的replication-factor指的是複制因子,即log備援的份數,這裡的數字不能大于broker的數量。
也可以不用手動建立topic,隻需要配置broker的時候設定為auto-create topic when a non-existent topic is published to.
發送消息
kafka提供了一個指令行用戶端,可以從一個檔案或者标準輸入裡讀取并發送到kafka叢集。預設的,每一行都作為一個單獨的消息。
在指令行輸入消息并回車即可發送消息。
啟動一個消費者
kafka也提供了一個指令行消費者,接受消息并列印到标準輸出。
設定多broker叢集
首先需要為每一個broker建立一個配置檔案。
然後啟動這兩個結點:
現在一共有了三個結點,三個broker,那麼這樣就可以形成一個叢集。
建立一個複制引子為3的topic
如果想檢視目前這個topic的partion在broker上的分布情況
broker.id: broker的唯一辨別符,叢集環境該值不可重複。
log.dirs: 一個用逗号分隔的目錄清單,可以有多個,用來為Kafka存儲資料。每當需要為一個新的partition配置設定一個目錄時,會選擇目前的存儲partition最少的目錄來存儲。
zookeeper.connect:zookeeper通路位址,多個位址用’,’隔開
message.max.bytes: server能接收的一條消息的最大的大小。這個屬性跟consumer使用的最大fetch大小是一緻的,這很重要,否則一個不守規矩的producer會發送一個太大的消息。預設值:1000000。
metadata.broker.list: kafka的broker清單,格式為host1:port1,host2:port2
request.required.acks:用來控制一個produce請求怎樣才能算完成,準确的說,是有多少broker必須已經送出資料到log檔案,并向leader發送ack,可以設定如下的值:
0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server挂掉的時候會丢失一些資料。
1,意味着在leader replication已經接收到資料後,producer會得到一個ack。這個選項提供了更好的持久性。
-1,意味着在所有的ISR都接收到資料後,producer才得到一個ack。這個選項提供了最好的持久性,隻要還有一個replication存活,那麼資料就不會丢失。
producer.type:決定消息是否應在一個背景線程異步發送。async表示異步發送;sync表示同步發送。設定為async則允許批量發送請求,這回帶來更高的吞吐量,但是client的機器挂了的話會丢失還沒有發送的資料。
serializer.class: 消息的序列化使用的class,如kafka.serializer.StringEncoder
更多細節參見kafka.consumer.ProducerConfig類。
group.id: 唯一的指明了consumer的group的名字,group名一樣的程序屬于同一個consumer group。
zookeeper.connect: 通broker的配置
consumer.id:consumer的唯一辨別符,如果沒有設定的話則自動生成。
fetch.message.max.bytes:每一個擷取某個topic的某個partition的請求,得到最大的位元組數,每一個partition的要被讀取的資料會加載入記憶體,是以這可以幫助控制consumer使用的記憶體。這個值的設定不能小于在server端設定的最大消息的位元組數,否則producer可能會發送大于consumer可以擷取的位元組數限制的消息。預設值:1024 * 1024。
fetch.min.bytes:一個fetch請求最少要傳回多少位元組的資料,如果資料量比這個配置少,則會等待,直到有足夠的資料為止。預設值:1。
fetch.wait.max.ms:在server回應fetch請求前,如果消息不足,就是說小于fetch.min.bytes時,server最多阻塞的時間。如果逾時,消息将立即發送給consumer。預設值:100。
socket.receive.buffer.bytes: socket的receiver buffer的位元組大小。預設值:64 * 1024。
更多細節參見kafka.consumer.ConsumerConfig類。
Kafka很多使用場景是輸出消息到Storm的,Storm本身也提供了storm-kafka的包,在使用Storm的KafkaSpout時需要注意以下幾點:
在采用基于SimpleConsumer的消費端實作時,我們遇到過一個情況是大量的輪詢導緻整個環境網絡的流量異常,原因是該topic一直沒有新消息,consumer端的輪詢沒有設定等待參數,也沒有在client線程裡判斷進行一個短暫的sleep。幾乎是以死循環的方式不斷跟server端通訊,盡管每次的資料包很小,但隻要有幾個這樣的消費端足以引起網絡流量的異常。這裡需要設定maxWait參數,但是此參數必須與minBytes配合使用才有效。但是在storm-kafka的KafkaUtils中的fetchMessages方法中對minBytes沒有設定,是以即使設定了maxWait也沒有效果。這裡需要自己重寫KafkaUtils來解決。
修複了上述問題後,後來還是遇到網絡流量異常的情況,後來在追蹤KafkaSpout源碼的過程中,發現當kafka中的消息過大時,如果不設定合适的bufferSizeBytes以及fetchSizeBytes(至少要大于kafka中最大消息的大小),那麼很容易造成用戶端由于bufferSizeBytes或者fetchSize設定過小,無法将消息放入buffer中也不能成功fetch而不停地去輪詢服務端,進而導緻網絡流量異常。
原文出處:後端技術雜談
<a href="http://www.rowkey.me/blog/2015/05/30/kafka-usage/" target="_blank">原文連結</a>
轉載請與作者聯系,同時請務必标明文章原始出處和原文連結及本聲明。