Source code:
WordCountMapper.java:
package cn.idcast.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
Explanation of the four generic types:
    KEYIN   : type of k1
    VALUEIN : type of v1
    KEYOUT  : type of k2
    VALUEOUT: type of v2
*/
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    // The map method converts k1/v1 into k2/v2.
    /*
    Parameters:
        key     : k1, the byte offset of the line
        value   : v1, the text of the line
        context : the context object
    */
    /*
    How k1/v1 are converted into k2/v2:
        k1      v1
        0       hello,world,hadoop
        15      hdfs,hive,hello
        -------------------------
        k2      v2
        hello   1
        world   1
        hdfs    1
        hadoop  1
        hello   1
    */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1: split the line of text
        String[] split = value.toString().split(",");
        // 2: iterate over the array and assemble k2 and v2
        for (String word : split) {
            // 3: write k2 and v2 to the context
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}
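For reference, here is a tiny plain-Java sketch (no Hadoop required; the class name and sample line are made up for illustration) of what the map() method above emits for a single input line:

MapSketch.java (illustrative only):
public class MapSketch {
    public static void main(String[] args) {
        String v1 = "hello,world,hadoop";        // v1: the text of one line; k1 would be its byte offset (0)
        for (String word : v1.split(",")) {
            // mirrors context.write(new Text(word), new LongWritable(1)) in WordCountMapper
            System.out.println(word + "\t1");
        }
    }
}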
WordCountReducer.java:
package cn.idcast.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
Explanation of the four generic types:
    KEYIN   : type of k2
    VALUEIN : type of v2
    KEYOUT  : type of k3
    VALUEOUT: type of v3
*/
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    // The reduce method converts the new k2/v2 into k3/v3 and writes k3/v3 to the context.
    /*
    Parameters:
        key     : the new k2
        values  : the collection of new v2 values
        context : the context object
    -----------------------
    How the new k2/v2 are converted into k3/v3:
        new k2    v2
        hello     <1,1,1>
        world     <1,1>
        hadoop    <1>
        -------------------------
        k3        v3
        hello     3
        world     2
        hadoop    1
    */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1: iterate over the collection and sum the values to get v3
        for (LongWritable value : values) {
            count += value.get();
        }
        // 2: write k3 and v3 to the context
        context.write(key, new LongWritable(count));
    }
}
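To make the <1,1,1> grouping in the comment concrete, here is a minimal plain-Java sketch (no Hadoop required; class name and data are illustrative) of the shuffle grouping plus the same summation that reduce() performs:

ReduceSketch.java (illustrative only):
import java.util.*;

public class ReduceSketch {
    public static void main(String[] args) {
        // Simulates the grouped input the framework hands to reduce(): "hello <1,1,1>", etc.
        Map<String, List<Long>> grouped = new LinkedHashMap<>();
        grouped.put("hello", Arrays.asList(1L, 1L, 1L));
        grouped.put("world", Arrays.asList(1L, 1L));
        grouped.put("hadoop", Arrays.asList(1L));
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            long count = 0;
            for (long v : e.getValue()) {
                count += v;                               // same accumulation as WordCountReducer.reduce()
            }
            System.out.println(e.getKey() + "\t" + count);   // k3 \t v3, e.g. "hello\t3"
        }
    }
}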
JobMain.java:
package cn.idcast.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
    // This method defines and configures a job.
    @Override
    public int run(String[] args) throws Exception {
        // 1: create a job object
        Job job = Job.getInstance(super.getConf(), "wordcount");
        // Required if running from a packaged jar fails to locate the classes
        job.setJarByClass(JobMain.class);
        // 2: configure the job (eight steps)
        // Step 1: specify how and where the input files are read
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/wordcount"));
        // Step 2: specify the Map-phase processing class
        job.setMapperClass(WordCountMapper.class);
        // Set the type of k2 for the Map phase
        job.setMapOutputKeyClass(Text.class);
        // Set the type of v2 for the Map phase
        job.setMapOutputValueClass(LongWritable.class);
        // Steps 3, 4, 5 and 6 use the default behaviour and are not configured at this stage
        // Step 7: specify the Reduce-phase processing class and data types
        job.setReducerClass(WordCountReducer.class);
        // Set the type of k3
        job.setOutputKeyClass(Text.class);
        // Set the type of v3
        job.setOutputValueClass(LongWritable.class);
        // Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the output path
        Path path = new Path("hdfs://node1:8020/wordcount_out");
        TextOutputFormat.setOutputPath(job, path);
        // Get a FileSystem handle
        FileSystem fs = FileSystem.get(new URI("hdfs://node1:8020/wordcount_out"), new Configuration());
        // If the output directory already exists, delete it so the job can run
        if (fs.exists(path)) {
            fs.delete(path, true);
            System.out.println("Output path already existed and has been deleted!");
        }
        // Wait for the job to finish
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
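The input and output paths above are hardcoded. As a variation (a sketch, not part of the original code; the jar name and argument order are assumptions), the paths could instead be taken from the command-line arguments that ToolRunner forwards to run(), e.g. hadoop jar wordcount.jar cn.idcast.mapreduce.JobMain /wordcount /wordcount_out:

// Inside run(String[] args), replacing the hardcoded paths:
TextInputFormat.addInputPath(job, new Path(args[0]));   // first argument: input directory
Path path = new Path(args[1]);                          // second argument: output directory
TextOutputFormat.setOutputPath(job, path);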
A small mistake worth recording:
The keys were written to the output repeatedly. Cause: in the reduce step, the context.write() call had been placed inside the loop, so a result was written once per added value instead of once per key.
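For clarity, a sketch of the difference (the surrounding variables are those of the reduce() method shown above):

// Buggy version: write() inside the loop emits the running total once per value,
// so a key such as "hello" shows up several times in the output.
for (LongWritable value : values) {
    count += value.get();
    context.write(key, new LongWritable(count));   // wrong place
}

// Correct version (as in WordCountReducer above): write once, after the loop has finished.
for (LongWritable value : values) {
    count += value.get();
}
context.write(key, new LongWritable(count));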