我們通過下面這個天氣資料處理的例子來說明Hadoop的運作原理.
1、Map-Reduce的邏輯過程
假設我們需要處理一批有關天氣的資料,其格式如下:
- 按照ASCII碼存儲,每行一條記錄
- 每一行字元從0開始計數,第15個到第18個字元為年
- 第25個到第29個字元為溫度,其中第25位是符号+/-
0067011990999991950051507+0000+ 0043011990999991950051512+0022+ 0043011990999991950051518-0011+ 0043012650999991949032412+0111+ 0043012650999991949032418+0078+ 0067011990999991937051507+0001+ 0043011990999991937051512-0002+ 0043011990999991945051518+0001+ 0043012650999991945032412+0002+ 0043012650999991945032418+0078+ |
現在需要統計出每年的最高溫度。
Map-Reduce主要包括兩個步驟:Map和Reduce
每一步都有key-value對作為輸入和輸出:
- map階段的key-value對的格式是由輸入的格式所決定的,如果是預設的TextInputFormat,則每行作為一個記錄程序處理,其中key為此行的開頭相對于檔案的起始位置,value就是此行的字元文本
- map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應
對于上面的例子,在map過程,輸入的key-value對如下:
(0, 0067011990999991950051507+0000+) (33, 0043011990999991950051512+0022+) (66, 0043011990999991950051518-0011+) (99, 0043012650999991949032412+0111+) (132, 0043012650999991949032418+0078+) (165, 0067011990999991937051507+0001+) (198, 0043011990999991937051512-0002+) (231, 0043011990999991945051518+0001+) (264, 0043012650999991945032412+0002+) (297, 0043012650999991945032418+0078+) |
在map過程中,通過對每一行字元串的解析,得到年-溫度的key-value對作為輸出:
(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78) |
在reduce過程,将map過程中的輸出,按照相同的key将value放到同一個清單中作為reduce的輸入
(1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78]) |
在reduce過程中,在清單中選擇出最大的溫度,将年-最大溫度的key-value作為輸出:
(1950, 22) (1949, 111) (1937, 1) (1945, 78) |
其邏輯過程可用如下圖表示:
下圖大概描述了Map-Reduce的Job運作的基本原理:
下面我們讨論JobConf,其有很多的項可以進行配置:
- setInputFormat:設定map的輸入格式,預設為TextInputFormat,key為LongWritable, value為Text
- setNumMapTasks:設定map任務的個數,此設定通常不起作用,map任務的個數取決于輸入的資料所能分成的input split的個數
- setMapperClass:設定Mapper,預設為IdentityMapper
- setMapRunnerClass:設定MapRunner, map task是由MapRunner運作的,預設為MapRunnable,其功能為讀取input split的一個個record,依次調用Mapper的map函數
- setMapOutputKeyClass和setMapOutputValueClass:設定Mapper的輸出的key-value對的格式
- setOutputKeyClass和setOutputValueClass:設定Reducer的輸出的key-value對的格式
- setPartitionerClass和setNumReduceTasks:設定Partitioner,預設為HashPartitioner,其根據key的hash值來決定進入哪個partition,每個partition被一個reduce task處理,是以partition的個數等于reduce task的個數
- setReducerClass:設定Reducer,預設為IdentityReducer
- setOutputFormat:設定任務的輸出格式,預設為TextOutputFormat
- FileInputFormat.addInputPath:設定輸入檔案的路徑,可以使一個檔案,一個路徑,一個通配符。可以被調用多次添加多個路徑
- FileOutputFormat.setOutputPath:設定輸出檔案的路徑,在job運作前此路徑不應該存在
當然不用所有的都設定,由上面的例子,可以編寫Map-Reduce程式如下:
|
3、Map-Reduce資料流(data flow)
Map-Reduce的處理過程主要涉及以下四個部分:
- 用戶端Client:用于送出Map-reduce任務job
- JobTracker:協調整個job的運作,其為一個Java程序,其main class為JobTracker
- TaskTracker:運作此job的task,處理input split,其為一個Java程序,其main class為TaskTracker
- HDFS:hadoop分布式檔案系統,用于在各個程序間共享Job相關的檔案
3.1、任務送出
JobClient.runJob()建立一個新的JobClient執行個體,調用其submitJob()函數。
- 向JobTracker請求一個新的job ID
- 檢測此job的output配置
- 計算此job的input splits
- 将Job運作所需的資源拷貝到JobTracker的檔案系統中的檔案夾中,包括job jar檔案,job.xml配置檔案,input splits
- 通知JobTracker此Job已經可以運作了
送出任務後,runJob每隔一秒鐘輪詢一次job的進度,将進度傳回到指令行,直到任務運作完畢。
3.2、任務初始化
當JobTracker收到submitJob調用的時候,将此任務放到一個隊列中,job排程器将從隊列中擷取任務并初始化任務。
初始化首先建立一個對象來封裝job運作的tasks, status以及progress。
在建立task之前,job排程器首先從共享檔案系統中獲得JobClient計算出的input splits。
其為每個input split建立一個map task。
每個task被配置設定一個ID。
3.3、任務配置設定
TaskTracker周期性的向JobTracker發送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已經準備運作一個新的task,JobTracker将配置設定給其一個task。
在JobTracker為TaskTracker選擇一個task之前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。
TaskTracker有固定數量的位置來運作map task或者reduce task。
預設的排程器對待map task優先于reduce task
當選擇reduce task的時候,JobTracker并不在多個task之間進行選擇,而是直接取下一個,因為reduce task沒有資料本地化的概念。
3.4、任務執行
TaskTracker被配置設定了一個task,下面便要運作此task。
首先,TaskTracker将此job的jar從共享檔案系統中拷貝到TaskTracker的檔案系統中。
TaskTracker從distributed cache中将job運作所需要的檔案拷貝到本地磁盤。
其次,其為每個task建立一個本地的工作目錄,将jar解壓縮到檔案目錄中。
其三,其建立一個TaskRunner來運作task。
TaskRunner建立一個新的JVM來運作task。
被建立的child JVM和TaskTracker通信來報告運作進度。
3.4.1、Map的過程
MapRunnable從input split中讀取一個個的record,然後依次調用Mapper的map函數,将結果輸出。
map的輸出并不是直接寫入硬碟,而是将其寫入緩存memory buffer。
當buffer中資料的到達一定的大小,一個背景線程将資料開始寫入硬碟。
在寫入硬碟之前,記憶體中的資料通過partitioner分成多個partition。
在同一個partition中,背景線程會将資料按照key在記憶體中排序。
每次從記憶體向硬碟flush資料,都生成一個新的spill檔案。
當此task結束之前,所有的spill檔案被合并為一個整的被partition的而且排好序的檔案。
reducer可以通過http協定請求map的輸出檔案,tracker.http.threads可以設定http服務線程數。
3.4.2、Reduce的過程
當map task結束後,其通知TaskTracker,TaskTracker通知JobTracker。
對于一個job,JobTracker知道TaskTracer和map輸出的對應關系。
reducer中一個線程周期性的向JobTracker請求map輸出的位置,直到其取得了所有的map輸出。
reduce task需要其對應的partition的所有的map輸出。
reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的map task完成時間不同。
reduce task中有多個copy線程,可以并行拷貝map輸出。
當很多map輸出拷貝到reduce task後,一個背景線程将其合并為一個大的排好序的檔案。
當所有的map輸出都拷貝到reduce task後,進入sort過程,将所有的map輸出合并為大的排好序的檔案。
最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每個key,最後的結果寫入HDFS。