天天看點

MapReduce詳解(MR運作全流程,shuffle,分區,分片)本文行文邏輯MapReduce程式詳解(即map中,reduce中)map前,reduce後詳解MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)

文章目錄

  • 本文行文邏輯
  • MapReduce程式詳解(即map中,reduce中)
  • map前,reduce後詳解
    • 分片詳解
      • 什麼是分片?
      • 為何要分片?
      • 分片大小計算?
      • 分片的讀取規則
    • 控制maptask和reducetask數量
  • MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
    • MR運作全流程中自定義部分
      • 自定義資料類型
      • 自定義分區
      • Combine
    • 分區詳解

本文行文邏輯

分别從MR過程的前,中,後三個階段對MR進行詳細介紹
           
  • 1

MapReduce程式詳解(即map中,reduce中)

一種分布式運算程式,分為map階段和reduce階段

Map階段會有一個實體程式,不需要我們自己開發,使用者隻需要維護map方法就可以

預設情況下map程式讀取一行資料(映射成key-value對是一行行的)就會調用一次map方法,而且會将這一行資料的偏移量作為key(即key一定是IntWritable),這一行資料的内容作為value傳回給架構,然後由架構寫出context.write(key,value)

Reduce 階段會有一個實體程式,不要我們自己開發我們需要維護reduce方法

Reduce程式會接受map端輸出的中間結果資料,而且相同的key的資料會到達同一個reduce執行個體中去,每個reduce執行個體會處理多個key的資料。Reduce程式會将自己收集的資料按照key相同進行分組,對一組資料調用一次reduce方法(按組調用reduce方法),并且将參數傳給reduce(key,疊代器values,context),然後寫出

map前,reduce後詳解

MapReduce詳解(MR運作全流程,shuffle,分區,分片)本文行文邏輯MapReduce程式詳解(即map中,reduce中)map前,reduce後詳解MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)

分片詳解

什麼是分片?

在進行map計算之前,map會根據輸入檔案計算輸入分片(input split);每個輸入分片(input split)針對一個map任務,輸入分片存儲的并非是資料本身,而是一個分片長度和一個記錄資料的位置的數組。

邏輯概念,分片資訊包括起始偏移量,分片大小,分片資料所在的塊的資訊,塊所在的主機清單。

注意:分片是根據輸入檔案計算的,這些輸入檔案即是存儲在hdfs上的那些個檔案。

為何要分片?

答:友善多個Map任務并行處理,提高運作效率。

每一個分片對應着一個maptask,通過調整分片的大小可以調整maptask的數量,也就是調整map階段的并行度。

分片大小計算?

下面是源程式中對分片大小的計算:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//傳回1

long maxSize = getMaxSplitSize(job);//傳回long的最大值

long splitSize = computeSplitSize(blockSize, minSize, maxSize)

return Math.max(minSize, Math.min(maxSize, blockSize));

總計:先比較塊大小和最大分片,選出其中較小的,然後将結果和最小分片比較,選出較大的。

//計算分片大小,實際的分片大小。

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {} //分片有一個1.1倍的備援

通過這個可以對實際分片大小進行設定,主要從最大分片大小和最小分片大小入手。

(1)FileInputFormat.setMinInputSplitSize(job,1000);

(2)FileInputFormat.setMaxInputSplitSize(job,1000000);

分片的讀取規則

  1. 第一個分片從第一行開始讀取,讀到分片末尾,再讀取下一個分片的第一行
  2. 既不是第一個分片也不是最後一個分片,第一行資料舍去,讀到分片末尾,再繼續讀 取下一個分片的第一行資料
  3. 最後一個分片舍去第一行,讀到分片末尾

控制maptask和reducetask數量

控制maptask數量:

1)maptask數量由分片數量決定,可設定maxsize,minsize,blocksize來控制分片的大小,進而控制分片數量

2)改變資料總量也可影響maptask數量

控制reducetask數量:

1)job.setNumReduceTasks(5); 直接設定reducetask數量

2)分區數和reducetask數量是一緻的,可以調整分區數。

MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)

MapReduce詳解(MR運作全流程,shuffle,分區,分片)本文行文邏輯MapReduce程式詳解(即map中,reduce中)map前,reduce後詳解MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
  • org.apache.hadoop.mapred.OutputCollector(即文中的輸出收集器),OutputCollector由Hadoop架構提供,負責收集Mapper和Reducer的輸出資料,實作map函數和reduce函數時,隻需要簡單的将其輸出的<key,value>對往OutputCollector中一丢即可,剩餘的事情架構自會幫你處理好。

    可以了解為map()和reduce()不會每執行一次便寫出,是需要積累的,具體流程可以檢視底層原理進行了解。

  • 為什麼要寫入環形緩沖區?

    答: hadoop在執行MapReduce任務的時候,在map階段,map()函數産生輸出以後,并不是直接寫入到磁盤中,而是先寫入到了環形緩沖區,這樣做的原因是無論緩沖區寫入還是讀出,速度都更快,效率更高

  • 寫入磁盤緩沖區的時候,為什麼隻寫入百分之80,那百分之20幹啥的?

    答:這個記憶體緩沖區是有大小限制的,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,是以需要在一定條件下将緩沖區中的資料臨時寫入磁盤,然後重新利用這塊緩沖區。這個從記憶體往磁盤寫資料的過程被稱為Spill,中文可譯為溢寫,字面意思很直覺。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩沖區的資料已經達到門檻值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。

  • 分片資料和溢寫磁盤檔案的對應關系?

    通過上一條問題可知,環形緩沖區會重複利用。

    一個分片資料有可能會産生多個溢寫磁盤檔案。

  • 内/外部排序針對的是記憶體,在記憶體中的排序稱之為内部排序,反之,在磁盤等位置的排序稱為外部排序。

