天天看點

大資料Hadoop之MapReduce

(一) 什麼是MapReduce?

 (1).概念

官網上原話翻譯成中文這麼說的:Hadoop MapReduce是一個用于輕松編寫應用程式的軟體架構,它以可靠的容錯方式在大型群集(數千個節點)的商品硬體上并行處理海量資料(多TB資料集).

  關鍵詞:軟體架構 可靠性 高容錯 處理海量資料

說白了,其實mapreduce就是把大量資料通過分而治之的核心思想來實作分布式計算架構,核心架構map和reduce任務。

(2) Map任務

大資料Hadoop之MapReduce

2.1) 讀取文檔裡面的資料内容,inputSplit将資料内容解析成k-v鍵值對,每對鍵值對調用一次map函數;

2.2) 實作自己的map函數,k-v鍵值對處理并輸出到中間結果;

2.3) 對k-v鍵值對進行分區(partitioner)

2.4) 按照分區對k-v進行處理(排序和合并),

a.對相同key的value值放入一個list集合裡面;

b.将不同分區的key相同的鍵值對放入同一個reduce處理。

2.5) 對mapper輸出的結果進行歸約并傳入reduce(mapper任務的輸出就是reduce任務的輸入)

(3) Reduce任務

3.1)存在多個reduce的情況下,多mapper任務輸出的結果按照不同分區複制到不同的reduce下,單個更簡單;

3.2)reduce對mapper的輸入結果進行合并、排序,然後在調用自己實作的reduce()方法來對k-v進行處理,得到新的

k-v鍵值對輸出;

3.3)把通過自定義的reduce()處理後的結果存入文檔;

(二) MapReduce的執行原理

 MapReduce 運作的時候,會通過Mapper 運作的任務讀取HDFS 中的資料檔案,然後調用自己的方法,處理資料,最後輸出;

Reducer 任務會接收Mapper 任務輸出的資料,作為自己的輸入資料,調用自己的方法,最後輸出到HDFS 的檔案中,如下圖

大資料Hadoop之MapReduce

2.1) Mapper任務執行流程

每個Mapper 任務是一個java 程序,它會讀取HDFS 中的檔案,解析成很多的鍵值對,經過我們覆寫的map 方法處理後,轉換為很多的鍵值對再輸出。

整個Mapper 任務的處理過程又可以分為以下幾個階段,如圖

大資料Hadoop之MapReduce

  ①:把輸入檔案按照一定的标準分片(InputSplit),每個輸入片的大小是固定的。預設情況下,輸入片(InputSplit)的大小與資料塊(Block)的大小是相同的;

如果資料塊(Block)的大小是預設值64MB,輸入檔案有兩個,一個是32MB,一個是72MB;麼小的檔案是一個輸入片,大檔案會分為兩個資料塊,

那麼是兩個輸入片。一共産生三個輸入片;每一個輸入片由一個Mapper 程序處理。(ps:Hadoop1.0資料塊block預設64M,Hadoop2.0以後預設都是128M)

②:對輸入片中的記錄按照一定的規則解析成鍵值對。有個預設規則是把每一行文本内容解析成鍵值對。“鍵”是每一行的起始位置(機關是位元組),“值”是本行的文本内容。

③:每一對鍵值對都會調用一次map()來對輸出資料做一些處理,并且每一次調用map()都會産生0到多對鍵值對| ps :這裡的map()就是我們自己覆寫實作的。

④:對鍵值對按照一定算法進行分區,分區是指劃分key的空間,分區的總數對應job的reduce數,也就是說分區和reduce是一一對應的,

預設隻有一個分區,分區名為HashPartitioner。(ps:分區的實質其實就是資料歸類,為了減輕reduce的壓力)

⑤:對④步驟中各個分區内部根據key來進行排序,對key-value鍵值對進行分組。

⑥:對分區中的資料進行規約。(ps:減少到reduce的鍵值對資料量,作業時間變短,效率提高)

2.2) Reduce任務執行流程

 Reducer 任務接收Mapper 任務的輸出,歸約處理後寫入到HDFS 中,步驟圖所示:

大資料Hadoop之MapReduce

①:将各個分區的mapper輸出的鍵值對傳入到reduce裡面;

②:将reduce裡面的k-v鍵值對進行合并,接下來對其合并結果進行排序;

③:對執行好①、②操作後的鍵值對結果輸出到指定位置。(ps:如HDFS檔案系統)

2.3) 一個mapreduce的job簡單原理圖

大資料Hadoop之MapReduce

ps:<key1,value1>{input} ------>map------> <key2, value2>{combine} ------> <key2,value2>----->reduce-----><key3,value3>{output}

(三) 單詞計數的例子展示

3.1)單詞計數結構原理模拟圖

①Combiner 節點負責完成上面提到的将同一個map中相同的key進行合并,避免重複傳輸,進而減少傳輸中的通信開銷

②Partitioner節點負責将map産生的中間結果進行劃分,確定相同的key到達同一個reduce節點.

大資料Hadoop之MapReduce

3.2)代碼執行個體展示

ps:部落客使用的開發工具是IDE,開發語言是java,當然有興趣也使用其他語言去實作!

①:先來張wordcount類簡略圖

大資料Hadoop之MapReduce

 ②:來分析下每個方法,這裡就偷懶,就直接貼代碼

public class WordCount {
    private final static String INPUT_PATH = "hdfs://master:9000/input"; //輸入路徑
    private final static String OUTPUT_PATH = "hdfs://master:9000/output"; //輸出路徑,必須是不存在的

    private static Configuration getConfiguration() {
        Configuration conf = new Configuration();
        return conf;
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);//表示單詞出現次數
        private Text word = new Text();//某個單詞

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //表示每次處理一行資料,并且按空格分割
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, one);//寫入上下文中,用于後續傳入reduce
            }
        }
    }

    static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable(0); //某單詞出現的總次數

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
                InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.建立Configuration
        Configuration cf = new Configuration();
        //2.準備環境
        Path outputPath = new Path(OUTPUT_PATH);
        Path inputPath = new Path(INPUT_PATH);
        FileSystem fs = FileSystem.get(getConfiguration());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        //3.建立job
        Job job = new Job(cf, WordCount.class.getSimpleName());
        job.setJarByClass(WordCount.class);//打成jar
        FileInputFormat.setInputPaths(job, inputPath);//讀取HDFS檔案路徑

        //設定map相關
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //設定reduce相關
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設定job的輸出環境
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
           

第一步:打包我們上面的類大jar(ps:是jar包不是war包)

第二步:啟動hadoop叢集(ps:這裡怎麼配置叢集我就不多說了,不懂的問度娘,網上一大堆)

大資料Hadoop之MapReduce

  第三步:建立輸入路徑并上傳資源檔案到HDFS,與java代碼裡面的input輸出路徑一一對應;

大資料Hadoop之MapReduce
大資料Hadoop之MapReduce

第四步:運作jar,獲得結果

大資料Hadoop之MapReduce

PS:MapReduce大概就這樣了,等有機會更深入的了解,再來繼續更新......

繼續閱讀