要統計的檔案的檔案名為hello
hello中的内容如下
hello you
hello me
通過MapReduce程式統計出檔案中的各個單詞出現了幾次.(兩個單詞之間通過tab鍵進行的分割)
1 import java.io.IOException;
2
3 import mapreduce.WordCountApp.WordCountMapper.WordCountReducer;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.LongWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14
15 /**
16 * 以文本
17 * hello you
18 * hello me
19 * 為例子.
20 * map方法調用了兩次,因為有兩行
21 * k2 v2 鍵值對的數量有幾個?
22 * 有4個.有四個單詞.
23 *
24 * 會産生幾個分組?
25 * 産生3個分組.
26 * 有3個不同的單詞.
27 *
28 */
29 public class WordCountApp {
30 public static void main(String[] args) throws Exception {
31 //程式在這裡運作,要有驅動.
32 Configuration conf = new Configuration();
33 Job job = Job.getInstance(conf,WordCountApp.class.getSimpleName());
34
35 //我們運作此程式通過運作jar包來執行.一定要有這句話.
36 job.setJarByClass(WordCountApp.class);
37
38 FileInputFormat.setInputPaths(job,args[0]);
39
40 job.setMapperClass(WordCountMapper.class);//設定Map類
41 job.setMapOutputKeyClass(Text.class);//設定Map的key
42 job.setMapOutputValueClass(LongWritable.class);//設定Map的value
43 job.setReducerClass(WordCountReducer.class);//設定Reduce的類
44 job.setOutputKeyClass(Text.class);//設定Reduce的key Reduce這個地方隻有輸出的參數可以設定. 方法名字也沒有Reduce關鍵字差別于Map
45 job.setOutputValueClass(LongWritable.class);//設定Reduce的value.
46
47 FileOutputFormat.setOutputPath(job, new Path(args[1]));
48 job.waitForCompletion(true);//表示結束了才退出,不結束不退出
49 }
50 /**
51 * 4個泛型的意識
52 * 第一個是LongWritable,固定就是這個類型,表示每一行單詞的起始位置(機關是位元組)
53 * 第二個是Text,表示每一行的文本内容.
54 * 第三個是Text,表示單詞
55 * 第四個是LongWritable,表示單詞的出現次數
56 */
57 public static class WordCountMapper extends Mapper<LongWritable, Text, Text ,LongWritable>{
58 Text k2 = new Text();
59 LongWritable v2 = new LongWritable();
60 //增加一個計數器,這個Map調用幾次就輸出對應的次數.
61 int counter = 0;
62
63
64 /**
65 * key和value表示輸入的資訊
66 * 每一行文本調用一次map函數
67 */
68 @Override
69 protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)
70 throws IOException, InterruptedException {
71 counter = counter + 1;
72 System.out.println("mapper 調用的次數:" + counter);
73 //這個map方法中的Mapper的各個泛型和上面的意識是一樣的,分别代表的是k1,v1,k2,v2
74 String line = value.toString();
75 System.out.println(String.format("<k1,v1>的值<"+key.get()+","+line+">"));
76 String[] splited = line.split("\t");
77 for (String word : splited) {
78 k2.set(word);
79 v2.set(1);
80 System.out.println(String.format("<k2,v2>的值<"+k2.toString()+","+v2.get()+">"));
81 context.write(k2, v2);//通過context對象寫出去.
82 }
83 }
84 /**
85 * 這個地方的四個泛型的意思
86 * 前兩個泛型是對應的Map方法的後兩個泛型.
87 * Map的輸出對應的是Reduce的輸入.
88 * 第一個Text是單詞
89 * 第二個LongWritable是單詞對應的次數
90 * 我們想輸出的也是單詞 和 次數
91 * 是以第三個和第四個的類型和第一和第二個的一樣
92 *
93 * 分組指的是把相同key2的value2放到一個集合中
94 *
95 */
96 public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
97 LongWritable v3 = new LongWritable();
98 //增加一個計數器,這個Reduce調用幾次就輸出對應的次數.
99 int counter = 0;
100
101 /**
102 * 每一個分組調用一次reduce函數
103 * 過來的k2 分别是hello you me
104 *
105 */
106 @Override
107 protected void reduce(Text key2, Iterable<LongWritable> value2Iterable,Reducer<Text, LongWritable, Text,
108 LongWritable>.Context context)
109 throws IOException, InterruptedException {
110 counter = counter + 1;
111 System.out.println("reducer 調用的次數:" + counter);
112 //第一個參數是單詞,第二個是可疊代的集合. 為什麼上面的LongWritable類型的對象value2變成了一個可以疊代的結合參數?
113 //因為分組指的是把相同key2的value2放到一個集合中
114 long sum = 0L;
115 for (LongWritable value2 : value2Iterable) {
116 System.out.println(String.format("<k2,v2>的值<"+key2.toString()+","+value2.toString()+">"));
117 sum += value2.get(); //這個value2是LongWritable類型的,不能進行+= 操作,要用get()得到其對應的java基本類型.
118 //sum表示單詞k2 在整個文本中的出現次數.
119 }
120 v3.set(sum);
121 context.write(key2, v3);
122 System.out.println(String.format("<k3,v3>的值<"+key2.toString()+","+v3.get()+">"));
123 }
124 }
125 }
126 }
通過運作Yarn叢集檢視Map日志得到的輸出結果:
檢視Reduce日志産看到的輸出結果:
//============================================================================
以下程式是之前的寫的:注釋更加詳細:
1 /*
2 * 一個hello檔案内容如下:
3 * hello you
4 * hello me
5 */
6 import java.io.IOException;
7
8 import org.apache.hadoop.conf.Configuration;
9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17
18 public class WordCountApp {
19 public static void main(String[] args) throws Exception {
20 // 在main方法寫驅動程式,把Map函數和Reduce函數組織在一起.
21 // 搞一個對象把Map對象和Reduce對象都放在這個對象中,我們把這個對象稱作Job
22 // 兩個形參,一個是Configuration對象,一個是Job的名稱,這樣獲得了一個Job對象;
23 Job job = Job.getInstance(new Configuration(),
24 WordCountApp.class.getSimpleName());
25 // 對這個job進行設定
26 job.setJarByClass(WordCountApp.class);// 通過這個設定可以讓架構識别你寫的代碼
27
28 job.setMapperClass(MyMapper.class);// 把自定義的Map類放到job中
29 job.setMapOutputKeyClass(Text.class);// 定義Map的key的輸出類型,Map的輸出是<hello,2>
30 job.setMapOutputValueClass(LongWritable.class);// 定義Map的value的輸出類型
31
32 job.setReducerClass(MyReducer.class);// 把自定義的Reducer類放到job中
33 job.setOutputKeyClass(Text.class);// 因為Reduce的輸出是最終的資料,Reduce的輸出是<hello,2>
34 // 是以這個方法名中沒有像Map對應的放發一樣帶有Reduce,直接就是setOutputKeyClass
35 job.setOutputValueClass(LongWritable.class);// 定義reduce的value輸出
36
37 FileInputFormat.setInputPaths(job, args[0]);// 輸入指定:傳入一個job位址.
38 // 這個args[0] 就是新位址,"hdfs://192.168.0.170/hello"
39 FileOutputFormat.setOutputPath(job, new Path(args[1]));
40 // 輸出指定
41 // 指定輸入和輸出路徑可以通過在這裡寫死的方式,也可以通過main函數參數的形式
42 // 分别是args[0]和args[1]
43
44 // 把job上傳到yarn平台上.
45 job.waitForCompletion(true);
46 }
47
48 /*
49 * 對于<k1,v1>而言,每一行産生一個<k1,v1>對,<k1,v1>表示<行的起始位置,行的文本内容>
50 * 就本例而言map函數總共調用兩次,因為總共隻有兩行.
51 * 正對要統計的文本内容可以知道總共兩行,總共會調用兩次Map函數對應産生的<k1,v1>分别是<0,hello you>
52 * 和第二個<k1,v1>是<10,hello me>
53 */
54 private static class MyMapper extends
55 Mapper<LongWritable, Text, Text, LongWritable> {
56 // 這個Mapper的泛型參數是<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 分别對應的是k1,v1,k2,v2
57 // 我們如下講的k1,v1的類型是固定的.
58 // 就本例而言,map函數會被調用2次,因為總共文本檔案就隻有兩行.
59
60 //要定義輸出的k2和v2.本案例中可以分析出<k2,v2>是對文本内容的統計<hello,1><hello,1><you,1><me,1>
61 //而且<k2,v2>的内容是和<k3,v3>中的内容是一樣的.
62 Text k2 = new Text();
63 LongWritable v2 = new LongWritable();
64 //重寫父類Mapper中的map方法
65 @Override
66 protected void map(LongWritable key, Text value,
67 Mapper<LongWritable, Text, Text, LongWritable>.Context context)
68 throws IOException, InterruptedException {
69 //通過代碼或者案例分析就可以知道k1其實沒有什麼用出的.
70 String line = value.toString();
71 String[] splited = line.split("\t");//根據制表分隔符機進行拆分.hello和me,you之間是一個制表分隔符.
72 for (String word : splited) {
73 k2.set(word);
74 v2.set(1);
75 context.write(k2, v2);
76 //用context把k2,v2寫出去,架構會寫,不用我們去管.
77 }
78 }
79 }
80
81 private static class MyReducer extends
82 Reducer<Text, LongWritable, Text, LongWritable> {
83 //這個例子中的<k2,v2>和<k3,v3>中的k是一樣的,是以這裡,k2當做k3了.
84 LongWritable v3 = new LongWritable();
85 @Override
86 protected void reduce(Text k2, Iterable<LongWritable> v2s,
87 Reducer<Text, LongWritable, Text, LongWritable>.Context context)
88 throws IOException, InterruptedException {
89 //Reduce是對上面Map中的結果進行彙總的.
90 //上面拆分出來的<k2,v2>是<hello,1><hello,1><you,1><me,1>Reduce方法中就要對其進行彙總.
91 long sum = 0L;
92 for(LongWritable v2:v2s){
93 sum = sum +v2.get();//sum是long類型,v2是LongWritable類型
94 //LongWritable類型轉換成long類型用get()方法.
95 //sum的值表示單詞在整個檔案中出現的中次數.
96 }
97 v3.set(sum);
98 context.write(k2,v3);
99 }
100 }
101
102 }
//===============================================================================
檢視日志的時候,代碼中的System.out.println()對于Java程式輸出到控制台,但是這個地方是把Java類打成Jar包,
放到叢集中去通過指令執行的.
輸出通過日志檢視的.
上面對應的Log Type:stdout
stdout:stdout(Standardoutput)标準輸出
作者:SummerChill |