天天看點

Spark Streaming的優化之路—從Receiver到Direct模式

Spark Streaming的優化之路—從Receiver到Direct模式

作者:個推資料研發工程師 學長

1 業務背景

随着大資料的快速發展,業務場景越來越複雜,離線式的批處理架構MapReduce已經不能滿足業務,大量的場景需要實時的資料處理結果來進行分析、決策。Spark Streaming是一種分布式的大資料實時計算架構,他提供了動态的,高吞吐量的,可容錯的流式資料處理,不僅可以實作使用者行為分析,還能在金融、輿情分析、網絡監控等方面發揮作用。個推開發者服務——消息推送“應景推送”正是應用了Spark Streaming技術,基于大資料分析人群屬性,同時利用LBS地理圍欄技術,實時觸發精準消息推送,實作使用者的精細化營運。此外,個推在應用Spark Streaming做實時處理kafka資料時,采用Direct模式代替Receiver模式的手段,實作了資源優化和程式穩定性提升。

本文将從Spark Streaming擷取kafka資料的兩種模式入手,結合個推實踐,帶你解讀Receiver和Direct模式的原理和特點,以及從Receiver模式到Direct模式的優化對比。

2 兩種模式的原理和差別

Receiver模式

1. Receiver模式下的運作架構

Spark Streaming的優化之路—從Receiver到Direct模式

1)InputDStream: 從流資料源接收的輸入資料。

2)Receiver:負責接收資料流,并将資料寫到本地。

3)Streaming Context:代表SparkStreaming,負責Streaming層面的任務排程,生成jobs發送到Spark engine處理。

4)Spark Context: 代表Spark Core,負責批處理層面的任務排程,真正執行job的Spark engine。

2. Receiver從kafka拉取資料的過程

Spark Streaming的優化之路—從Receiver到Direct模式

該模式下:

1)在executor上會有receiver從kafka接收資料并存儲在Spark executor中,在到了batch時間後觸發job去處理接收到的資料,1個receiver占用1個core;

2)為了不丢資料需要開啟WAL機制,這會将receiver接收到的資料寫一份備份到第三方系統上(如:HDFS);

3)receiver内部使用kafka High Level API去消費資料及自動更新offset。

Direct模式

1. Direct模式下的運作架構

與receiver模式類似,不同在于executor中沒有receiver元件,從kafka拉去資料的方式不同。

2. Direct從kafka拉取資料的過程

Spark Streaming的優化之路—從Receiver到Direct模式

1)沒有receiver,無需額外的core用于不停地接收資料,而是定期查詢kafka中的每個partition的最新的offset,每個批次拉取上次處理的offset和目前查詢的offset的範圍的資料進行處理;

2)為了不丢資料,無需将資料備份落地,而隻需要手動儲存offset即可;

3)内部使用kafka simple Level API去消費資料, 需要手動維護offset,kafka zk上不會自動更新offset。

Receiver與Direct模式的差別

1.前者在executor中有Receiver接受資料,并且1個Receiver占用一個core;而後者無Receiver,是以不會暫用core;

2.前者InputDStream的分區是 num_receiver *batchInterval/blockInteral,後者的分區數是kafka topic partition的數量。Receiver模式下num_receiver的設定不合理會影響性能或造成資源浪費;如果設定太小,并行度不夠,整個鍊路上接收資料将是瓶頸;如果設定太多,則會浪費資源;

3.前者使用zookeeper來維護consumer的偏移量,而後者需要自己維護偏移量;

4.為了保證不丢失資料,前者需要開啟WAL機制,而後者不需要,隻需要在程式中成功消費完資料後再更新偏移量即可。

3 Receiver改造成Direct模式

個推使用Spark Streaming做實時處理kafka資料,先前使用的是receiver模式;

receiver有以下特點:

1.receiver模式下,每個receiver需要單獨占用一個core;

2.為了保證不丢失資料,需要開啟WAL機制,使用checkpoint儲存狀态;

3.當receiver接受資料速率大于處理資料速率,導緻資料積壓,最終可能會導緻程式挂掉。

