天天看點

Hadoop權威指南讀書筆記(二)—— MapReduce初了解

一、本章概覽

MapReduce可以看作是Hadoop中的分布式計算架構,是用于批量資料離線處理的程式設計模型。基于MapReduce的并行資料處理是Hadoop能夠支撐大資料計算的核心。

書中這一章是以一個實際的例子對MapReduce的過程、機制還有Hadoop提供的相關程式設計模型及借口做了簡單的介紹,内容即非常易懂,也能讓讀者初步地宏觀了解MapReduce的計算原理。其中很多細節的地方書中并沒有做詳細介紹,比如是如何做資料的備份容災,叢集中機器間資料傳輸的機制,以及partition、shuffle的具體實作等等。這部分可以後續放在進行理論上的深入了解,或者說準備在後面的源碼閱讀部分去進一步學習。

二、核心内容

這裡先把本章的重點内容梳理一遍,下一片部落格會介紹下Hadoop環境的配置、基于IDEA的本地開發環境搭建以及氣象分析的實際例子+代碼。本章開頭介紹了Unix腳本進行檔案處理,以及利用awk工具來進行資料處理和計算,這裡主要的目的是想表達單一程序的程式處理大量的資料存在效率的問題,如果進行多線程并行處理,在沒有機制的限制下,又回出現很多問題:

  1. 比如如何進行合理的檔案劃分。天氣的資料以年為機關存放在多個檔案中,不同年份的資料量大小不一,如何保證每個作業(線程)處理的資料量大小是相同的呢?(因為如果配置設定的大小不一,那麼總的計算時間仍然取決于執行最長檔案的處理時間)
  2. 在實作了第一點的基礎上,得到了每個作業的中間計算結果。由于需要進行盡量均等的檔案劃分,那麼就不能保證每個作業都能完整處理一年的資料。那麼要得到我們想要的結果,即每年的最高氣溫,那麼還需要對結果進行合并處理的過程。
  3. 即便能解決以上問題,基于多線程的計算面對大規模的資料時,計算力仍然是遠遠不夠的。(單台機器的計算力上限擺在那裡)

是以,我們需要Hadoop這樣的可以在分布式叢集中進行大規模資料處理的架構。MapReduce可以粗分為Map和Reduce兩個階段,在每個階段都以鍵值對(k-v)的形式處理輸入資料,輸出資料。多個Map任務接收輸入資料之後,按照定義的Map函數對資料進行處理,得到的中間結果會在叢集中通過網絡配置設定到對應執行Reduce任務的機器上,繼續進行Reduce任務。每個Reduce任務得到的結果(最終結果)會通過Hdfs持久化到機器的硬碟上。那麼,輸入資料的形式是什麼,Map任務是如何建立的?又是如何配置設定到對應的Reducer的呢?

MapReduce的詳細過程可以分為這樣七步:(初步的了解)

  1. Input:這一步對(按block分塊)存儲在Hdfs上的資料進行讀取并生成鍵值對,具體的資料類型,Hadoop這裡進行了單獨的封裝,并沒有使用java原生的資料類型。這裡的鍵可以是檔案名,可以是資料的行偏移量等等。
  2. Map:Map函數接受鍵值對形式的輸入,并進行資料處理。比如在氣溫分析的例子中,進行了年份和溫度的提取。Map輸出的中間結果會被緩存。
  3. Partition:如果對應有多個Reduce任務,那麼資料需要進行分區,分區會被配置設定到對應的Reducer執行。每個分區可能包括多個鍵對應的資料。分區後的結果會周期性地寫入機器的本地硬碟。
  4. Shuffle:在這一階段,不同分區的資料被配置設定到對應執行Reduce任務的worker上(Reducer從對應Mapper的機器上進行資料讀取)
  5. Sort:Reducer讀取完所有的中間資料後,通過key進行sort,使相同key的資料聚合在一起(不同key的資料可能會被配置設定的同一個Reducer)。
  6. Reduce:按照定義的Reduce函數對資料進行進一步處理。
  7. Output:Reduce函數的輸出被追加到所屬分區的輸出檔案上(Hdfs)。第一個副本會存儲在本地節點,其他副本處于可靠性考慮,分發到不同機架上的節點。

這其中,隻有第1、2、7步是必要存在的,一個MapReduce資料流的梳理可以隻有Map任務。上面過程中的部分資料傳輸是非本地的,比如Mapper–》Reducer,即shuffle過程,還有輸入、輸出的過程(有從Hdfs其他節點上讀取存儲的資料進行MapReduce的情況)。

Hadoop權威指南讀書筆記(二)—— MapReduce初了解

這整個過程下來,有幾個難點,包括Map任務的數量、輸入資料如何分片、節點挂掉(任務失敗)、資料跨節點傳輸需要占用大量網絡帶寬等等,我們看下Hadoop是如何解決的。

  • Map 任務的數量:Map任務的數量由存儲在Hdfs上資料塊的數量決定,一個資料塊對應mapreduce中的一個split,Hadoop會為每一個split建立一個map task。
  • 輸入資料如何分片:将資料進行分片,并且保持分片的資料量大小較小,是為了能夠更好地進行分布式的處理,實作負載平衡(算力更好的機器可以處理更多的分片)。這裡其實有兩個問題,即合理的分片大小和如何在建立Map任務時盡量避免資料的非本地傳輸。(1)Hdfs上的資料按照block size被分為等長的塊,預設的大小是128m。MapReduce的split大小一般任務跟block大小保持一緻為佳。理由是如果split_size > block_size,那麼每個分片會跨越多個資料塊,執行map任務的worker需要從多個節點去讀取資料。如果split_size < block_size,那麼同樣的道理,一個block會需要被多個mapper讀取資料,占用帶寬資源。
  • 節點挂掉(任務失敗):分布式叢集比較容易出現的就是某些節點的故障。如果出現節點挂掉的情況,該任務會被置為出事的空閑狀态,Hadoop會在另一個節點上重新運作該任務,并通知所有的Reducer從新的節點讀取資料(如果是Map任務)。對于叢集中master節點挂掉的情況,Hadoop也設計了一套機制來保障容災,基本原理是進行周期性地進行資料持久化,不在這裡進行詳細介紹。至于如何發現故障節點,這個在以後進行介紹。
  • 資料跨節點傳輸需要占用大量網絡帶寬:這個問題跟第二個問題也有關系。Hadoop會優先進行資料本地化優化,即存儲該資料塊的節點,就在本地建立Map任務,如果該節點正在運作其他Map任務,則會需要從同一機架上尋找空閑的節點。極少的情況下,會出現跨機架的建立Map任務。這樣可以盡量保證減少帶寬占用。這裡的本地化優化隻針對Map任務,不适用于Reduce任務(Reducer始終都需要從非本地的Mapper去讀取資料----一個Reducer幾乎要從所有的Mapper上去讀取資料)。

本章還提到了combiner,在我了解這屬于一種部分情況下可行的節約Mapper–》Reducer的網絡帶寬資源的一種機制。這裡舉個例子說明:在氣溫分析的場景中,我們在Map函數中提取年份和氣溫的資訊,然後shuffle到Reduce端,對每一個年份(鍵)進行Max操作求得最高氣溫。那麼我們為什麼不在Mapper端就先預進行一次max操作,得到一個中間結果呢。

即,每個map task上都包括數個年份的資料,如果我們在這裡通過combiner先調用一次reduce函數,那麼一個map task輸出的多個鍵對應的值就隻會有一個,而不是一個集合。這樣從Map端傳輸到Reduce端的資料量就會小很多。但是結合實際情況一想,在大多數場景下,可能都不滿足max操作這樣的可結合性。這裡需要結合具體場景進行優化,并不是通用的。

繼續閱讀