Spark簡介
Apache Spark 是一個大資料處理快速通用引擎,提供了分布式的記憶體抽象RDD(這就是快速計算的原因之一)。
Spark最初由美國加州伯克利大學(UCBerkeley)的AMP(Algorithms, Machines and People)實驗室于2009年開發,是基于記憶體計算的大資料并行計算架構,可用于建構大型的、低延遲的資料分析應用程式。Spark在誕生之初屬于研究性項目,其諸多核心理念均源自學術研究論文。2013年,Spark加入Apache孵化器項目後,開始獲得迅猛的發展,如今已成為Apache軟體基金會最重要的三大分布式計算系統開源項目之一(即Hadoop、Spark、Storm)。
Spark作為大資料計算平台的後起之秀,在2014年打破了Hadoop保持的基準排序(Sort Benchmark)紀錄,使用206個節點在23分鐘的時間裡完成了100TB資料的排序,而Hadoop則是使用2000個節點在72分鐘的時間裡完成同樣資料的排序。也就是說,Spark僅使用了十分之一的計算資源,獲得了比Hadoop快3倍的速度。新紀錄的誕生,使得Spark獲得多方追捧,也表明了Spark可以作為一個更加快速、高效的大資料計算平台。
Spark具有如下幾個主要特點
運作速度快:Spark使用先進的DAG(Directed Acyclic Graph,有向無環圖)執行引擎,以支援循環資料流與記憶體計算,基于記憶體的執行速度可比Hadoop MapReduce快上百倍,基于磁盤的執行速度也能快十倍;
容易使用:Spark支援使用Scala、Java、Python和R語言進行程式設計,簡潔的API設計有助于使用者輕松建構并行程式,幾行代碼就能實作 WordCount。提供了80多種進階操作用與并行app,并且可以通過Spark Shell進行互動式程式設計;
通用性:Spark提供了完整而強大的技術棧,包括SQL查詢、流式計算、機器學習和圖算法元件,這些元件可以無縫整合在同一個應用中,足以應對複雜的計算;也就是說隻要把Spark部署成功,這些融合元件都能夠使用,這就是Spark的一站式解決方案。
運作模式多樣:Spark可運作于獨立的叢集模式中,或者運作于Hadoop中,也可運作于Amazon EC2等雲環境中,并且可以通路HDFS、Cassandra、HBase、Hive等多種資料源。
Spark源碼托管在Github中,截至2016年3月,共有超過800名來自200多家不同公司的開發人員貢獻了15000次代碼送出,可見Spark的受歡迎程度。
此外,每年舉辦的全球Spark頂尖技術人員峰會Spark Summit,吸引了使用Spark的一線技術公司及專家彙聚一堂,共同探讨目前Spark在企業的落地情況及未來Spark的發展方向和挑戰。Spark Summit的參會人數從2014年的不到500人暴漲到2015年的2000多人,足以反映Spark社群的旺盛人氣。
Spark如今已吸引了國内外各大公司的注意,如騰訊、淘寶、百度、亞馬遜等公司均不同程度地使用了Spark來建構大資料分析應用,并應用到實際的生産環境中。相信在将來,Spark會在更多的應用場景中發揮重要作用。
Spark的生态系統
在實際應用中,大資料處理主要包括以下三個類型:
--複雜的批量資料處理:時間跨度通常在數十分鐘到數小時之間;
--基于曆史資料的互動式查詢:時間跨度通常在數十秒到數分鐘之間;
--基于實時資料流的資料處理:時間跨度通常在數百毫秒到數秒之間。
目前已有很多相對成熟的開源軟體用于處理以上三種情景,比如,可以利用Hadoop MapReduce來進行批量資料處理,可以用Impala來進行互動式查詢(Impala與Hive相似,但底層引擎不同,提供了實時互動式SQL查詢),對于流式資料處理可以采用開源流計算架構Storm。一些企業可能隻會涉及其中部分應用場景,隻需部署相應軟體即可滿足業務需求,但是,對于網際網路公司而言,通常會同時存在以上三種場景,就需要同時部署三種不同的軟體,這樣做難免會帶來一些問題:
--不同場景之間輸入輸出資料無法做到無縫共享,通常需要進行資料格式的轉換;
--不同的軟體需要不同的開發和維護團隊,帶來了較高的使用成本;
--比較難以對同一個叢集中的各個系統進行統一的資源協調和配置設定。
Spark的設計遵循“一個軟體棧滿足不同應用場景”的理念,逐漸形成了一套完整的生态系統,既能夠提供記憶體計算架構,也可以支援SparkCore進行批處理、SparkSQL即席查詢、SparkStreaming實時流式計算、機器學習和圖計算等。Spark可以部署在資料總管YARN之上,提供一站式的大資料解決方案。是以,Spark所提供的生态系統足以應對上述三種場景,即同時支援批處理、互動式查詢和流資料處理。
現在,Spark生态系統已經成為伯克利資料分析軟體棧BDAS(Berkeley Data Analytics Stack)的重要組成部分。BDAS的架構如圖所示,從中可以看出,Spark專注于資料的處理分析,而資料的存儲還是要借助于Hadoop分布式檔案系統HDFS、Amazon S3等來實作的。是以,Spark生态系統可以很好地實作與Hadoop生态系統的相容,使得現有Hadoop應用程式可以非常容易地遷移到Spark系統中。
BDAS架構
Spark的生态系統主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX 等元件,各個元件的具體功能如下:
* Spark Core:Spark Core包含Spark的基本功能,如記憶體計算、任務排程、部署模式、故障恢複、存儲管理等。Spark建立在統一的抽象RDD之上,使其可以以基本一緻的方式應對不同的大資料處理場景;通常所說的Apache Spark,就是指Spark Core;
* Spark SQL:Spark SQL允許開發人員直接處理RDD,同時也可查詢Hive、HBase等外部資料源。Spark SQL的一個重要特點是其能夠統一處理關系表和RDD,使得開發人員可以輕松地使用SQL指令進行查詢,并進行更複雜的資料分析;
* Spark Streaming:Spark Streaming支援高吞吐量、可容錯處理的實時流資料處理,其核心思路是将流式計算分解成一系列短小的批處理作業。Spark Streaming支援多種資料輸入源,如Kafka、Flume和TCP套接字等;
* MLlib(機器學習):MLlib提供了常用機器學習算法的實作,包括聚類、分類、回歸、協同過濾等,降低了機器學習的門檻,開發人員隻要具備一定的理論知識就能進行機器學習的工作;
* GraphX(圖計算):GraphX是Spark中用于圖計算的API,可認為是Pregel在Spark上的重寫及優化,Graphx性能良好,擁有豐富的功能和運算符,能在海量資料上自如地運作複雜的圖算法。
Spark與MapReduce的比較
Hadoop雖然已成為大資料技術的事實标準,但其本身還存在諸多缺陷,最主要的缺陷是其MapReduce計算模型延遲過高,無法勝任實時、快速計算的需求,因而隻适用于離線批處理的應用場景。
回顧Hadoop的工作流程,可以發現Hadoop存在如下一些缺點:
--表達能力有限。計算都必須要轉化成Map和Reduce兩個操作,但這并不适合所有的情況,難以描述複雜的資料處理過程;
-- 磁盤IO開銷大。每次執行時都需要從磁盤讀取資料,并且在計算完成後需要将中間結果寫入到磁盤中,IO開銷較大;
-- 延遲高。一次計算可能需要分解成一系列按順序執行的MapReduce任務,任務之間的銜接由于涉及到IO開銷,會産生較高延遲。而且,在前一個任務執行完成之前,其他任務無法開始,難以勝任複雜、多階段的計算任務。
Spark在借鑒Hadoop MapReduce優點的同時,很好地解決了MapReduce所面臨的問題。相比于MapReduce,
Spark主要具有如下優點:
-- Spark的計算模式也屬于MapReduce,但不局限于Map和Reduce操作,還提供了多種資料集操作類型,程式設計模型比MapReduce更靈活;
-- Spark提供了記憶體計算,中間結果直接放到記憶體中,帶來了更高的疊代運算效率;
-- Spark基于DAG的任務排程執行機制,要優于MapReduce的疊代執行機制。
Spark最大的特點就是将計算資料、中間結果都存儲在記憶體中,大大減少了IO開銷,因而,Spark更适合于疊代運算比較多的資料挖掘與機器學習運算。使用Hadoop進行疊代計算非常耗資源,因為每次疊代都需要從磁盤中寫入、讀取中間資料,IO開銷大。而Spark将資料載入記憶體後,之後的疊代計算都可以直接使用記憶體中的中間結果作運算,避免了從磁盤中頻繁讀取資料。
在實際進行開發時,使用Hadoop需要編寫不少相對底層的代碼,不夠高效。相對而言,Spark提供了多種高層次、簡潔的API,通常情況下,對于實作相同功能的應用程式,Spark的代碼量要比Hadoop少2-5倍。更重要的是,Spark提供了實時互動式程式設計回報,可以友善地驗證、調整算法。
盡管Spark相對于Hadoop而言具有較大優勢,但Spark并不能完全替代Hadoop,主要用于替代Hadoop中的MapReduce計算模型。實際上,Spark已經很好地融入了Hadoop生态圈,并成為其中的重要一員,它可以借助于YARN實作資源排程管理,借助于HDFS實作分布式存儲。此外,Hadoop可以使用廉價的、異構的機器來做分布式存儲與計算,但是,Spark對硬體的要求稍高一些,對記憶體與CPU有一定的要求。
總結:
1)原理上:MapReduce是基于磁盤大資料批量處理系統。而Spark是基于RDD(彈性分布資料集)資料處理,顯示的将RDD資料存儲在記憶體中或磁盤上。
2)模型上:MapReduce可以處理超大規模的資料,适合日志的分析和挖掘等。Spark适合資料的挖掘(處理的資料量低于MapReduce);機器學習等多疊代式計算任務。
RDD了解
* l RDD:是彈性分布式資料集(Resilient Distributed Dataset)的簡稱,是分布式記憶體的一個抽象概念,提供了一種高度受限的共享記憶體模型;
* l DAG:是Directed Acyclic Graph(有向無環圖)的簡稱,反映RDD之間的依賴關系;
Spark的核心是建立在統一的抽象RDD之上,使得Spark的各個元件可以無縫進行內建,在同一個應用程式中完成大資料計算任務。RDD的設計理念源自AMP實驗室發表的論文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》。
RDD的設計背景
在實際應用中,存在許多疊代式算法(比如機器學習、圖算法等)和互動式資料挖掘工具,這些應用場景的共同之處是,不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入。但是,目前的MapReduce架構都是把中間結果寫入到HDFS中,帶來了大量的資料複制、磁盤IO和序列化開銷。雖然,類似Pregel等圖計算架構也是将結果儲存在記憶體當中,但是,這些架構隻能支援一些特定的計算模式,并沒有提供一種通用的資料抽象。RDD就是為了滿足這種需求而出現的,它提供了一個抽象的資料架構,我們不必擔心底層資料的分布式特性,隻需将具體的應用邏輯表達為一系列轉換處理,不同RDD之間的轉換操作形成依賴關系,可以實作管道化,進而避免了中間結果的存儲,大大降低了資料複制、磁盤IO和序列化開銷。
一個RDD就是一個分布式對象集合,本質上是一個隻讀的分區記錄集合,每個RDD可以分成多個分區,每個分區就是一個資料集片段,并且一個RDD的不同分區可以被儲存到叢集中不同的節點上,進而可以在叢集中的不同節點上進行并行計算。RDD提供了一種高度受限的共享記憶體模型,即RDD是隻讀的記錄分區的集合,不能直接修改,隻能基于穩定的實體存儲中的資料集來建立RDD,或者通過在其他RDD上執行确定的轉換操作(如map、join和groupBy)而建立得到新的RDD。RDD提供了一組豐富的操作以支援常見的資料運算,分為“行動”(Action)和“轉換”(Transformation)兩種類型,前者用于執行計算并指定輸出的形式,後者指定RDD之間的互相依賴關系。兩類操作的主要差別是,轉換操作(比如map、filter、groupBy、join等)接受RDD并傳回RDD,而行動操作(比如count、collect等)接受RDD但是傳回非RDD(即輸出一個值或結果)。RDD提供的轉換接口都非常簡單,都是類似map、filter、groupBy、join等粗粒度的資料轉換操作,而不是針對某個資料項的細粒度修改。是以,RDD比較适合對于資料集中元素執行相同操作的批處理式應用,而不适合用于需要異步、細粒度狀态的應用,比如Web應用系統、增量式的網頁爬蟲等。正因為這樣,這種粗粒度轉換接口設計,會使人直覺上認為RDD的功能很受限、不夠強大。但是,實際上RDD已經被實踐證明可以很好地應用于許多并行計算應用中,可以具備很多現有計算架構(比如MapReduce、SQL、Pregel等)的表達能力,并且可以應用于這些架構處理不了的互動式資料挖掘應用。
Spark用Scala語言實作了RDD的API,程式員可以通過調用API實作對RDD的各種操作。RDD典型的執行過程如下:
1. RDD讀入外部資料源(或者記憶體中的集合)進行建立;
2. RDD經過一系列的“轉換”操作,每一次都會産生不同的RDD,供給下一個“轉換”使用;
3. 最後一個RDD經“行動”操作進行處理,并輸出到外部資料源(或者變成Scala集合或标量)。
需要說明的是,RDD采用了惰性調用,即在RDD的執行過程中(如圖所示),真正的計算發生在RDD的“行動”操作,對于“行動”之前的所有“轉換”操作,Spark隻是記錄下“轉換”操作應用的一些基礎資料集以及RDD生成的軌迹,即互相之間的依賴關系,而不會觸發真正的計算。
Spark的轉換和行動操作
例如,在圖中,從輸入中邏輯上生成A和C兩個RDD,經過一系列“轉換”操作,邏輯上生成了F(也是一個RDD),之是以說是邏輯上,是因為這時候計算并沒有發生,Spark隻是記錄了RDD之間的生成和依賴關系。當F要進行輸出時,也就是當F進行“行動”操作的時候,Spark才會根據RDD的依賴關系生成DAG,并從起點開始真正的計算。
RDD執行過程的一個執行個體
上述這一系列處理稱為一個“血緣關系(Lineage)”,即DAG拓撲排序的結果。采用惰性調用,通過血緣關系連接配接起來的一系列RDD操作就可以實作管道化(pipeline),避免了多次轉換操作之間資料同步的等待,而且不用擔心有過多的中間資料,因為這些具有血緣關系的操作都管道化了,一個操作得到的結果不需要儲存為中間資料,而是直接管道式地流入到下一個操作進行處理。同時,這種通過血緣關系把一系列操作進行管道化連接配接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;相反,在MapReduce的設計中,為了盡可能地減少MapReduce過程,在單個MapReduce中會寫入過多複雜的邏輯。
RDD的5大特性
1)RDD是由一系列的Partition組成。 2)方法(算子)作用在Partititon上 3)RDD之間有依賴關系 4)分區器是作用在K、V格式的RDD 5)Partition提供最佳計算位置
注意:
textFile方法底層封裝的是讀取MR讀取檔案的方式,讀取檔案之前先split,預設split大小是一個block大小。
RDD實際上不存儲資料
什麼是K,V格式的RDD?
如果RDD裡面存儲的資料都是二進制組對象,那麼這個RDD我們就叫做K,V格式的RDD。
哪裡展現RDD的彈性(容錯)?
partition數量,大小沒有限制,展現了RDD的彈性。
RDD之間依賴關系,可以基于上一個RDD重新計算出RDD。
哪裡展現RDD的分布式?
RDD是由Partition組成,partition是分布在不同節點上的。
RDD提供計算最佳位置,展現了資料本地化。展現了大資料中“計算移動資料不移動”的理念。
Spark任務執行原理
以上圖中有四個機器節點,Driver和Worker是啟動在節點上的程序,運作在JVM中的程序。
-- Driver與叢集節點之間有頻繁的通信。
-- Driver負責任務(tasks)的分發和結果的回收。任務的排程。如果task的計算結果非常大就不要回收了。會造成oom。
-- Worker是Standalone資源排程架構裡面資源管理的從節點。也是JVM程序。
-- Master是Standalone資源排程架構裡面資源管理的主節點。也是JVM程序。
架構設計
如圖9-5所示,Spark運作架構包括叢集資料總管(Cluster Manager)、運作作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行程序(Executor)。其中,叢集資料總管可以是Spark自帶的資料總管,也可以是YARN或Mesos等資源管理架構。
與Hadoop MapReduce計算架構相比,Spark所采用的Executor有兩個優點:一是利用多線程來執行具體的任務(Hadoop MapReduce采用的是程序模型),減少任務的啟動開銷;二是Executor中有一個BlockManager存儲子產品,會将記憶體和磁盤共同作為儲存設備,當需要多輪疊代計算時,可以将中間結果存儲到這個存儲子產品裡,下次需要時,就可以直接讀該存儲子產品裡的資料,而不需要讀寫到HDFS等檔案系統裡,因而有效減少了IO開銷;或者在互動式查詢場景下,預先将表緩存到該存儲系統上,進而可以提高讀寫IO性能。
圖9-5 Spark運作架構
總體而言,如圖9-6所示,在Spark中,一個應用(Application)由一個任務控制節點(Driver)和若幹個作業(Job)構成,一個作業由多個階段(Stage)構成,一個階段由多個任務(Task)組成。當執行一個應用時,任務控制節點會向叢集管理器(Cluster Manager)申請資源,啟動Executor,并向Executor發送應用程式代碼和檔案,然後在Executor上執行任務,運作結束後,執行結果會傳回給任務控制節點,或者寫到HDFS或者其他資料庫中。
圖9-6 Spark中各種概念之間的互相關系
Spark運作基本流程
如圖9-7 所示,Spark的基本運作流程如下:
(1)當一個Spark應用被送出時,首先需要為這個應用建構起基本的運作環境,即由任務控制節點(Driver)建立一個SparkContext,由SparkContext負責和資料總管(Cluster Manager)的通信以及進行資源的申請、任務的配置設定和監控等。SparkContext會向資料總管注冊并申請運作Executor的資源;
(2)資料總管為Executor配置設定資源,并啟動Executor程序,Executor運作情況将随着“心跳”發送到資料總管上;
(3)SparkContext根據RDD的依賴關系建構DAG圖,DAG圖送出給DAG排程器(DAGScheduler)進行解析,将DAG圖分解成多個“階段”(每個階段都是一個任務集),并且計算出各個階段之間的依賴關系,然後把一個個“任務集”送出給底層的任務排程器(TaskScheduler)進行處理;Executor向SparkContext申請任務,任務排程器将任務分發給Executor運作,同時,SparkContext将應用程式代碼發放給Executor;
(4)任務在Executor上運作,把執行結果回報給任務排程器,然後回報給DAG排程器,運作完畢後寫入資料并釋放所有資源。
圖9-7 Spark運作基本流程圖
總體而言,Spark運作架構具有以下特點:
(1)每個應用都有自己專屬的Executor程序,并且該程序在應用運作期間一直駐留。Executor程序以多線程的方式運作任務,減少了多程序任務頻繁的啟動開銷,使得任務執行變得非常高效和可靠;
(2)Spark運作過程與資料總管無關,隻要能夠擷取Executor程序并保持通信即可;
(3)Executor上有一個BlockManager存儲子產品,類似于鍵值存儲系統(把記憶體和磁盤共同作為儲存設備),在處理疊代計算任務時,不需要把中間結果寫入到HDFS等檔案系統,而是直接放在這個存儲系統上,後續有需要時就可以直接讀取;在互動式查詢場景下,也可以把表提前緩存到這個存儲系統上,提高讀寫IO性能;
(4)任務采用了資料本地性和推測執行等優化機制。資料本地性是盡量将計算移到資料所在的節點上進行,即“計算向資料靠攏”,因為移動計算比移動資料所占的網絡資源要少得多。而且,Spark采用了延時排程機制,可以在更大的程度上實作執行過程優化。比如,擁有資料的節點目前正被其他的任務占用,那麼,在這種情況下是否需要将資料移動到其他的空閑節點呢?答案是不一定。因為,如果經過預測發現目前節點結束目前任務的時間要比移動資料的時間還要少,那麼,排程就會等待,直到目前節點可用。