MR運作全流程中自定義部分

在MR運作全流程中,存在若幹過程是設計人員可以進行自定義設計,即人為幹預的。

包括:

  • TextInputFormat
  • map()函數
  • 自定義分區(注意:先分區後排序,且是在溢寫之前進行分區)
  • 分區内自定義排序(注意:要在分區内進行排序)
  • combine()
  • 傳入reduce()函數的key,我們可以自定義key類型,并定義什麼樣的key算是一樣的
  • reduce( )
  • TextOutputFormat

自定義資料類型

1.要實作writable接口

2.讀寫順序要一緻

3.構造方法如果進行了重寫,要顯示定義無參的構造方法

4.重寫toString方法

自定義分區

資料分發的政策

如何實作自定義分區?

前提要保證相同key的資料會發送到同一個reduce中,或者說是分到同一個分區中

Partitioner<key,value> ,這裡的key和value的資料類型和map輸出的資料類型保持一緻

繼承Patitioner這個類,重寫分區方法getPartition(map輸出key,map輸出value,分區數量)

然後在job中設定使用我們自定義的分區方法進行資料分發

/**
  場景:将不同手機号字首歸向不同省份地區
*/
           

public class ProvincePartitioner extends Partitioner<Text,FlowBean>{

private static HashMap<String,Integer>pmap = new HashMap<>();

static{

pmap.put(“136”,0);

pmap.put(“137”,1);

pmap.put(“138”,2);

pmap.put(“139”,3);

}

@Override

public int getPartition(Text key,FlowBean flowBean,int numPartitions){

String prefix = key.toStrirg().substring(0,3); #截取手機号前三位

Integer partNum = pmap.get(prefix);

return(partNum==null?4:partNUm);

}

}

job.setPartitioner(ProvincePartitioner.class);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Combine

combine出現的原因,作用?

減少資料量,提高傳輸效率。将形如A 1 A 1 A1 轉換成A 3,類似于map端的reduce。

注意:combine需要注意場合,不是什麼MR都适用。

分區詳解

分區的數量和reduce(或者叫reducetask)數量是一緻的。

文章目錄

  • 本文行文邏輯
  • MapReduce程式詳解(即map中,reduce中)
  • map前,reduce後詳解
    • 分片詳解
      • 什麼是分片?
      • 為何要分片?
      • 分片大小計算?
      • 分片的讀取規則
    • 控制maptask和reducetask數量
  • MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
    • MR運作全流程中自定義部分
      • 自定義資料類型
      • 自定義分區
      • Combine
    • 分區詳解

本文行文邏輯

分别從MR過程的前,中,後三個階段對MR進行詳細介紹
           
  • 1

MapReduce程式詳解(即map中,reduce中)

一種分布式運算程式,分為map階段和reduce階段

Map階段會有一個實體程式,不需要我們自己開發,使用者隻需要維護map方法就可以

預設情況下map程式讀取一行資料(映射成key-value對是一行行的)就會調用一次map方法,而且會将這一行資料的偏移量作為key(即key一定是IntWritable),這一行資料的内容作為value傳回給架構,然後由架構寫出context.write(key,value)

Reduce 階段會有一個實體程式,不要我們自己開發我們需要維護reduce方法

Reduce程式會接受map端輸出的中間結果資料,而且相同的key的資料會到達同一個reduce執行個體中去,每個reduce執行個體會處理多個key的資料。Reduce程式會将自己收集的資料按照key相同進行分組,對一組資料調用一次reduce方法(按組調用reduce方法),并且将參數傳給reduce(key,疊代器values,context),然後寫出

map前,reduce後詳解

MapReduce詳解(MR運作全流程,shuffle,分區,分片)本文行文邏輯MapReduce程式詳解(即map中,reduce中)map前,reduce後詳解MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)

分片詳解

什麼是分片?

在進行map計算之前,map會根據輸入檔案計算輸入分片(input split);每個輸入分片(input split)針對一個map任務,輸入分片存儲的并非是資料本身,而是一個分片長度和一個記錄資料的位置的數組。

