mapreduce運作的時候,會通過mapper運作的任務讀取hdfs中的資料檔案,然後調用自己的方法,處理資料,最後輸出。reducer任務會接收mapper任務輸出的資料,作為自己的輸入資料,調用自己的方法,最後輸出到hdfs的檔案中。整個流程如圖:
每個mapper任務是一個java程序,它會讀取hdfs中的檔案,解析成很多的鍵值對,經過我們覆寫的map方法處理後,轉換為很多的鍵值對再輸出。整個mapper任務的處理過程又可以分為以下幾個階段,如圖所示。
在上圖中,把mapper任務的運作過程分為六個階段。
第一階段是把輸入檔案按照一定的标準分片(inputsplit),每個輸入片的大小是固定的。預設情況下,輸入片(inputsplit)的大小與資料塊(block)的大小是相同的。如果資料塊(block)的大小是預設值64mb,輸入檔案有兩個,一個是32mb,一個是72mb。那麼小的檔案是一個輸入片,大檔案會分為兩個資料塊,那麼是兩個輸入片。一共産生三個輸入片。每一個輸入片由一個mapper程序處理。這裡的三個輸入片,會有三個mapper程序處理。
第二階段是對輸入片中的記錄按照一定的規則解析成鍵值對。有個預設規則是把每一行文本内容解析成鍵值對。“鍵”是每一行的起始位置(機關是位元組),“值”是本行的文本内容。
第三階段是調用mapper類中的map方法。第二階段中解析出來的每一個鍵值對,調用一次map方法。如果有1000個鍵值對,就會調用1000次map方法。每一次調用map方法會輸出零個或者多個鍵值對。
第四階段是按照一定的規則對第三階段輸出的鍵值對進行分區。比較是基于鍵進行的。比如我們的鍵表示省份(如北京、上海、山東等),那麼就可以按照不同省份進行分區,同一個省份的鍵值對劃分到一個區中。預設是隻有一個區。分區的數量就是reducer任務運作的數量。預設隻有一個reducer任務。
第五階段是對每個分區中的鍵值對進行排序。首先,按照鍵進行排序,對于鍵相同的鍵值對,按照值進行排序。比如三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分别是整數。那麼排序後的結果是<1,3>、<2,1>、<2,2>。如果有第六階段,那麼進入第六階段;如果沒有,直接輸出到本地的linux檔案中。
第六階段是對資料進行歸約處理,也就是reduce處理。鍵相等的鍵值對會調用一次reduce方法。經過這一階段,資料量會減少。歸約後的資料輸出到本地的linxu檔案中。本階段預設是沒有的,需要使用者自己增加這一階段的代碼。
每個reducer任務是一個java程序。reducer任務接收mapper任務的輸出,歸約處理後寫入到hdfs中,可以分為如下圖所示的幾個階段。
第一階段是reducer任務會主動從mapper任務複制其輸出的鍵值對。mapper任務可能會有很多,是以reducer會複制多個mapper的輸出。
第二階段是把複制到reducer本地資料,全部進行合并,即把分散的資料合并成一個大的資料。再對合并後的資料排序。
第三階段是對排序後的鍵值對調用reduce方法。鍵相等的鍵值對調用一次reduce方法,每次調用會産生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到hdfs檔案中。
在整個mapreduce程式的開發過程中,我們最大的工作量是覆寫map函數和覆寫reduce函數。
在對mapper任務、reducer任務的分析過程中,會看到很多階段都出現了鍵值對,讀者容易混淆,是以這裡對鍵值對進行編号,友善大家了解鍵值對的變化情況,如下圖所示。
在上圖中,對于mapper任務輸入的鍵值對,定義為key1和value1。在map方法中處理後,輸出的鍵值對,定義為key2和value2。reduce方法接收key2和value2,處理後,輸出key3和value3。在下文讨論鍵值對時,可能把key1和value1簡寫為<k1,v1>,key2和value2簡寫為<k2,v2>,key3和value3簡寫為<k3,v3>。
-----------------------分------------------割----------------線-------------------------
在hdfs中的根目錄下有以下檔案格式: /input.txt
<a href="http://my.oschina.net/itblog/blog/275294#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<code>2014010114</code>
<code>2014010216</code>
<code>2014010317</code>
<code>2014010410</code>
<code>2014010506</code>
<code>2012010609</code>
<code>2012010732</code>
<code>2012010812</code>
<code>2012010919</code>
<code>2012011023</code>
<code>2001010116</code>
<code>2001010212</code>
<code>2001010310</code>
<code>2001010411</code>
<code>2001010529</code>
<code>2013010619</code>
<code>2013010722</code>
<code>2013010812</code>
<code>2013010929</code>
<code>2013011023</code>
<code>2008010105</code>
<code>2008010216</code>
<code>2008010337</code>
<code>2008010414</code>
<code>2008010516</code>
<code>2007010619</code>
<code>2007010712</code>
<code>2007010812</code>
<code>2007010999</code>
<code>2007011023</code>
<code>2010010114</code>
<code>2010010216</code>
<code>2010010317</code>
<code>2010010410</code>
<code>2010010506</code>
<code>2015010649</code>
<code>2015010722</code>
<code>2015010812</code>
<code>2015010999</code>
<code>2015011023</code>
比如:2010012325表示在2010年01月23日的氣溫為25度。現在要求使用mapreduce,計算每一年出現過的最大氣溫。
此程式需要以hadoop檔案作為輸入檔案,以hadoop檔案作為輸出檔案,是以需要用到檔案系統,于是需要引入hadoop-hdfs包;我們需要向map-reduce叢集送出任務,需要用到map-reduce的用戶端,于是需要導入hadoop-mapreduce-client-jobclient包;另外,在處理資料的時候會用到一些hadoop的資料類型例如intwritable和text等,是以需要導入hadoop-common包。于是運作此程式所需要的相關依賴有以下幾個:
<code><</code><code>dependency</code><code>></code>
<code> </code><code><</code><code>groupid</code><code>>org.apache.hadoop</</code><code>groupid</code><code>></code>
<code> </code><code><</code><code>artifactid</code><code>>hadoop-hdfs</</code><code>artifactid</code><code>></code>
<code> </code><code><</code><code>version</code><code>>2.4.0</</code><code>version</code><code>></code>
<code></</code><code>dependency</code><code>></code>
<code> </code><code><</code><code>artifactid</code><code>>hadoop-mapreduce-client-jobclient</</code><code>artifactid</code><code>></code>
<code> </code><code><</code><code>artifactid</code><code>>hadoop-common</</code><code>artifactid</code><code>></code>
包導好了後, 設計代碼如下:
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<code>package</code> <code>com.abc.yarn;</code>
<code>import</code> <code>java.io.ioexception;</code>
<code>import</code> <code>org.apache.hadoop.conf.configuration;</code>
<code>import</code> <code>org.apache.hadoop.fs.path;</code>
<code>import</code> <code>org.apache.hadoop.io.intwritable;</code>
<code>import</code> <code>org.apache.hadoop.io.longwritable;</code>
<code>import</code> <code>org.apache.hadoop.io.text;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.job;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.mapper;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.reducer;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.fileinputformat;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.fileoutputformat;</code>
<code>public</code> <code>class</code> <code>temperature {</code>
<code> </code><code>/**</code>
<code> </code><code>* 四個泛型類型分别代表:</code>
<code> </code><code>* keyin mapper的輸入資料的key,這裡是每行文字的起始位置(0,11,...)</code>
<code> </code><code>* valuein mapper的輸入資料的value,這裡是每行文字</code>
<code> </code><code>* keyout mapper的輸出資料的key,這裡是每行文字中的“年份”</code>
<code> </code><code>* valueout mapper的輸出資料的value,這裡是每行文字中的“氣溫”</code>
<code> </code><code>*/</code>
<code> </code><code>static</code> <code>class</code> <code>tempmapper </code><code>extends</code>
<code> </code><code>mapper<longwritable, text, text, intwritable> {</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>map(longwritable key, text value, context context)</code>
<code> </code><code>throws</code> <code>ioexception, interruptedexception {</code>
<code> </code><code>// 列印樣本: before mapper: 0, 2000010115</code>
<code> </code><code>system.out.print(</code><code>"before mapper: "</code> <code>+ key + </code><code>", "</code> <code>+ value);</code>
<code> </code><code>string line = value.tostring();</code>
<code> </code><code>string year = line.substring(</code><code>0</code><code>, </code><code>4</code><code>);</code>
<code> </code><code>int</code> <code>temperature = integer.parseint(line.substring(</code><code>8</code><code>));</code>
<code> </code><code>context.write(</code><code>new</code> <code>text(year), </code><code>new</code> <code>intwritable(temperature));</code>
<code> </code><code>// 列印樣本: after mapper:2000, 15</code>
<code> </code><code>system.out.println(</code>
<code> </code><code>"======"</code> <code>+</code>
<code> </code><code>"after mapper:"</code> <code>+ </code><code>new</code> <code>text(year) + </code><code>", "</code> <code>+ </code><code>new</code> <code>intwritable(temperature));</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>* keyin reducer的輸入資料的key,這裡是每行文字中的“年份”</code>
<code> </code><code>* valuein reducer的輸入資料的value,這裡是每行文字中的“氣溫”</code>
<code> </code><code>* keyout reducer的輸出資料的key,這裡是不重複的“年份”</code>
<code> </code><code>* valueout reducer的輸出資料的value,這裡是這一年中的“最高氣溫”</code>
<code> </code><code>static</code> <code>class</code> <code>tempreducer </code><code>extends</code>
<code> </code><code>reducer<text, intwritable, text, intwritable> {</code>
<code> </code><code>public</code> <code>void</code> <code>reduce(text key, iterable<intwritable> values,</code>
<code> </code><code>context context) </code><code>throws</code> <code>ioexception, interruptedexception {</code>
<code> </code><code>int</code> <code>maxvalue = integer.min_value;</code>
<code> </code><code>stringbuffer sb = </code><code>new</code> <code>stringbuffer();</code>
<code> </code><code>//取values的最大值</code>
<code> </code><code>for</code> <code>(intwritable value : values) {</code>
<code> </code><code>maxvalue = math.max(maxvalue, value.get());</code>
<code> </code><code>sb.append(value).append(</code><code>", "</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>// 列印樣本: before reduce: 2000, 15, 23, 99, 12, 22, </code>
<code> </code><code>system.out.print(</code><code>"before reduce: "</code> <code>+ key + </code><code>", "</code> <code>+ sb.tostring());</code>
<code> </code><code>context.write(key, </code><code>new</code> <code>intwritable(maxvalue));</code>
<code> </code><code>// 列印樣本: after reduce: 2000, 99</code>
<code> </code><code>"after reduce: "</code> <code>+ key + </code><code>", "</code> <code>+ maxvalue);</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>
<code> </code><code>//輸入路徑</code>
<code> </code><code>string dst = </code><code>"hdfs://localhost:9000/intput.txt"</code><code>;</code>
<code> </code><code>//輸出路徑,必須是不存在的,空檔案加也不行。</code>
<code> </code><code>string dstout = </code><code>"hdfs://localhost:9000/output"</code><code>;</code>
<code> </code><code>configuration hadoopconfig = </code><code>new</code> <code>configuration();</code>
<code> </code>
<code> </code><code>hadoopconfig.set(</code><code>"fs.hdfs.impl"</code><code>, </code>
<code> </code><code>org.apache.hadoop.hdfs.distributedfilesystem.</code><code>class</code><code>.getname()</code>
<code> </code><code>);</code>
<code> </code><code>hadoopconfig.set(</code><code>"fs.file.impl"</code><code>,</code>
<code> </code><code>org.apache.hadoop.fs.localfilesystem.</code><code>class</code><code>.getname()</code>
<code> </code><code>job job = </code><code>new</code> <code>job(hadoopconfig);</code>
<code> </code><code>//如果需要打成jar運作,需要下面這句</code>
<code> </code><code>//job.setjarbyclass(newmaxtemperature.class);</code>
<code> </code><code>//job執行作業時輸入和輸出檔案的路徑</code>
<code> </code><code>fileinputformat.addinputpath(job, </code><code>new</code> <code>path(dst));</code>
<code> </code><code>fileoutputformat.setoutputpath(job, </code><code>new</code> <code>path(dstout));</code>
<code> </code><code>//指定自定義的mapper和reducer作為兩個階段的任務處理類</code>
<code> </code><code>job.setmapperclass(tempmapper.</code><code>class</code><code>);</code>
<code> </code><code>job.setreducerclass(tempreducer.</code><code>class</code><code>);</code>
<code> </code><code>//設定最後輸出結果的key和value的類型</code>
<code> </code><code>job.setoutputkeyclass(text.</code><code>class</code><code>);</code>
<code> </code><code>job.setoutputvalueclass(intwritable.</code><code>class</code><code>);</code>
<code> </code><code>//執行job,直到完成</code>
<code> </code><code>job.waitforcompletion(</code><code>true</code><code>);</code>
<code> </code><code>system.out.println(</code><code>"finished"</code><code>);</code>
<code>}</code>
上面代碼中,注意mapper類的泛型不是java的基本類型,而是hadoop的資料類型text、intwritable。我們可以簡單的等價為java的類string、int。
代碼中mapper類的泛型依次是<k1,v1,k2,v2>。map方法的第二個形參是行文本内容,是我們關心的。核心代碼是把行文本内容按照空格拆分,把每行資料中“年”和“氣溫”提取出來,其中“年”作為新的鍵,“溫度”作為新的值,寫入到上下文context中。在這裡,因為每一年有多行資料,是以每一行都會輸出一個<年份, 氣溫>鍵值對。
下面是控制台列印結果:
<code>before mapper: </code><code>0</code><code>, </code><code>2014010114</code><code>======after mapper:</code><code>2014</code><code>, </code><code>14</code>
<code>before mapper: </code><code>11</code><code>, </code><code>2014010216</code><code>======after mapper:</code><code>2014</code><code>, </code><code>16</code>
<code>before mapper: </code><code>22</code><code>, </code><code>2014010317</code><code>======after mapper:</code><code>2014</code><code>, </code><code>17</code>
<code>before mapper: </code><code>33</code><code>, </code><code>2014010410</code><code>======after mapper:</code><code>2014</code><code>, </code><code>10</code>
<code>before mapper: </code><code>44</code><code>, </code><code>2014010506</code><code>======after mapper:</code><code>2014</code><code>, </code><code>6</code>
<code>before mapper: </code><code>55</code><code>, </code><code>2012010609</code><code>======after mapper:</code><code>2012</code><code>, </code><code>9</code>
<code>before mapper: </code><code>66</code><code>, </code><code>2012010732</code><code>======after mapper:</code><code>2012</code><code>, </code><code>32</code>
<code>before mapper: </code><code>77</code><code>, </code><code>2012010812</code><code>======after mapper:</code><code>2012</code><code>, </code><code>12</code>
<code>before mapper: </code><code>88</code><code>, </code><code>2012010919</code><code>======after mapper:</code><code>2012</code><code>, </code><code>19</code>
<code>before mapper: </code><code>99</code><code>, </code><code>2012011023</code><code>======after mapper:</code><code>2012</code><code>, </code><code>23</code>
<code>before mapper: </code><code>110</code><code>, </code><code>2001010116</code><code>======after mapper:</code><code>2001</code><code>, </code><code>16</code>
<code>before mapper: </code><code>121</code><code>, </code><code>2001010212</code><code>======after mapper:</code><code>2001</code><code>, </code><code>12</code>
<code>before mapper: </code><code>132</code><code>, </code><code>2001010310</code><code>======after mapper:</code><code>2001</code><code>, </code><code>10</code>
<code>before mapper: </code><code>143</code><code>, </code><code>2001010411</code><code>======after mapper:</code><code>2001</code><code>, </code><code>11</code>
<code>before mapper: </code><code>154</code><code>, </code><code>2001010529</code><code>======after mapper:</code><code>2001</code><code>, </code><code>29</code>
<code>before mapper: </code><code>165</code><code>, </code><code>2013010619</code><code>======after mapper:</code><code>2013</code><code>, </code><code>19</code>
<code>before mapper: </code><code>176</code><code>, </code><code>2013010722</code><code>======after mapper:</code><code>2013</code><code>, </code><code>22</code>
<code>before mapper: </code><code>187</code><code>, </code><code>2013010812</code><code>======after mapper:</code><code>2013</code><code>, </code><code>12</code>
<code>before mapper: </code><code>198</code><code>, </code><code>2013010929</code><code>======after mapper:</code><code>2013</code><code>, </code><code>29</code>
<code>before mapper: </code><code>209</code><code>, </code><code>2013011023</code><code>======after mapper:</code><code>2013</code><code>, </code><code>23</code>
<code>before mapper: </code><code>220</code><code>, </code><code>2008010105</code><code>======after mapper:</code><code>2008</code><code>, </code><code>5</code>
<code>before mapper: </code><code>231</code><code>, </code><code>2008010216</code><code>======after mapper:</code><code>2008</code><code>, </code><code>16</code>
<code>before mapper: </code><code>242</code><code>, </code><code>2008010337</code><code>======after mapper:</code><code>2008</code><code>, </code><code>37</code>
<code>before mapper: </code><code>253</code><code>, </code><code>2008010414</code><code>======after mapper:</code><code>2008</code><code>, </code><code>14</code>
<code>before mapper: </code><code>264</code><code>, </code><code>2008010516</code><code>======after mapper:</code><code>2008</code><code>, </code><code>16</code>
<code>before mapper: </code><code>275</code><code>, </code><code>2007010619</code><code>======after mapper:</code><code>2007</code><code>, </code><code>19</code>
<code>before mapper: </code><code>286</code><code>, </code><code>2007010712</code><code>======after mapper:</code><code>2007</code><code>, </code><code>12</code>
<code>before mapper: </code><code>297</code><code>, </code><code>2007010812</code><code>======after mapper:</code><code>2007</code><code>, </code><code>12</code>
<code>before mapper: </code><code>308</code><code>, </code><code>2007010999</code><code>======after mapper:</code><code>2007</code><code>, </code><code>99</code>
<code>before mapper: </code><code>319</code><code>, </code><code>2007011023</code><code>======after mapper:</code><code>2007</code><code>, </code><code>23</code>
<code>before mapper: </code><code>330</code><code>, </code><code>2010010114</code><code>======after mapper:</code><code>2010</code><code>, </code><code>14</code>
<code>before mapper: </code><code>341</code><code>, </code><code>2010010216</code><code>======after mapper:</code><code>2010</code><code>, </code><code>16</code>
<code>before mapper: </code><code>352</code><code>, </code><code>2010010317</code><code>======after mapper:</code><code>2010</code><code>, </code><code>17</code>
<code>before mapper: </code><code>363</code><code>, </code><code>2010010410</code><code>======after mapper:</code><code>2010</code><code>, </code><code>10</code>
<code>before mapper: </code><code>374</code><code>, </code><code>2010010506</code><code>======after mapper:</code><code>2010</code><code>, </code><code>6</code>
<code>before mapper: </code><code>385</code><code>, </code><code>2015010649</code><code>======after mapper:</code><code>2015</code><code>, </code><code>49</code>
<code>before mapper: </code><code>396</code><code>, </code><code>2015010722</code><code>======after mapper:</code><code>2015</code><code>, </code><code>22</code>
<code>before mapper: </code><code>407</code><code>, </code><code>2015010812</code><code>======after mapper:</code><code>2015</code><code>, </code><code>12</code>
<code>before mapper: </code><code>418</code><code>, </code><code>2015010999</code><code>======after mapper:</code><code>2015</code><code>, </code><code>99</code>
<code>before mapper: </code><code>429</code><code>, </code><code>2015011023</code><code>======after mapper:</code><code>2015</code><code>, </code><code>23</code>
<code>before reduce: </code><code>2001</code><code>, </code><code>12</code><code>, </code><code>10</code><code>, </code><code>11</code><code>, </code><code>29</code><code>, </code><code>16</code><code>, ======after reduce: </code><code>2001</code><code>, </code><code>29</code>
<code>before reduce: </code><code>2007</code><code>, </code><code>23</code><code>, </code><code>19</code><code>, </code><code>12</code><code>, </code><code>12</code><code>, </code><code>99</code><code>, ======after reduce: </code><code>2007</code><code>, </code><code>99</code>
<code>before reduce: </code><code>2008</code><code>, </code><code>16</code><code>, </code><code>14</code><code>, </code><code>37</code><code>, </code><code>16</code><code>, </code><code>5</code><code>, ======after reduce: </code><code>2008</code><code>, </code><code>37</code>
<code>before reduce: </code><code>2010</code><code>, </code><code>10</code><code>, </code><code>6</code><code>, </code><code>14</code><code>, </code><code>16</code><code>, </code><code>17</code><code>, ======after reduce: </code><code>2010</code><code>, </code><code>17</code>
<code>before reduce: </code><code>2012</code><code>, </code><code>19</code><code>, </code><code>12</code><code>, </code><code>32</code><code>, </code><code>9</code><code>, </code><code>23</code><code>, ======after reduce: </code><code>2012</code><code>, </code><code>32</code>
<code>before reduce: </code><code>2013</code><code>, </code><code>23</code><code>, </code><code>29</code><code>, </code><code>12</code><code>, </code><code>22</code><code>, </code><code>19</code><code>, ======after reduce: </code><code>2013</code><code>, </code><code>29</code>
<code>before reduce: </code><code>2014</code><code>, </code><code>14</code><code>, </code><code>6</code><code>, </code><code>10</code><code>, </code><code>17</code><code>, </code><code>16</code><code>, ======after reduce: </code><code>2014</code><code>, </code><code>17</code>
<code>before reduce: </code><code>2015</code><code>, </code><code>23</code><code>, </code><code>49</code><code>, </code><code>22</code><code>, </code><code>12</code><code>, </code><code>99</code><code>, ======after reduce: </code><code>2015</code><code>, </code><code>99</code>
<code>finished</code>
執行結果:
從列印的日志中可以看出:
mapper的輸入資料(k1,v1)格式是:預設的按行分的鍵值對<0, 2010012325>,<11, 2012010123>...
reducer的輸入資料格式是:把相同的鍵合并後的鍵值對:<2001, [12, 32, 25...]>,<2007, [20, 34, 30...]>...
reducer的輸出數(k3,v3)據格式是:經自己在reducer中寫出的格式:<2001, 32>,<2007, 34>...
其中,由于輸入資料太小,map過程的第1階段這裡不能證明。但事實上是這樣的。
結論中第一點驗證了map過程的第2階段:“鍵”是每一行的起始位置(機關是位元組),“值”是本行的文本内容。
另外,通過reduce的幾行
可以證明map過程的第4階段:先分區,然後對每個分區都執行一次reduce(map過程第6階段)。
對于mapper的輸出,前文中提到:如果沒有reduce過程,mapper的輸出會直接寫入檔案。于是我們把reduce方法去掉(注釋掉第95行即可)。
再執行,下面是控制台列印結果:
再來看看執行結果:
結果還有很多行,沒有截圖了。
由于沒有執行reduce操作,是以這個就是mapper輸出的中間檔案的内容了。
從列印的日志可以看出:
mapper的輸出資料(k2, v2)格式是:經自己在mapper中寫出的格式:<2010, 25>,<2012, 23>...
從這個結果中可以看出,原資料檔案中的每一行确實都有一行輸出,那麼map過程的第3階段就證明了。
從這個結果中還可以看出,“年份”已經不是輸入給mapper的順序了,這也說明了在map過程中也按照key執行了排序操作,即map過程的第5階段。