實驗的推進模式
- 先配置好eclipse for hadoop
- 直接借用WordCount去測試
-
利用WordCount的基本程式架構,編寫自己的代碼
要點:map/reduce的所在的類和方法的資料類型(Text,intWritable,NullWritable,LongWritable以及自定義的…)
map和reduce程式主體的編寫…
- 建議 盡量自行去編寫一個樣例程式(如連接配接運算)
- 差不多了,可以去應對實驗要求的内容
關于mapreduce概念的介紹可以參考這篇部落格:MapReduce
關于1、2兩點可以參考這篇部落格:Eclipes實作Mapreduce的配置(配圖解與WordCount案例)
mapreduce程式設計的基本架構
-
該程式有一個 main 方法,來啟動任務的運作,其中 job 對象就存儲了該程式運作的必要 資訊,比如指定 Mapper 類和 Reducer 類
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
- 該程式中的 TokenizerMapper 類繼承了 Mapper 類
- 該程式中的 IntSumReducer 類繼承了 Reducer 類
總結: MapReduce 程式的業務編碼分為兩個大部分,一部配置設定置程式的運作資訊,一部分 編寫該 MapReduce 程式的業務邏輯,并且業務邏輯的 map 階段和 reduce 階段的代碼分别繼承 Mapper 類和 Reducer 類
- 使用者編寫的程式分成三個部分: Mapper, Reducer, main
- Mapper 的輸入資料是鍵值對的形式( 鍵值對的類型可自定義)
- Mapper 的輸出資料是鍵值對對的形式( 鍵值對的類型可自定義)
- Mapper 中的業務邏輯寫在
方法中map()
-
方法( maptask 程序)對每一對鍵值對調用一次map()
- Reducer 的輸入資料類型對應 Mapper 的輸出資料類型,也是鍵值對
- Reducer 的業務邏輯寫在
方法中reduce()
- Reducetask 程序對每一組相同 k e y key key的鍵值對組調用一次
方法reduce()
- 使用者自定義的 Mapper 和 Reducer 都要繼承各自的父類
- 整個程式中描述各種必要資訊的 job 對象都寫在main裡面
對于架構的每一部分具體介紹:
map部分:
系統已經自帶Mapper類,每次使用隻需要基礎這個類即可。
在
Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
類中,
- K E Y I N KEYIN KEYIN是指架構讀取到的資料的key的類型,在預設的InputFormat下,讀到的key是一行文本的起始偏移量。
- V A L U E I N VALUEIN VALUEIN是指架構讀取到的資料的value的類型,在預設的InputFormat下,讀到的value是一行文本的内容
- K E Y O U T KEYOUT KEYOUT是指使用者自定義邏輯方法傳回的資料中key的類型,由使用者業務邏輯決定
- V A L U E O U T VALUEOUT VALUEOUT是指使用者自定義邏輯方法傳回的資料中value的類型,由使用者業務邏輯決定
要注意的是String、Long等jdk中自帶的資料類型,在序列化時,效率較低,hadoop為了題高序列化效率,自定義了一套序列化架構,是以在hadoop的程式中,如果該資料需要進行序列化,就一定要用實作了hadoop序列化架構的資料類型。以下列舉了幾個常用的
Java類型 | Hadoop Writable類型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
array | ArrayWritable |
reduce部分
和map一樣,reduce也有自己自帶的類Reducer,同樣也是四個資料類型,分别表示輸入和輸出。輸入的是Map階段的處理結果,輸出就是Reduce最後的輸出
reducetask在調我們寫得reduce方法,reducetask應該收到了前一階段(map階段)中所有maptask輸出的資料中的一部分,是以reducetaks的輸入類型必須和maptask的輸出類型一樣
reducetask将這些收到的鍵值對資料拿來處理時,是這樣調用我們的reduce方法的:
先将自己收到的所有的鍵值對按照key來分組
将某一組鍵值對組中的第一個鍵值對中的key傳給reduce方法的key變量,把這一組鍵值對中所有的value用一個疊代器傳給reduce方法的變量value
main部分
main方法是該mapreduce程式運作的入口,其中用一個job類對象來管理程式運作時所需要的很多參數:
比如,指定用哪個元件作為資料讀取器、資料結果輸出器
指定用哪個類作為map階段的業務邏輯類,哪個類作為reduce階段的業務邏輯類
指定job程式的jar包所在路徑…
大緻架構
public static void main(String[] args)throws Exception {
//指定hdfs相關的參數
Configuration conf=new Configuration();//程式運作時參數
conf.set("fs.default.name","hdfs://localhost:9000");//hdfs的主節點
System.setProperty("HADOOP_USER_NAEM","hadoop");
//建立一個job任務
Job job=Job.getInstance(conf,"Merge and DR");//設定環境參數
//設定jar包所在路徑
job.setJarByClass(Main.class);//設定整個程式的類名
//指定mapper類和reducer類
job.setMapperClass(Merge_DR.MergeMapper.class);//添加Mapper類
job.setReducerClass(Merge_DR.MergeReducer.class);//添加Reducer類
///指定reducetask的輸出類型
job.setOutputKeyClass(Text.class);//設定輸出類型
job.setOutputValueClass(Text.class);//設定輸出類型
//指定該mapreduce程式資料的輸入和輸出路徑
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
//最後送出任務
System.exit(job.waitForCompletion(true)?0:1);
WordCount的分析
編寫Map處理邏輯
- Map輸入類型為<key,value>
- 期望的Map輸出類型為<單詞,出現次數>
- Map輸入類型最終确定為<Object,Text>
- Map輸出類型最終确定為<Text,IntWritable>
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
編寫Reduce處理邏輯
- 在Reduce處理資料之前,Map的結果首先通過Shuffle階段進行整理
- Reduce階段的任務:對輸入數字序列進行求和
- Reduce的輸入資料為<key,Iterable容器>
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
編寫main方法
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //設定環境參數
job.setJarByClass(WordCount.class); //設定整個程式的類名
job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper類
job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer類
job.setOutputKeyClass(Text.class); //設定輸出類型
job.setOutputValueClass(IntWritable.class); //設定輸出類型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
System.exit(job.waitForCompletion(true)?0:1);
}
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //設定環境參數
job.setJarByClass(WordCount.class); //設定整個程式的類名
job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper類
job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer類
job.setOutputKeyClass(Text.class); //設定輸出類型
job.setOutputValueClass(IntWritable.class); //設定輸出類型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
System.exit(job.waitForCompletion(true)?0:1);
}
完整代碼
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //設定環境參數
job.setJarByClass(WordCount.class);//設定整個程式的類名
job.setMapperClass(WordCount.TokenizerMapper.class);//添加Mapper類
job.setReducerClass(WordCount.IntSumReducer.class);//添加Reducer類
job.setOutputKeyClass(Text.class);//設定輸出類型
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
System.exit(job.waitForCompletion(true)?0:1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}
實驗題程式設計
了解了這些知識、掌握了大緻的架構和弄懂了這道程式設計執行個體就可以去應對實驗要求的内容
其中input的内容可以用指令行語句寫或者直接在程式中寫,記得每次運作新的程式要把input和output的内容删掉。
例如:
# 建立檔案wordfile1.txt和wordfile2.txt
cd ~
vim wordfile1.txt
vim wordfile2.txt
# 在運作程式之前,需要啟動Hadoop
cd /usr/local/hadoop
./sbin/start-dfs.sh
# 在啟動Hadoop之後,需要首先删除HDFS中對應的input和output目錄,保證檔案夾不存在,這樣確定後面程式運作不會出現問題
./bin/hdfs dfs –rm –r /input
./bin/hdfs dfs –rm –r /output
# 然後,再在HDFS中建立對應的input目錄,如“/input”目錄,表示在根目錄下建立一個input檔案夾
./bin/hdfs dfs –mkdir /input
# 在Linux本地檔案系統中建立兩個檔案wordfile1.txt和wordfile2.txt(假設這兩個檔案位于“/home/hadoop”目錄下),然後,把這兩個檔案上傳到HDFS中的“/user/hadoop/input”目錄下
./bin/hdfs dfs -put ~/wordfile1.txt /input
./bin/hdfs dfs -put ~/wordfile2.txt /input
1.程式設計實作檔案合并和去重操作
任務要求:
對于兩個輸入檔案,即檔案A和檔案B,請編寫MapReduce程式,對兩個檔案進行合并,并剔除其中重複的内容,得到一個新的輸出檔案C。下面是輸入檔案和輸出檔案的一個樣例供參考。
輸入檔案A的樣例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
輸入檔案B的樣例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根據輸入檔案A和B合并得到的輸出檔案C的樣例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
代碼:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge_DR {
public static class MergeMapper extends Mapper<Object,Text,Text,Text>{
@Override
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
context.write(value, new Text(""));
}
}
public static class MergeReducer extends Reducer<Text,Text,Text,Text>{
@Override
public void reduce(Text key,Iterable<Text> value,Context context)throws IOException,InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//程式運作時參數
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job=Job.getInstance(conf,"Merge and DR");//設定環境參數
job.setJarByClass(Merge_DR.class);//設定整個程式的類名
job.setMapperClass(Merge_DR.MergeMapper.class);//添加Mapper類
job.setCombinerClass(Merge_DR.MergeReducer.class);
job.setReducerClass(Merge_DR.MergeReducer.class);//添加Reducer類
job.setOutputKeyClass(Text.class);//設定輸出類型
job.setOutputValueClass(Text.class);//設定輸出類型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
System.exit(job.waitForCompletion(true)?0:1);
}
}
2. 編寫程式實作對輸入檔案的排序
任務要求:
現在有多個輸入檔案,每個檔案中的每行内容均為一個整數。要求讀取所有檔案中的整數,進行升序排序後,輸出到一個新的檔案中,輸出的資料格式為每行兩個整數,第一個數字為第二個整數的排序位次,第二個整數為原待排列的整數。下面是輸入檔案和輸出檔案的一個樣例供參考。
輸入檔案1的樣例如下:
33
37
12
40
輸入檔案2的樣例如下:
4
16
39
5
輸入檔案3的樣例如下:
1
45
25
根據輸入檔案1、2和3得到的輸出檔案如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
代碼:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge_Sort {
public static class MergeMapper extends Mapper<Object,Text,IntWritable,IntWritable>{
@Override
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
context.write(new IntWritable(Integer.parseInt(value.toString())),new IntWritable(1));
}
}
public static class MergeReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
private static IntWritable cnt=new IntWritable(1);
@Override
public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
for(IntWritable num:values){
context.write(cnt, key);
cnt=new IntWritable(cnt.get()+1);
}
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//程式運作時參數
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job=Job.getInstance(conf,"Merge and Sort");//設定環境參數
job.setJarByClass(Merge_Sort.class);//設定整個程式的類名
job.setMapperClass(Merge_Sort.MergeMapper.class);//添加Mapper類
job.setReducerClass(Merge_Sort.MergeReducer.class);//添加Reducer類
job.setOutputKeyClass(IntWritable.class);//設定輸出類型
job.setOutputValueClass(IntWritable.class);//設定輸出類型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
System.exit(job.waitForCompletion(true)?0:1);
}
}
3. 對給定的表格進行資訊挖掘
任務要求:
下面給出一個child-parent的表格,要求挖掘其中的父子輩關系,給出祖孫輩關系的表格。
輸入檔案内容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
輸出檔案内容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
sql語句實作:
代碼:
import java.io.IOException;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Data_Mining {
public static class MergeMapper extends Mapper<Object,Text,Text,Text>{
private Text tkey=new Text();
private Text tvalue=new Text();
@Override
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
String[] tmp=value.toString().split("[\\s|\\t]+");
if(!tmp[0].equals("child")){
this.tkey.set(tmp[1]);
this.tvalue.set("l#,"+tmp[0]);
context.write(this.tkey, this.tvalue);
this.tkey.set(tmp[0]);
this.tvalue.set("r#,"+tmp[1]);
context.write(this.tkey, this.tvalue);
String[] txt=this.tvalue.toString().split(",");
}
}
}
public static class MergeReducer extends Reducer<Text,Text,Text,Text>{
private static boolean flag=true;
@Override
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
if(flag){
context.write(new Text("grandchild"), new Text("grandparent"));
flag=false;
}
LinkedList<String> listl=new LinkedList<String>();
LinkedList<String> listr=new LinkedList<String>();
for(Text text:values){
String[] tmp=text.toString().split(",");
if(tmp[0].equals("l#")){
listl.add(tmp[1]);
}
if(tmp[0].equals("r#")){
listr.add(tmp[1]);
}
}
for(String l:listl){
for(String r:listr){
context.write(new Text(l), new Text(r));
}
}
}
}
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();//程式運作時參數
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job=Job.getInstance(conf,"Data Mining");//設定環境參數
job.setJarByClass(Data_Mining.class);//設定整個程式的類名
job.setMapperClass(Data_Mining.MergeMapper.class);//添加Mapper類
job.setReducerClass(Data_Mining.MergeReducer.class);//添加Reducer類
job.setOutputKeyClass(Text.class);//設定輸出類型
job.setOutputValueClass(Text.class);//設定輸出類型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//設定輸入檔案
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//設定輸出檔案
System.exit(job.waitForCompletion(true)?0:1);
}
}