
MapReduce word count

Source code:

WordCountMapper.java:

package cn.idcast.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 The four generic type parameters explained:
 KEYIN:    the type of k1
 VALUEIN:  the type of v1

 KEYOUT:   the type of k2
 VALUEOUT: the type of v2
*/
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

    //The map method converts k1 and v1 into k2 and v2
    /*
        Parameters:
           key    : k1, the byte offset of the line
           value  : v1, the text of the line
           context: the context object
     */
    /*
       How k1 and v1 are converted to k2 and v2:
       k1         v1
       0    hello,world,hadoop
       19   hdfs,hive,hello
      -------------------------

       k2         v2
       hello      1
       world      1
       hadoop     1
       hdfs       1
       hive       1
       hello      1
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        //1: split the line of text
        String[] split = value.toString().split(",");

        //2: iterate over the array and assemble k2 and v2
        for (String word : split) {
            //3: write each k2 and v2 pair to the context
            text.set(word);
            longWritable.set(1);
            context.write(text,longWritable);
        }

    }
}

WordCountReducer.java:

package cn.idcast.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 The four generic type parameters explained:
 KEYIN:    the type of k2
 VALUEIN:  the type of v2

 KEYOUT:   the type of k3
 VALUEOUT: the type of v3
*/
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    //The reduce method converts the new k2 and v2 into k3 and v3 and writes k3 and v3 to the context
    /*
        Parameters:
           key    : the new k2
           values : the collection of new v2 values
           context: the context object
          -----------------------
           How the new k2 and v2 are converted to k3 and v3:
           new k2        v2
             hello       <1,1,1>
             world       <1,1>
             hadoop      <1>
         -------------------------
              k3         v3
             hello        3
             world        2
             hadoop       1
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        //1: iterate over the collection and sum the values to get v3
        for (LongWritable value : values) {
            count += value.get();
        }
        //2: write k3 and v3 to the context
        context.write(key,new LongWritable(count));
    }
}

JobMain.java:

package cn.idcast.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool {

    //This method defines and configures the job
    @Override
    public int run(String[] args) throws Exception {
        //1: create a Job instance
        Job job = Job.getInstance(super.getConf(), "wordcount");
        //If running the packaged jar fails, this setting is required
        job.setJarByClass(JobMain.class);
        //2: configure the job (eight steps)

        //Step 1: specify the input format and the input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node1:8020/wordcount"));

        //Step 2: specify the Mapper class for the map phase
        job.setMapperClass(WordCountMapper.class);
        //set the type of k2 for the map phase
        job.setMapOutputKeyClass(Text.class);
        //set the type of v2 for the map phase
        job.setMapOutputValueClass(LongWritable.class);

        //Steps 3, 4, 5 and 6 use the defaults and are not configured here for now

        //Step 7: specify the Reducer class and data types for the reduce phase
        job.setReducerClass(WordCountReducer.class);
        //set the type of k3
        job.setOutputKeyClass(Text.class);
        //set the type of v3
        job.setOutputValueClass(LongWritable.class);

        //Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        //set the output path
        Path path = new Path("hdfs://node1:8020/wordcount_out");
        TextOutputFormat.setOutputPath(job,path);

        //get a FileSystem handle
        FileSystem fs = FileSystem.get(new URI("hdfs://node1:8020/wordcount_out"),new Configuration());
        //delete the output directory if it already exists
        if (fs.exists(path)) {
            fs.delete(path, true);
            System.out.println("The output path already exists and has been deleted!");
        }
        //wait for the job to finish
        boolean bl = job.waitForCompletion(true);
        return bl ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
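
After building the project into a jar, the job can be submitted to the cluster with the hadoop jar command. A minimal example, assuming the jar is named wordcount.jar (the name is an assumption) and the input directory hdfs://node1:8020/wordcount already exists:

hadoop jar wordcount.jar cn.idcast.mapreduce.JobMain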

A small mistake worth recording:


I noticed that keys were being output repeatedly. The cause: in the reduce step, the context.write call had been placed inside the loop, so a record was emitted every time a value was added. A sketch of the mistake follows.
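
In code terms, the broken reduce looked roughly like this (a sketch of the mistake, to contrast with the corrected WordCountReducer.java above):

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
            //BUG: writing inside the loop emits one record per value,
            //so the same key appears once for every 1 it carries
            context.write(key, new LongWritable(count));
        }
    }

Moving context.write(key, new LongWritable(count)) after the loop, as in the corrected version, emits each key exactly once with its final count.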

