3.3 應用
下面會介紹spark/scala中的一些實際示例和庫,具體會從一個非常經典的單詞計數問題開始。
3.3.1 單詞計數
大多數現代機器學習算法需要多次傳遞資料。如果資料能存放在單台機器的記憶體中,則該資料會容易獲得,并且不會呈現性能瓶頸。如果資料太大,單台機器的記憶體容納不下,則可儲存在磁盤(或資料庫)上,這樣雖然可得到更大的存儲空間,但存取速度大約會降為原來的1/100。另外還有一種方式就是分割資料集,将其存儲在網絡中的多台機器上,并通過網絡來傳輸結果。雖然對這種方式仍有争議,但分析表明,對于大多數實際系統而言,如果能有效地在多個cpu之間拆分工作負載,則通過一組網絡連接配接節點存儲資料比從單個節點上的硬碟重複存儲和讀取資料略有優勢。
磁盤的平均帶寬約為100 mb/s,由于磁盤的轉速和緩存不同,其傳輸時會有幾毫秒的延遲。相對于直接從記憶體中讀取資料,速度要降為原來的1/100左右,當然,這也會取決于資料大小和緩存的實作。現代資料總線可以超過10 gb/s的速度傳輸資料。而網絡速度仍然落後于直接的記憶體通路,特别是标準網絡層中tcp/ip核心的開銷會對網絡速度影響很大。但專用硬體可以達到每秒幾十吉位元組,如果并行運作,則可能和從記憶體讀取一樣快。目前的網絡傳輸速度介于1~10 gb/s之間,但在實際應用中仍然比磁盤更快。是以,可以将資料配置設定到叢集節點中所有機器的記憶體中,并在叢集上執行疊代機器學習算法。
但記憶體也有一個問題:在節點出現故障并重新啟動後,記憶體中的資料不會跨節點持久儲存。一個流行的大資料架構hadoop解決了這個問題。hadoop受益于dean/ghemawat的論文(jeff dean和sanjay ghemawat, mapreduce: simplified data processing on large clusters, osdi, 2004.),這篇文章提出使用磁盤層持久性來保證容錯和存儲中間結果。hadoop mapreduce程式首先會在資料集的每一行上運作map函數,得到一個或多個鍵/值對。然後按鍵值對這些鍵/值對進行排序、分組和聚合,使得具有相同鍵的記錄最終會在同一個reducer上處理,該reducer可能在一個(或多個)節點上運作。reducer會使用一個reduce函數,周遊同一個鍵對應的所有值,并将它們聚合在一起。如果reducer因為一些原因失敗,由于其中間結果持久儲存,則可以丢棄部分計算,然後可從檢查點儲存的結果重新開始reduce計算。很多簡單的類etl應用程式僅在保留非常少的狀态資訊的情況下才周遊資料集,這些狀态資訊是從一個記錄到另一個記錄的。
單詞計數是mapreduce的經典應用程式。該程式可統計文檔中每個單詞的出現次數。在scala中,對排好序的單詞清單采用foldleft方法,很容易得到單詞計數。
如果運作這個程式,會輸出(字,計數)這樣的元組清單。該程式會按行來分詞,并對得到的單詞排序,然後将每個單詞與(字,計數)元組清單中的最新條目(entry)進行比對。同樣的計算在mapreduce中會表示成如下形式:
首先需要按行處理文本,将行拆分成單詞,并生成(word,1)對。這個任務很容易并行化。為了并行化全局計數,需對計數部分進行劃分,具體的分解通過對單詞子集配置設定計數任務來實作。在hadoop中需計算單詞的哈希值,并根據哈希值來劃分工作。
一旦map任務找到給定哈希的所有條目,它就可以将鍵/值對發送到reducer,在mapreduce中,發送部分通常稱為shuffle。從所有mapper中接收完所有的鍵/值對後,reducer才會組合這些值(如果可能,在mapper中也可部分組合這些值),并對整個聚合進行計算,在這種情況下隻進行求和。單個reducer将檢視給定單詞的所有值。
下面介紹spark中單詞計數程式的日志輸出(spark在預設情況下輸出的日志會非常冗長,為了輸出關鍵的日志資訊,可将conf /log4j.properties檔案中的info替換為error或fatal):
這個過程發生的唯一的事情是中繼資料操作,spark不會觸及資料本身,它會估計資料集的大小和分區數。預設情況下是hdfs塊數,但是可使用minpartitions參數明确指定最小分區數:
下面定義另一個rdd,它源于linesrdd:
在2 gb的文本資料(共有40 291行,353 087個單詞)上執行單詞計算程式時,進行讀取、分詞和按詞分組所花的時間不到1秒。通過擴充日志記錄可看到以下内容:
spark打開幾個端口與執行器和使用者通信
可從本地或分布式存儲(hdfs、cassandra和s3)中讀取檔案
如果spark建構時支援hive,它會連接配接到hive上
spark使用惰性求值(僅當輸出請求時)來執行管道
spark使用内部排程器将作業拆分為任務,優化執行任務,然後執行它們
結果存儲在rdd中,可用集合方法來儲存或導入到執行shell的節點的ram中
并行性能調整的原則是在不同節點或線程之間分割工作負載,使得開銷相對較小,而且要保持負載平衡。
3.3.2 基于流的單詞計數
spark支援對輸入流進行監聽,能對其進行分區,并以接近實時的方式來計算聚合。目前支援來自kafka、flume、hdfs/s3、kinesis、twitter,以及傳統的mq(如zeromq和mqtt)的資料流。在spark中,流的傳輸是以小批量(micro-batch)方式進行的。在spark内部會将輸入資料分成小批量,通常按大小的不同,有些所花的時間不到1秒,有些卻要幾分鐘,然後會對這些小批量資料執行rdd聚合操作。
下面擴充前面介紹的flume示例。這需要修改flume配置檔案來建立一個spark輪詢槽(polling sink),用這種槽來替代hdfs:
現在不用寫入hdfs,flume将會等待spark的輪詢資料:
為了運作程式,在一個視窗中啟動flume代理:
然後在另一個視窗運作flumewordcount對象:
現在任何輸入到netcat連接配接的文本都将被分詞并在6秒的滑動視窗上按每2秒計算單詞的量:
spark/scala允許在不同的流之間無縫切換。例如,kafka釋出/訂閱主題模型類似于如下形式:
要啟動kafka代理,首先下載下傳最新釋出的二進制包并啟動zookeeper。zookeeper是一個分布式服務協調器,即使kafka部署在單節點上也需要它:
在另一個視窗中啟動kafka伺服器:
運作kafkawordcount對象:
現在将單詞流釋出到kafka主題中,這需要再開啟一個計數視窗:
從上面的結果可以看出程式每兩秒輸出一次。spark流有時被稱為小批次處理(micro-batch processing)。資料流有許多其他應用程式(和架構),但要完全讨論清楚會涉及很多内容,是以需要單獨進行介紹。在第5章會讨論一些資料流上的機器學習問題。下面将介紹更傳統的類sql接口。
3.3.3 spark sql和資料框
資料框(data frame)相對較新,在spark的1.3版本中才引入,它允許人們使用标準的sql語言來分析資料。在第1章就使用了一些sql指令來進行資料分析。sql對于簡單的資料分析和聚合非常有用。
最新的調查結果表明大約有70%的spark使用者使用dataframe。雖然dataframe最近成為表格資料最流行的工作架構,但它是一個相對重量級的對象。dataframe使用的管道在執行速度上可能比基于scala的vector或labeledpoint(這兩個對象将在下一章讨論)的速度慢得多。來自多名開發人員的證據表明:響應時間可為幾十或幾百毫秒,這與具體查詢有關,若是更簡單的對象會小于1毫秒。
spark為sql實作了自己的shell,這是除标準scala repl shell以外的另一個shell。可通過./bin/spark-sql來運作該shell,還可通過這種shell來通路hive/impala或關系資料庫表:
在标準spark的repl中,可以通過運作相同的查詢來執行以下指令: