(一) 什麼是MapReduce?
(1).概念
官網上原話翻譯成中文這麼說的:Hadoop MapReduce是一個用于輕松編寫應用程式的軟體架構,它以可靠的容錯方式在大型群集(數千個節點)的商品硬體上并行處理海量資料(多TB資料集).
關鍵詞:軟體架構 可靠性 高容錯 處理海量資料
說白了,其實mapreduce就是把大量資料通過分而治之的核心思想來實作分布式計算架構,核心架構map和reduce任務。
(2) Map任務
2.1) 讀取文檔裡面的資料内容,inputSplit将資料内容解析成k-v鍵值對,每對鍵值對調用一次map函數;
2.2) 實作自己的map函數,k-v鍵值對處理并輸出到中間結果;
2.3) 對k-v鍵值對進行分區(partitioner)
2.4) 按照分區對k-v進行處理(排序和合并),
a.對相同key的value值放入一個list集合裡面;
b.将不同分區的key相同的鍵值對放入同一個reduce處理。
2.5) 對mapper輸出的結果進行歸約并傳入reduce(mapper任務的輸出就是reduce任務的輸入)
(3) Reduce任務
3.1)存在多個reduce的情況下,多mapper任務輸出的結果按照不同分區複制到不同的reduce下,單個更簡單;
3.2)reduce對mapper的輸入結果進行合并、排序,然後在調用自己實作的reduce()方法來對k-v進行處理,得到新的
k-v鍵值對輸出;
3.3)把通過自定義的reduce()處理後的結果存入文檔;
(二) MapReduce的執行原理
MapReduce 運作的時候,會通過Mapper 運作的任務讀取HDFS 中的資料檔案,然後調用自己的方法,處理資料,最後輸出;
Reducer 任務會接收Mapper 任務輸出的資料,作為自己的輸入資料,調用自己的方法,最後輸出到HDFS 的檔案中,如下圖
2.1) Mapper任務執行流程
每個Mapper 任務是一個java 程序,它會讀取HDFS 中的檔案,解析成很多的鍵值對,經過我們覆寫的map 方法處理後,轉換為很多的鍵值對再輸出。
整個Mapper 任務的處理過程又可以分為以下幾個階段,如圖
①:把輸入檔案按照一定的标準分片(InputSplit),每個輸入片的大小是固定的。預設情況下,輸入片(InputSplit)的大小與資料塊(Block)的大小是相同的;
如果資料塊(Block)的大小是預設值64MB,輸入檔案有兩個,一個是32MB,一個是72MB;麼小的檔案是一個輸入片,大檔案會分為兩個資料塊,
那麼是兩個輸入片。一共産生三個輸入片;每一個輸入片由一個Mapper 程序處理。(ps:Hadoop1.0資料塊block預設64M,Hadoop2.0以後預設都是128M)
②:對輸入片中的記錄按照一定的規則解析成鍵值對。有個預設規則是把每一行文本内容解析成鍵值對。“鍵”是每一行的起始位置(機關是位元組),“值”是本行的文本内容。
③:每一對鍵值對都會調用一次map()來對輸出資料做一些處理,并且每一次調用map()都會産生0到多對鍵值對| ps :這裡的map()就是我們自己覆寫實作的。
④:對鍵值對按照一定算法進行分區,分區是指劃分key的空間,分區的總數對應job的reduce數,也就是說分區和reduce是一一對應的,
預設隻有一個分區,分區名為HashPartitioner。(ps:分區的實質其實就是資料歸類,為了減輕reduce的壓力)
⑤:對④步驟中各個分區内部根據key來進行排序,對key-value鍵值對進行分組。
⑥:對分區中的資料進行規約。(ps:減少到reduce的鍵值對資料量,作業時間變短,效率提高)
2.2) Reduce任務執行流程
Reducer 任務接收Mapper 任務的輸出,歸約處理後寫入到HDFS 中,步驟圖所示:
①:将各個分區的mapper輸出的鍵值對傳入到reduce裡面;
②:将reduce裡面的k-v鍵值對進行合并,接下來對其合并結果進行排序;
③:對執行好①、②操作後的鍵值對結果輸出到指定位置。(ps:如HDFS檔案系統)
2.3) 一個mapreduce的job簡單原理圖
ps:<key1,value1>{input} ------>map------> <key2, value2>{combine} ------> <key2,value2>----->reduce-----><key3,value3>{output}
(三) 單詞計數的例子展示
3.1)單詞計數結構原理模拟圖
①Combiner 節點負責完成上面提到的将同一個map中相同的key進行合并,避免重複傳輸,進而減少傳輸中的通信開銷
②Partitioner節點負責将map産生的中間結果進行劃分,確定相同的key到達同一個reduce節點.
③
3.2)代碼執行個體展示
ps:部落客使用的開發工具是IDE,開發語言是java,當然有興趣也使用其他語言去實作!
①:先來張wordcount類簡略圖
②:來分析下每個方法,這裡就偷懶,就直接貼代碼
public class WordCount {
private final static String INPUT_PATH = "hdfs://master:9000/input"; //輸入路徑
private final static String OUTPUT_PATH = "hdfs://master:9000/output"; //輸出路徑,必須是不存在的
private static Configuration getConfiguration() {
Configuration conf = new Configuration();
return conf;
}
static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);//表示單詞出現次數
private Text word = new Text();//某個單詞
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//表示每次處理一行資料,并且按空格分割
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
context.write(word, one);//寫入上下文中,用于後續傳入reduce
}
}
}
static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable(0); //某單詞出現的總次數
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.建立Configuration
Configuration cf = new Configuration();
//2.準備環境
Path outputPath = new Path(OUTPUT_PATH);
Path inputPath = new Path(INPUT_PATH);
FileSystem fs = FileSystem.get(getConfiguration());
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
//3.建立job
Job job = new Job(cf, WordCount.class.getSimpleName());
job.setJarByClass(WordCount.class);//打成jar
FileInputFormat.setInputPaths(job, inputPath);//讀取HDFS檔案路徑
//設定map相關
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定reduce相關
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定job的輸出環境
FileOutputFormat.setOutputPath(job, outputPath);
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
第一步:打包我們上面的類大jar(ps:是jar包不是war包)
第二步:啟動hadoop叢集(ps:這裡怎麼配置叢集我就不多說了,不懂的問度娘,網上一大堆)
第三步:建立輸入路徑并上傳資源檔案到HDFS,與java代碼裡面的input輸出路徑一一對應;
第四步:運作jar,獲得結果
PS:MapReduce大概就這樣了,等有機會更深入的了解,再來繼續更新......