Classes involved: WordcountDriver.java, WordcountMapper.java, WordcountReducer.java
Contents of wordcount.txt:
hello my name is zhuzhiwen
what is your name
hello my name is yy
hello my name is kuaishou
hello my name is baidu
hh
haha
haha
haha
ni men dou shi shazi
fan zheng wo bu shi
shi ai shi shei shi
ni men dou yi bian wan qu ba
The file has been uploaded to HDFS:
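For reference, the upload can also be done programmatically instead of with the command line. The sketch below is not part of the original post: the NameNode address and user name are taken from the driver further down, while the local source path /home/hadoop/wordcount.txt is only an assumption.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadWordcountFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connect to the NameNode as user "hadoop" (address reused from the driver below)
        FileSystem fs = FileSystem.get(new URI("hdfs://mina0:9000"), conf, "hadoop");
        // Local source path is hypothetical; adjust it to wherever wordcount.txt actually lives
        fs.copyFromLocalFile(new Path("/home/hadoop/wordcount.txt"),
                             new Path("/wordcount/input/"));
        fs.close();
    }
}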
WordcountDriver.java class:
package cn.itcast.bigdata.mr.wcdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Acts as a client of the YARN cluster.
 * It packages the runtime parameters of our MR program, specifies the jar,
 * and finally submits the job to YARN.
 * @author
 *
 */
public class WordcountDriver {

    public static void main(String[] args) throws Exception {

        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "hdfs://mina0:9000/wordcount/input/wordcount.log";
            args[1] = "hdfs://mina0:9000/wordcount/output2";
        }

        Configuration conf = new Configuration();
        conf.set("HADOOP_USER_NAME", "hadoop");
        // This setting has no effect! ?????? (see the note after this listing)
        // conf.set("HADOOP_USER_NAME", "hadoop");
        // conf.set("dfs.permissions.enabled", "false");
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/

        Job job = Job.getInstance(conf);

        /*job.setJar("/home/hadoop/wc.jar");*/
        // Specify the local path of the jar that contains this program
        job.setJarByClass(WordcountDriver.class);

        // Specify the Mapper/Reducer classes this job uses
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // Specify the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Specify the directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job configuration, plus the jar containing the job's classes, to YARN
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
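About the "This setting has no effect!" question above: HADOOP_USER_NAME is read by UserGroupInformation from the environment or the JVM system properties when the login user is determined, not from the job Configuration, so conf.set("HADOOP_USER_NAME", "hadoop") cannot change the submitting user. A minimal sketch of two alternatives that do take effect; the exact placement is an assumption and is not shown in the original code:

// Option 1: set the JVM system property before any Hadoop class resolves the login user
System.setProperty("HADOOP_USER_NAME", "hadoop");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// Option 2: obtain the FileSystem explicitly as user "hadoop"
// FileSystem fs = FileSystem.get(new URI("hdfs://mina0:9000"), conf, "hadoop");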
WordcountMapper.java class:
package cn.itcast.bigdata.mr.wcdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * KEYIN: by default, the byte offset of the line of text read by the MR framework; a Long,
 * but Hadoop has its own, more compact serialization interface, so LongWritable is used instead of Long.
 *
 * VALUEIN: by default, the content of the line of text read by the MR framework; a String, so Text is used for the same reason.
 *
 * KEYOUT: the key of the output produced by the user-defined logic; here it is a word, a String, so Text.
 * VALUEOUT: the value of the output produced by the user-defined logic; here it is the word count, an Integer, so IntWritable.
 *
 * @author
 *
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * The business logic of the map phase goes in the user-defined map() method.
     * The map task calls our map() method once for every line of input.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Convert the line handed to us by the map task into a String
        String line = value.toString();
        // Split the line into words on spaces
        String[] words = line.split(" ");
        // Emit each word as <word, 1>
        for (String word : words) {
            // Use the word as the key and 1 as the value, so the data can be partitioned
            // by word and identical words end up at the same reduce task
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
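One common refinement, shown here only as a sketch and not part of the original code: the map() method above allocates a new Text and a new IntWritable for every word, which creates unnecessary garbage on large inputs. Because context.write() serializes the objects immediately, the writables can safely be reused as instance fields:

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused across map() calls; context.write() serializes their contents right away
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String w : value.toString().split(" ")) {
            word.set(w);
            context.write(word, one);
        }
    }
}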
WordcountReducer.java class:
package cn.itcast.bigdata.mr.wcdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * KEYIN, VALUEIN correspond to the mapper's KEYOUT, VALUEOUT types.
 *
 * KEYOUT, VALUEOUT are the output types of the user-defined reduce logic:
 * KEYOUT is the word,
 * VALUEOUT is the total count.
 * @author
 *
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Example input groups:
     * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * The key parameter is the key shared by one group of kv pairs with the same word.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        /*Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            count += iterator.next().get();
        }*/
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
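Because this reduce logic is a plain sum (commutative and associative), the same class can also be registered as a combiner so the <word,1> pairs are pre-aggregated on the map side and less data is shuffled. This is optional and not part of the original driver; the sketch assumes it is added right after setReducerClass():

// In WordcountDriver, after job.setReducerClass(WordcountReducer.class):
job.setCombinerClass(WordcountReducer.class);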
Execution result:
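With the default TextOutputFormat, the job writes files such as part-r-00000 under the output directory, one tab-separated "word count" line per word. A minimal sketch for printing that result from HDFS; the output path is taken from the driver's defaults and the user name "hadoop" is an assumption:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintWordcountResult {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://mina0:9000"), new Configuration(), "hadoop");
        // Iterate over the part-r-* files produced by the reduce tasks
        FileStatus[] parts = fs.globStatus(new Path("/wordcount/output2/part-r-*"));
        if (parts != null) {
            for (FileStatus status : parts) {
                try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(fs.open(status.getPath())))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line); // each line is "word<TAB>count"
                    }
                }
            }
        }
        fs.close();
    }
}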