文章目錄
- 本文行文邏輯
- MapReduce程式詳解(即map中,reduce中)
- map前,reduce後詳解
-
- 分片詳解
-
- 什麼是分片?
- 為何要分片?
- 分片大小計算?
- 分片的讀取規則
- 控制maptask和reducetask數量
- MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
-
- MR運作全流程中自定義部分
-
- 自定義資料類型
- 自定義分區
- Combine
- 分區詳解
本文行文邏輯
分别從MR過程的前,中,後三個階段對MR進行詳細介紹
- 1
MapReduce程式詳解(即map中,reduce中)
一種分布式運算程式,分為map階段和reduce階段
Map階段會有一個實體程式,不需要我們自己開發,使用者隻需要維護map方法就可以
預設情況下map程式讀取一行資料(映射成key-value對是一行行的)就會調用一次map方法,而且會将這一行資料的偏移量作為key(即key一定是IntWritable),這一行資料的内容作為value傳回給架構,然後由架構寫出context.write(key,value)
Reduce 階段會有一個實體程式,不要我們自己開發我們需要維護reduce方法
Reduce程式會接受map端輸出的中間結果資料,而且相同的key的資料會到達同一個reduce執行個體中去,每個reduce執行個體會處理多個key的資料。Reduce程式會将自己收集的資料按照key相同進行分組,對一組資料調用一次reduce方法(按組調用reduce方法),并且将參數傳給reduce(key,疊代器values,context),然後寫出
map前,reduce後詳解
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLxcmaNJzaq10MNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3cjM3QzMyYTMxITNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
分片詳解
什麼是分片?
在進行map計算之前,map會根據輸入檔案計算輸入分片(input split);每個輸入分片(input split)針對一個map任務,輸入分片存儲的并非是資料本身,而是一個分片長度和一個記錄資料的位置的數組。
邏輯概念,分片資訊包括起始偏移量,分片大小,分片資料所在的塊的資訊,塊所在的主機清單。
注意:分片是根據輸入檔案計算的,這些輸入檔案即是存儲在hdfs上的那些個檔案。
為何要分片?
答:友善多個Map任務并行處理,提高運作效率。
每一個分片對應着一個maptask,通過調整分片的大小可以調整maptask的數量,也就是調整map階段的并行度。
分片大小計算?
下面是源程式中對分片大小的計算:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//傳回1
long maxSize = getMaxSplitSize(job);//傳回long的最大值
long splitSize = computeSplitSize(blockSize, minSize, maxSize)
return Math.max(minSize, Math.min(maxSize, blockSize));
總計:先比較塊大小和最大分片,選出其中較小的,然後将結果和最小分片比較,選出較大的。
//計算分片大小,實際的分片大小。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {} //分片有一個1.1倍的備援
通過這個可以對實際分片大小進行設定,主要從最大分片大小和最小分片大小入手。
(1)FileInputFormat.setMinInputSplitSize(job,1000);
(2)FileInputFormat.setMaxInputSplitSize(job,1000000);
分片的讀取規則
- 第一個分片從第一行開始讀取,讀到分片末尾,再讀取下一個分片的第一行
- 既不是第一個分片也不是最後一個分片,第一行資料舍去,讀到分片末尾,再繼續讀 取下一個分片的第一行資料
- 最後一個分片舍去第一行,讀到分片末尾
控制maptask和reducetask數量
控制maptask數量:
1)maptask數量由分片數量決定,可設定maxsize,minsize,blocksize來控制分片的大小,進而控制分片數量
2)改變資料總量也可影響maptask數量
控制reducetask數量:
1)job.setNumReduceTasks(5); 直接設定reducetask數量
2)分區數和reducetask數量是一緻的,可以調整分區數。
MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
-
org.apache.hadoop.mapred.OutputCollector(即文中的輸出收集器),OutputCollector由Hadoop架構提供,負責收集Mapper和Reducer的輸出資料,實作map函數和reduce函數時,隻需要簡單的将其輸出的<key,value>對往OutputCollector中一丢即可,剩餘的事情架構自會幫你處理好。
可以了解為map()和reduce()不會每執行一次便寫出,是需要積累的,具體流程可以檢視底層原理進行了解。
-
為什麼要寫入環形緩沖區?
答: hadoop在執行MapReduce任務的時候,在map階段,map()函數産生輸出以後,并不是直接寫入到磁盤中,而是先寫入到了環形緩沖區,這樣做的原因是無論緩沖區寫入還是讀出,速度都更快,效率更高
-
寫入磁盤緩沖區的時候,為什麼隻寫入百分之80,那百分之20幹啥的?
答:這個記憶體緩沖區是有大小限制的,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,是以需要在一定條件下将緩沖區中的資料臨時寫入磁盤,然後重新利用這塊緩沖區。這個從記憶體往磁盤寫資料的過程被稱為Spill,中文可譯為溢寫,字面意思很直覺。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩沖區的資料已經達到門檻值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。
-
分片資料和溢寫磁盤檔案的對應關系?
通過上一條問題可知,環形緩沖區會重複利用。
一個分片資料有可能會産生多個溢寫磁盤檔案。
- 内/外部排序針對的是記憶體,在記憶體中的排序稱之為内部排序,反之,在磁盤等位置的排序稱為外部排序。
MR運作全流程中自定義部分
在MR運作全流程中,存在若幹過程是設計人員可以進行自定義設計,即人為幹預的。
包括:
- TextInputFormat
- map()函數
- 自定義分區(注意:先分區後排序,且是在溢寫之前進行分區)
- 分區内自定義排序(注意:要在分區内進行排序)
- combine()
- 傳入reduce()函數的key,我們可以自定義key類型,并定義什麼樣的key算是一樣的
- reduce( )
- TextOutputFormat
自定義資料類型
1.要實作writable接口
2.讀寫順序要一緻
3.構造方法如果進行了重寫,要顯示定義無參的構造方法
4.重寫toString方法
自定義分區
資料分發的政策
如何實作自定義分區?
前提要保證相同key的資料會發送到同一個reduce中,或者說是分到同一個分區中
Partitioner<key,value> ,這裡的key和value的資料類型和map輸出的資料類型保持一緻
繼承Patitioner這個類,重寫分區方法getPartition(map輸出key,map輸出value,分區數量)
然後在job中設定使用我們自定義的分區方法進行資料分發
/**
場景:将不同手機号字首歸向不同省份地區
*/
public class ProvincePartitioner extends Partitioner<Text,FlowBean>{
private static HashMap<String,Integer>pmap = new HashMap<>();
static{
pmap.put(“136”,0);
pmap.put(“137”,1);
pmap.put(“138”,2);
pmap.put(“139”,3);
}
@Override
public int getPartition(Text key,FlowBean flowBean,int numPartitions){
String prefix = key.toStrirg().substring(0,3); #截取手機号前三位
Integer partNum = pmap.get(prefix);
return(partNum==null?4:partNUm);
}
}
job.setPartitioner(ProvincePartitioner.class);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
Combine
combine出現的原因,作用?
減少資料量,提高傳輸效率。将形如A 1 A 1 A1 轉換成A 3,類似于map端的reduce。
注意:combine需要注意場合,不是什麼MR都适用。
分區詳解
分區的數量和reduce(或者叫reducetask)數量是一緻的。
文章目錄
- 本文行文邏輯
- MapReduce程式詳解(即map中,reduce中)
- map前,reduce後詳解
-
- 分片詳解
-
- 什麼是分片?
- 為何要分片?
- 分片大小計算?
- 分片的讀取規則
- 控制maptask和reducetask數量
- MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
-
- MR運作全流程中自定義部分
-
- 自定義資料類型
- 自定義分區
- Combine
- 分區詳解
本文行文邏輯
分别從MR過程的前,中,後三個階段對MR進行詳細介紹
- 1
MapReduce程式詳解(即map中,reduce中)
一種分布式運算程式,分為map階段和reduce階段
Map階段會有一個實體程式,不需要我們自己開發,使用者隻需要維護map方法就可以
預設情況下map程式讀取一行資料(映射成key-value對是一行行的)就會調用一次map方法,而且會将這一行資料的偏移量作為key(即key一定是IntWritable),這一行資料的内容作為value傳回給架構,然後由架構寫出context.write(key,value)
Reduce 階段會有一個實體程式,不要我們自己開發我們需要維護reduce方法
Reduce程式會接受map端輸出的中間結果資料,而且相同的key的資料會到達同一個reduce執行個體中去,每個reduce執行個體會處理多個key的資料。Reduce程式會将自己收集的資料按照key相同進行分組,對一組資料調用一次reduce方法(按組調用reduce方法),并且将參數傳給reduce(key,疊代器values,context),然後寫出
map前,reduce後詳解
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLxcmaNJzaq10MNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3cjM3QzMyYTMxITNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
分片詳解
什麼是分片?
在進行map計算之前,map會根據輸入檔案計算輸入分片(input split);每個輸入分片(input split)針對一個map任務,輸入分片存儲的并非是資料本身,而是一個分片長度和一個記錄資料的位置的數組。
邏輯概念,分片資訊包括起始偏移量,分片大小,分片資料所在的塊的資訊,塊所在的主機清單。
注意:分片是根據輸入檔案計算的,這些輸入檔案即是存儲在hdfs上的那些個檔案。
為何要分片?
答:友善多個Map任務并行處理,提高運作效率。
每一個分片對應着一個maptask,通過調整分片的大小可以調整maptask的數量,也就是調整map階段的并行度。
分片大小計算?
下面是源程式中對分片大小的計算:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//傳回1
long maxSize = getMaxSplitSize(job);//傳回long的最大值
long splitSize = computeSplitSize(blockSize, minSize, maxSize)
return Math.max(minSize, Math.min(maxSize, blockSize));
總計:先比較塊大小和最大分片,選出其中較小的,然後将結果和最小分片比較,選出較大的。
//計算分片大小,實際的分片大小。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {} //分片有一個1.1倍的備援
通過這個可以對實際分片大小進行設定,主要從最大分片大小和最小分片大小入手。
(1)FileInputFormat.setMinInputSplitSize(job,1000);
(2)FileInputFormat.setMaxInputSplitSize(job,1000000);
分片的讀取規則
- 第一個分片從第一行開始讀取,讀到分片末尾,再讀取下一個分片的第一行
- 既不是第一個分片也不是最後一個分片,第一行資料舍去,讀到分片末尾,再繼續讀 取下一個分片的第一行資料
- 最後一個分片舍去第一行,讀到分片末尾
控制maptask和reducetask數量
控制maptask數量:
1)maptask數量由分片數量決定,可設定maxsize,minsize,blocksize來控制分片的大小,進而控制分片數量
2)改變資料總量也可影響maptask數量
控制reducetask數量:
1)job.setNumReduceTasks(5); 直接設定reducetask數量
2)分區數和reducetask數量是一緻的,可以調整分區數。
MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
-
org.apache.hadoop.mapred.OutputCollector(即文中的輸出收集器),OutputCollector由Hadoop架構提供,負責收集Mapper和Reducer的輸出資料,實作map函數和reduce函數時,隻需要簡單的将其輸出的<key,value>對往OutputCollector中一丢即可,剩餘的事情架構自會幫你處理好。
可以了解為map()和reduce()不會每執行一次便寫出,是需要積累的,具體流程可以檢視底層原理進行了解。
-
為什麼要寫入環形緩沖區?
答: hadoop在執行MapReduce任務的時候,在map階段,map()函數産生輸出以後,并不是直接寫入到磁盤中,而是先寫入到了環形緩沖區,這樣做的原因是無論緩沖區寫入還是讀出,速度都更快,效率更高
-
寫入磁盤緩沖區的時候,為什麼隻寫入百分之80,那百分之20幹啥的?
答:這個記憶體緩沖區是有大小限制的,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,是以需要在一定條件下将緩沖區中的資料臨時寫入磁盤,然後重新利用這塊緩沖區。這個從記憶體往磁盤寫資料的過程被稱為Spill,中文可譯為溢寫,字面意思很直覺。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩沖區的資料已經達到門檻值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。
-
分片資料和溢寫磁盤檔案的對應關系?
通過上一條問題可知,環形緩沖區會重複利用。
一個分片資料有可能會産生多個溢寫磁盤檔案。
- 内/外部排序針對的是記憶體,在記憶體中的排序稱之為内部排序,反之,在磁盤等位置的排序稱為外部排序。
MR運作全流程中自定義部分
在MR運作全流程中,存在若幹過程是設計人員可以進行自定義設計,即人為幹預的。
包括:
- TextInputFormat
- map()函數
- 自定義分區(注意:先分區後排序,且是在溢寫之前進行分區)
- 分區内自定義排序(注意:要在分區内進行排序)
- combine()
- 傳入reduce()函數的key,我們可以自定義key類型,并定義什麼樣的key算是一樣的
- reduce( )
- TextOutputFormat
自定義資料類型
1.要實作writable接口
2.讀寫順序要一緻
3.構造方法如果進行了重寫,要顯示定義無參的構造方法
4.重寫toString方法
自定義分區
資料分發的政策
如何實作自定義分區?
前提要保證相同key的資料會發送到同一個reduce中,或者說是分到同一個分區中
Partitioner<key,value> ,這裡的key和value的資料類型和map輸出的資料類型保持一緻
繼承Patitioner這個類,重寫分區方法getPartition(map輸出key,map輸出value,分區數量)
然後在job中設定使用我們自定義的分區方法進行資料分發
/**
場景:将不同手機号字首歸向不同省份地區
*/