实验的推进模式
- 先配置好eclipse for hadoop
- 直接借用WordCount去测试
-
利用WordCount的基本程序框架,编写自己的代码
要点:map/reduce的所在的类和方法的数据类型(Text,intWritable,NullWritable,LongWritable以及自定义的…)
map和reduce程序主体的编写…
- 建议 尽量自行去编写一个样例程序(如连接运算)
- 差不多了,可以去应对实验要求的内容
关于mapreduce概念的介绍可以参考这篇博客:MapReduce
关于1、2两点可以参考这篇博客:Eclipes实现Mapreduce的配置(配图解与WordCount案例)
mapreduce编程的基本框架
-
该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,比如指定 Mapper 类和 Reducer 类
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
- 该程序中的 TokenizerMapper 类继承了 Mapper 类
- 该程序中的 IntSumReducer 类继承了 Reducer 类
总结: MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分 编写该 MapReduce 程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继承 Mapper 类和 Reducer 类
- 用户编写的程序分成三个部分: Mapper, Reducer, main
- Mapper 的输入数据是键值对的形式( 键值对的类型可自定义)
- Mapper 的输出数据是键值对对的形式( 键值对的类型可自定义)
- Mapper 中的业务逻辑写在
方法中map()
-
方法( maptask 进程)对每一对键值对调用一次map()
- Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是键值对
- Reducer 的业务逻辑写在
方法中reduce()
- Reducetask 进程对每一组相同 k e y key key的键值对组调用一次
方法reduce()
- 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
- 整个程序中描述各种必要信息的 job 对象都写在main里面
对于框架的每一部分具体介绍:
map部分:
系统已经自带Mapper类,每次使用只需要基础这个类即可。
在
Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
类中,
- K E Y I N KEYIN KEYIN是指框架读取到的数据的key的类型,在默认的InputFormat下,读到的key是一行文本的起始偏移量。
- V A L U E I N VALUEIN VALUEIN是指框架读取到的数据的value的类型,在默认的InputFormat下,读到的value是一行文本的内容
- K E Y O U T KEYOUT KEYOUT是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定
- V A L U E O U T VALUEOUT VALUEOUT是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定
要注意的是String、Long等jdk中自带的数据类型,在序列化时,效率较低,hadoop为了题高序列化效率,自定义了一套序列化框架,所以在hadoop的程序中,如果该数据需要进行序列化,就一定要用实现了hadoop序列化框架的数据类型。以下列举了几个常用的
Java类型 | Hadoop Writable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
array | ArrayWritable |
reduce部分
和map一样,reduce也有自己自带的类Reducer,同样也是四个数据类型,分别表示输入和输出。输入的是Map阶段的处理结果,输出就是Reduce最后的输出
reducetask在调我们写得reduce方法,reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分,所以reducetaks的输入类型必须和maptask的输出类型一样
reducetask将这些收到的键值对数据拿来处理时,是这样调用我们的reduce方法的:
先将自己收到的所有的键值对按照key来分组
将某一组键值对组中的第一个键值对中的key传给reduce方法的key变量,把这一组键值对中所有的value用一个迭代器传给reduce方法的变量value
main部分
main方法是该mapreduce程序运行的入口,其中用一个job类对象来管理程序运行时所需要的很多参数:
比如,指定用哪个组件作为数据读取器、数据结果输出器
指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类
指定job程序的jar包所在路径…
大致框架
public static void main(String[] args)throws Exception {
//指定hdfs相关的参数
Configuration conf=new Configuration();//程序运行时参数
conf.set("fs.default.name","hdfs://localhost:9000");//hdfs的主节点
System.setProperty("HADOOP_USER_NAEM","hadoop");
//新建一个job任务
Job job=Job.getInstance(conf,"Merge and DR");//设置环境参数
//设置jar包所在路径
job.setJarByClass(Main.class);//设置整个程序的类名
//指定mapper类和reducer类
job.setMapperClass(Merge_DR.MergeMapper.class);//添加Mapper类
job.setReducerClass(Merge_DR.MergeReducer.class);//添加Reducer类
///指定reducetask的输出类型
job.setOutputKeyClass(Text.class);//设置输出类型
job.setOutputValueClass(Text.class);//设置输出类型
//指定该mapreduce程序数据的输入和输出路径
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
//最后提交任务
System.exit(job.waitForCompletion(true)?0:1);
WordCount的分析
编写Map处理逻辑
- Map输入类型为<key,value>
- 期望的Map输出类型为<单词,出现次数>
- Map输入类型最终确定为<Object,Text>
- Map输出类型最终确定为<Text,IntWritable>
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
编写Reduce处理逻辑
- 在Reduce处理数据之前,Map的结果首先通过Shuffle阶段进行整理
- Reduce阶段的任务:对输入数字序列进行求和
- Reduce的输入数据为<key,Iterable容器>
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
编写main方法
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //设置环境参数
job.setJarByClass(WordCount.class); //设置整个程序的类名
job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
job.setOutputKeyClass(Text.class); //设置输出类型
job.setOutputValueClass(IntWritable.class); //设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //设置环境参数
job.setJarByClass(WordCount.class); //设置整个程序的类名
job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
job.setOutputKeyClass(Text.class); //设置输出类型
job.setOutputValueClass(IntWritable.class); //设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
完整代码
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //设置环境参数
job.setJarByClass(WordCount.class);//设置整个程序的类名
job.setMapperClass(WordCount.TokenizerMapper.class);//添加Mapper类
job.setReducerClass(WordCount.IntSumReducer.class);//添加Reducer类
job.setOutputKeyClass(Text.class);//设置输出类型
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}
实验题编程
了解了这些知识、掌握了大致的框架和弄懂了这道编程实例就可以去应对实验要求的内容
其中input的内容可以用命令行语句写或者直接在程序中写,记得每次运行新的程序要把input和output的内容删掉。
例如:
# 创建文件wordfile1.txt和wordfile2.txt
cd ~
vim wordfile1.txt
vim wordfile2.txt
# 在运行程序之前,需要启动Hadoop
cd /usr/local/hadoop
./sbin/start-dfs.sh
# 在启动Hadoop之后,需要首先删除HDFS中对应的input和output目录,保证文件夹不存在,这样确保后面程序运行不会出现问题
./bin/hdfs dfs –rm –r /input
./bin/hdfs dfs –rm –r /output
# 然后,再在HDFS中新建对应的input目录,如“/input”目录,表示在根目录下新建一个input文件夹
./bin/hdfs dfs –mkdir /input
# 在Linux本地文件系统中新建两个文件wordfile1.txt和wordfile2.txt(假设这两个文件位于“/home/hadoop”目录下),然后,把这两个文件上传到HDFS中的“/user/hadoop/input”目录下
./bin/hdfs dfs -put ~/wordfile1.txt /input
./bin/hdfs dfs -put ~/wordfile2.txt /input
1.编程实现文件合并和去重操作
任务要求:
对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例供参考。
输入文件A的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件B的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根据输入文件A和B合并得到的输出文件C的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge_DR {
public static class MergeMapper extends Mapper<Object,Text,Text,Text>{
@Override
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
context.write(value, new Text(""));
}
}
public static class MergeReducer extends Reducer<Text,Text,Text,Text>{
@Override
public void reduce(Text key,Iterable<Text> value,Context context)throws IOException,InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//程序运行时参数
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job=Job.getInstance(conf,"Merge and DR");//设置环境参数
job.setJarByClass(Merge_DR.class);//设置整个程序的类名
job.setMapperClass(Merge_DR.MergeMapper.class);//添加Mapper类
job.setCombinerClass(Merge_DR.MergeReducer.class);
job.setReducerClass(Merge_DR.MergeReducer.class);//添加Reducer类
job.setOutputKeyClass(Text.class);//设置输出类型
job.setOutputValueClass(Text.class);//设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
}
2. 编写程序实现对输入文件的排序
任务要求:
现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。
输入文件1的样例如下:
33
37
12
40
输入文件2的样例如下:
4
16
39
5
输入文件3的样例如下:
1
45
25
根据输入文件1、2和3得到的输出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge_Sort {
public static class MergeMapper extends Mapper<Object,Text,IntWritable,IntWritable>{
@Override
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
context.write(new IntWritable(Integer.parseInt(value.toString())),new IntWritable(1));
}
}
public static class MergeReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
private static IntWritable cnt=new IntWritable(1);
@Override
public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
for(IntWritable num:values){
context.write(cnt, key);
cnt=new IntWritable(cnt.get()+1);
}
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//程序运行时参数
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job=Job.getInstance(conf,"Merge and Sort");//设置环境参数
job.setJarByClass(Merge_Sort.class);//设置整个程序的类名
job.setMapperClass(Merge_Sort.MergeMapper.class);//添加Mapper类
job.setReducerClass(Merge_Sort.MergeReducer.class);//添加Reducer类
job.setOutputKeyClass(IntWritable.class);//设置输出类型
job.setOutputValueClass(IntWritable.class);//设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
}
3. 对给定的表格进行信息挖掘
任务要求:
下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。
输入文件内容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
输出文件内容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
sql语句实现:
代码:
import java.io.IOException;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Data_Mining {
public static class MergeMapper extends Mapper<Object,Text,Text,Text>{
private Text tkey=new Text();
private Text tvalue=new Text();
@Override
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
String[] tmp=value.toString().split("[\\s|\\t]+");
if(!tmp[0].equals("child")){
this.tkey.set(tmp[1]);
this.tvalue.set("l#,"+tmp[0]);
context.write(this.tkey, this.tvalue);
this.tkey.set(tmp[0]);
this.tvalue.set("r#,"+tmp[1]);
context.write(this.tkey, this.tvalue);
String[] txt=this.tvalue.toString().split(",");
}
}
}
public static class MergeReducer extends Reducer<Text,Text,Text,Text>{
private static boolean flag=true;
@Override
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
if(flag){
context.write(new Text("grandchild"), new Text("grandparent"));
flag=false;
}
LinkedList<String> listl=new LinkedList<String>();
LinkedList<String> listr=new LinkedList<String>();
for(Text text:values){
String[] tmp=text.toString().split(",");
if(tmp[0].equals("l#")){
listl.add(tmp[1]);
}
if(tmp[0].equals("r#")){
listr.add(tmp[1]);
}
}
for(String l:listl){
for(String r:listr){
context.write(new Text(l), new Text(r));
}
}
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//程序运行时参数
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job=Job.getInstance(conf,"Data Mining");//设置环境参数
job.setJarByClass(Data_Mining.class);//设置整个程序的类名
job.setMapperClass(Data_Mining.MergeMapper.class);//添加Mapper类
job.setReducerClass(Data_Mining.MergeReducer.class);//添加Reducer类
job.setOutputKeyClass(Text.class);//设置输出类型
job.setOutputValueClass(Text.class);//设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
}