由于以上特點,receiver模式下會造成一定的資源浪費;使用checkpoint儲存狀态, 如果需要更新程式,則會導緻checkpoint無法使用;第3點receiver模式下會導緻程式不太穩定;并且如果設定receiver數量不合理也會造成性能瓶頸在receiver。為了優化資源和程式穩定性,應将receiver模式改造成direct模式。

修改方式如下:

1. 修改InputDStream的建立

将receiver的:

val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])           

改成direct的:

val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])           

2. 手動維護offset

receiver模式代碼:

(receiver模式不需要手動維護offset,而是内部通過kafka consumer high level API 送出到kafka/zk儲存)

kafkaStream.map {
           ...
 }.foreachRDD { rdd =>
    // 資料處理
    doCompute(rdd)
 }           

direct模式代碼:

directKafkaStream.map {
           ...
 }.foreachRDD { rdd =>
    // 擷取目前rdd資料對應的offset
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 資料處理
    doCompute(rdd)
    // 自己實作儲存offset
    commitOffsets(offsetRanges)
 }           

4 其他優化點

1. 在receiver模式下:

1)拆分InputDStream,增加Receiver,進而增加接收資料的并行度;

2)調整blockInterval,适當減小,增加task數量,進而增加并行度(在core的數量>task數量的情況下);

3)如果開啟了WAL機制,資料的存儲級别設定為MOMERY_AND_DISK_SER。

2.資料序列化使用Kryoserializationl,相比Java serializationl 更快,序列化後的資料更小;

3.建議使用CMS垃圾回收器降低GC開銷;

4.選擇高性能的算子(mapPartitions, foreachPartitions, aggregateByKey等);

5.repartition的使用:在streaming程式中因為batch時間特别短,是以資料量一般較小,是以repartition的時間短,可以解決一些因為topicpartition中資料配置設定不均勻導緻的資料傾斜問題;

6.因為SparkStreaming生産的job最終都是在sparkcore上運作的,是以sparkCore的優化也很重要;

7.BackPressure流控

1)為什麼引入Backpressure?

當batch processing time>batchinterval 這種情況持續過長的時間,會造成資料在記憶體中堆積,導緻Receiver所在Executor記憶體溢出等問題;

2)Backpressure:根據JobScheduler回報作業的執行資訊來動态調整資料接收率;

3)配置使用:

spark.streaming.backpressure.enabled
含義: 是否啟用 SparkStreaming内部的backpressure機制,
預設值:false ,表示禁用

spark.streaming.backpressure.initialRate
含義: receiver 為第一個batch接收資料時的比率

spark.streaming.receiver.maxRate
含義: receiver接收資料的最大比率,如果設定值<=0, 則receiver接收資料比率不受限制

spark.streaming.kafka.maxRatePerPartition
含義: 從每個kafka partition中讀取資料的最大比率           

8.speculation機制

spark内置speculation機制,推測job中的運作特别慢的task,将這些task kill,并重新排程這些task執行。

預設speculation機制是關閉的,通過以下配置參數開啟:

spark.speculation=true           

注意:在有些情況下,開啟speculation反而效果不好,比如:streaming程式消費多個topic時,從kafka讀取資料直接處理,沒有重新分區,這時如果多個topic的partition的資料量相差較大那麼可能會導緻正常執行更大資料量的task會被認為執行緩慢,而被中途kill掉,這種情況下可能導緻batch的處理時間反而變長;可以通過repartition來解決這個問題,但是要衡量repartition的時間;而在streaming程式中因為batch時間特别短,是以資料量一般較小,是以repartition的時間短,不像spark_batch一次處理大量資料一旦repartition則會特别久,是以最終還是要根據具體情況測試來決定。

5 總結

将Receiver模式改成Direct模式,實作了資源優化,提升了程式的穩定性,缺點是需要自己管理offset,操作相對複雜。未來,個推将不斷探索和優化Spark Streaming技術,發揮其強大的資料處理能力,為建設實時數倉提供保障。

繼續閱讀