邏輯概念,分片資訊包括起始偏移量,分片大小,分片資料所在的塊的資訊,塊所在的主機清單。

注意:分片是根據輸入檔案計算的,這些輸入檔案即是存儲在hdfs上的那些個檔案。

為何要分片?

答:友善多個Map任務并行處理,提高運作效率。

每一個分片對應着一個maptask,通過調整分片的大小可以調整maptask的數量,也就是調整map階段的并行度。

分片大小計算?

下面是源程式中對分片大小的計算:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//傳回1

long maxSize = getMaxSplitSize(job);//傳回long的最大值

long splitSize = computeSplitSize(blockSize, minSize, maxSize)

return Math.max(minSize, Math.min(maxSize, blockSize));

總計:先比較塊大小和最大分片,選出其中較小的,然後将結果和最小分片比較,選出較大的。

//計算分片大小,實際的分片大小。

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {} //分片有一個1.1倍的備援

通過這個可以對實際分片大小進行設定,主要從最大分片大小和最小分片大小入手。

(1)FileInputFormat.setMinInputSplitSize(job,1000);

(2)FileInputFormat.setMaxInputSplitSize(job,1000000);

分片的讀取規則

  1. 第一個分片從第一行開始讀取,讀到分片末尾,再讀取下一個分片的第一行
  2. 既不是第一個分片也不是最後一個分片,第一行資料舍去,讀到分片末尾,再繼續讀 取下一個分片的第一行資料
  3. 最後一個分片舍去第一行,讀到分片末尾

控制maptask和reducetask數量

控制maptask數量:

1)maptask數量由分片數量決定,可設定maxsize,minsize,blocksize來控制分片的大小,進而控制分片數量

2)改變資料總量也可影響maptask數量

控制reducetask數量:

1)job.setNumReduceTasks(5); 直接設定reducetask數量

2)分區數和reducetask數量是一緻的,可以調整分區數。

MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)

MapReduce詳解(MR運作全流程,shuffle,分區,分片)本文行文邏輯MapReduce程式詳解(即map中,reduce中)map前,reduce後詳解MapReduce運作全流程(主要介紹map到reduce的其中過程,即shuffle流程)
  • org.apache.hadoop.mapred.OutputCollector(即文中的輸出收集器),OutputCollector由Hadoop架構提供,負責收集Mapper和Reducer的輸出資料,實作map函數和reduce函數時,隻需要簡單的将其輸出的<key,value>對往OutputCollector中一丢即可,剩餘的事情架構自會幫你處理好。

    可以了解為map()和reduce()不會每執行一次便寫出,是需要積累的,具體流程可以檢視底層原理進行了解。

  • 為什麼要寫入環形緩沖區?

    答: hadoop在執行MapReduce任務的時候,在map階段,map()函數産生輸出以後,并不是直接寫入到磁盤中,而是先寫入到了環形緩沖區,這樣做的原因是無論緩沖區寫入還是讀出,速度都更快,效率更高

  • 寫入磁盤緩沖區的時候,為什麼隻寫入百分之80,那百分之20幹啥的?

    答:這個記憶體緩沖區是有大小限制的,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,是以需要在一定條件下将緩沖區中的資料臨時寫入磁盤,然後重新利用這塊緩沖區。這個從記憶體往磁盤寫資料的過程被稱為Spill,中文可譯為溢寫,字面意思很直覺。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩沖區的資料已經達到門檻值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。

  • 分片資料和溢寫磁盤檔案的對應關系?

    通過上一條問題可知,環形緩沖區會重複利用。

    一個分片資料有可能會産生多個溢寫磁盤檔案。

  • 内/外部排序針對的是記憶體,在記憶體中的排序稱之為内部排序,反之,在磁盤等位置的排序稱為外部排序。

MR運作全流程中自定義部分

在MR運作全流程中,存在若幹過程是設計人員可以進行自定義設計,即人為幹預的。

包括:

  • TextInputFormat
  • map()函數
  • 自定義分區(注意:先分區後排序,且是在溢寫之前進行分區)
  • 分區内自定義排序(注意:要在分區内進行排序)
  • combine()
  • 傳入reduce()函數的key,我們可以自定義key類型,并定義什麼樣的key算是一樣的
  • reduce( )
  • TextOutputFormat

自定義資料類型

1.要實作writable接口

2.讀寫順序要一緻

3.構造方法如果進行了重寫,要顯示定義無參的構造方法

4.重寫toString方法

自定義分區

資料分發的政策

如何實作自定義分區?

前提要保證相同key的資料會發送到同一個reduce中,或者說是分到同一個分區中

Partitioner<key,value> ,這裡的key和value的資料類型和map輸出的資料類型保持一緻

繼承Patitioner這個類,重寫分區方法getPartition(map輸出key,map輸出value,分區數量)

然後在job中設定使用我們自定義的分區方法進行資料分發

/**
  場景:将不同手機号字首歸向不同省份地區
*/
           

繼續閱讀