天天看點

SparkStreaming(源碼閱讀十二)

  要完整去學習spark源碼是一件非常不容易的事情,但是咱可以積少成多嘛~那麼,Spark Streaming是怎麼搞的呢?

  本質上,SparkStreaming接收實時輸入資料流并将它們按批次劃分,然後交給Spark引擎處理生成按照批次劃分的結果流:

  

SparkStreaming(源碼閱讀十二)

  SparkStreaming提供了表示連續資料流的、高度抽象的被稱為離散流的Dstream,可以使用kafka、Flume和Kiness這些資料源的輸入資料流建立Dstream,也可以在其他Dstream上使用map、reduce、join、window等操作建立Dsteram。Dstream本質上呢,是表示RDD的序列。

  Spark Streaming首先将資料切分為一定時間範圍(Duration)的資料集,然後積累一批(Batch)Duration資料集後單獨啟動一個任務線程處理。Spark核心提供的從DAG重新排程任務和并行執行,能夠快速完成資料從故障中恢複的工作。

  那麼下來就從SparkStreaming 的StreamingContext初始化開始:

  StreamingContext傳入的參數:1、SparkContext也就是說Spark Streaming的最終處理實際是交給SparkContext。2、Checkpoint:檢查點.3、Duration:設定streaming每個批次的積累時間。當然,也可以不用設定檢查點。

SparkStreaming(源碼閱讀十二)

  Dstream是Spark Streaming中所有資料流的抽象,這裡對抽象類Dstream定義的一些主要方法:

  1、dependencies:Dstream依賴的父級Dstream清單。

  2、comput(validTime:Time):指定時間生成一個RDD。

  3、isInitialized:Dstream是否已經初始化。

  4、persist(level:StorageLevel):使用指定的存儲級别持久化Dstream的RDD。

  5、persist:存儲到記憶體

  6、cache:緩存到記憶體,與persisit方法一樣。

  (這裡詳細說下cache與persist的不同點:cache隻有一個預設的緩存級别MEMORY_ONLY ,而persist可以根據情況設定其它的緩存級别。)

  7、checkpoint(interval:Duration):設定Dstream及祖宗Dstream的DstreamGraph;

  8、getOrCompute(time:Time):從緩存generatedRDDs = new HashMap[Time,RDD[T]]中擷取RDD,如果緩存不存在,則生成RDD并持久化、設定檢查點放入緩存。

  9、generateJob(time:Time):給指定的Time對象生成Job.

  10、window(windowDuration:Duration):基于原有的Dstream,傳回一個包含了所有在時間滑動視窗中可見元素的新的Dstream.

  ......

  Dsteam本質上是表示連續的一些列的RDD,Dstream中的每個RDD包含了一定間隔的資料,任何對Dstream的操作都會轉化為底層RDD的操作。在Spark Streaming中,Dstream提供的接口與RDD提供的接口非常相似。建構完ReciverInputDStream後,會調用各種Dstream的接口方法,對Dstream進行各種轉換,最後各個Dstream之間的依賴關系就形成了一張DStream Graph:

SparkStreaming(源碼閱讀十二)

  整個流程所涉及的元件為:

  1、Reciever:Spark Streaming内置的輸入流接收器或使用者自定義的接收器,用于從資料源接收源源不斷的資料流。

  2、currentBuffer:用于緩存輸入流接收器接收的資料流。

  3、blockIntervalTimer:一個定時器,用于将CurrentBuffer中緩存的資料流封裝為Block後放入blocksForPushing。

  4、blockForPushing:用于緩存将要使用的Block。

  5、blockPushingThread:此線程每隔100毫秒從blocksForPushing中取出一個Block存入存儲體系,并緩存到ReceivedBlockQueue。

  6、Block Batch:Block批次,按照批次時間間隔,從RecievedBlockQueue中擷取一批Block。

  7、JobGenerator:Job生成器,用于給每一批Blcok生成一個Job。

   下來繼續回到StreamingContext,在StreamingContext中new了一個JobScheduler,它裡面創了EventLoop,對這個名字是不是很熟悉?沒錯,就是在Netty通信互動時建立的對象,主要用于處理JobSchedular的事件。然後啟動StrreamingListenerBus,用于更新Spark UI中的StreamTab的内容。 那麼最重要的就是下來建立ReceiverTracker,它用于處理資料接收、資料緩存、Block生成等工作。最後啟動JobGenerator,負責對DstreamGraph的初始化、Dstream與RDD的轉換、生成JOB、送出執行等工作。

SparkStreaming(源碼閱讀十二)

  曾經是用ReciverTrackerActor接收來自Reciver的消息,包括RegisterReceiver、AddBlock、ReportError、DeregisterReceiver等,現在不再使用Akka進行通信,而是使用RPC。

SparkStreaming(源碼閱讀十二)

  回到launchReceivers,調用了SparkContext的makeRDD方法,将所有Receiver封裝為ParallelCollectionRDD,并行度是receivers的數量,makeRDD方法實際調用了parallelize:

SparkStreaming(源碼閱讀十二)

  今天到此為止。。明天再來會你這磨人的小妖精,玩别的去啦~~~

參考文獻:《深入了解Spark:核心思想與源碼分析》

繼續閱讀