上一篇我們學習了MapReduce的原理,今天我們使用代碼來加深對MapReduce原理的了解。
wordcount是Hadoop入門的經典例子,我們也不能免俗,也使用這個例子作為學習Hadoop的第一個程式。本文将介紹使用java和python編寫第一個MapReduce程式。
本文使用Idea2018開發工具開發第一個Hadoop程式。使用的程式設計語言是Java。
打開idea,建立一個工程,如下圖所示:
在彈出建立工程的界面選擇Java,接着選擇SDK,一般預設即可,點選“Next”按鈕,如下圖:
在彈出的選擇建立項目的模闆頁面,不做任何操作,直接點選“Next”按鈕。
輸入項目名稱,點選Finish,就完成了建立新項目的工作,我們的項目名稱為:WordCount。如下圖所示:
添加依賴jar包,和Eclipse一樣,要給項目添加相關依賴包,否則會出錯。
點選Idea的File菜單,然後點選“Project Structure”菜單,如下圖所示:
依次點選Modules和Dependencies,然後選擇“+”的符号,如下圖所示:
選擇hadoop的包,我用得是hadoop2.6.1。把下面的依賴包都加入到工程中,否則會出現某個類找不到的錯誤。
(1)”/usr/local/hadoop/share/hadoop/common”目錄下的hadoop-common-2.6.1.jar和haoop-nfs-2.6.1.jar;
(2)/usr/local/hadoop/share/hadoop/common/lib”目錄下的所有JAR包;
(3)“/usr/local/hadoop/share/hadoop/hdfs”目錄下的haoop-hdfs-2.6.1.jar和haoop-hdfs-nfs-2.7.1.jar;
(4)“/usr/local/hadoop/share/hadoop/hdfs/lib”目錄下的所有JAR包。
工程已經建立好,我們開始編寫Map類、Reduce類和運作MapReduce的入口類:
JAVA編寫MarReduce代碼
Map類如下:
1 import org.apache.hadoop.io.IntWritable;
2
3 import org.apache.hadoop.io.LongWritable;
4
5 import org.apache.hadoop.io.Text;
6
7 import org.apache.hadoop.mapreduce.Mapper;
8
9 import java.io.IOException;
10
11
12 public class WordcountMap extends Mapper<LongWritable,Text,Text,IntWritable> {
13 public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
14
15 String line = value.toString();//讀取一行資料
16
17 String str[] = line.split("");//因為英文字母是以“ ”為間隔的,是以使用“ ”分隔符将一行資料切成多個單詞并存在數組中
18
19 for(String s :str){//循環疊代字元串,将一個單詞變成<key,value>形式,及<"hello",1>
20 context.write(new Text(s),new IntWritable(1));
21 }
22 }
23 }
Reudce類:
1 import org.apache.hadoop.io.IntWritable;
2 import org.apache.hadoop.mapreduce.Reducer;
3 import org.apache.hadoop.io.Text;
4 import java.io.IOException;
5
6 public class WordcountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
7 public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
8 int count = 0;
9 for(IntWritable value: values) {
10 count++;
11 }
12 context.write(key,new IntWritable(count));
13 }
14 }
入口類 :
1 import org.apache.hadoop.conf.Configuration;
2 import org.apache.hadoop.fs.Path;
3 import org.apache.hadoop.mapreduce.Job;
4 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
5 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
6 import org.apache.hadoop.util.GenericOptionsParser;
7 import org.apache.hadoop.io.IntWritable;
8 import org.apache.hadoop.io.Text;
9
10 public class WordCount {
11
12 public static void main(String[] args)throws Exception{
13 Configuration conf = new Configuration();
14 //擷取運作時輸入的參數,一般是通過shell腳本檔案傳進來。
15 String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
16 if(otherArgs.length < 2){
17 System.err.println("必須輸入讀取檔案路徑和輸出路徑");
18 System.exit(2);
19 }
20 Job job = new Job();
21 job.setJarByClass(WordCount.class);
22 job.setJobName("wordcount app");
23
24 //設定讀取檔案的路徑,都是從HDFS中讀取。讀取檔案路徑從腳本檔案中傳進來
25 FileInputFormat.addInputPath(job,new Path(args[0]));
26 //設定mapreduce程式的輸出路徑,MapReduce的結果都是輸入到檔案中
27 FileOutputFormat.setOutputPath(job,new Path(args[1]));
28
29 //設定實作了map函數的類
30 job.setMapperClass(WordcountMap.class);
31 //設定實作了reduce函數的類
32 job.setReducerClass(WordcountReduce.class);
33
34 //設定reduce函數的key值
35 job.setOutputKeyClass(Text.class);
36 //設定reduce函數的value值
37 job.setOutputValueClass(IntWritable.class);
38 System.exit(job.waitForCompletion(true) ? 0 :1);
39 }
40 }
代碼寫好之後,開始jar包,按照下圖打包。點選“File”,然後點選“Project Structure”,彈出如下的界面,
依次點選"Artifacts" -> "+" -> "JAR" -> "From modules with dependencies",然後彈出一個選擇入口類的界面,選擇剛剛寫好的WordCount類,如下圖:
按照上面設定好之後,就開始打jar包,如下圖:
點選上圖的“Build”之後就會生成一個jar包。jar的位置看下圖,依次點選File->Project Structure->Artifacts就會看到如下的界面:
将打好包的wordcount.jar檔案上傳到裝有hadoop叢集的機器中,然後建立shell檔案,shell檔案内容如下,/usr/local/src/hadoop-2.6.1是hadoop叢集中hadoop的安裝位置,
1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar wordcount.jar \ #執行jar檔案的指令以及jar檔案名,
2
3 hdfs://hadoop-master:8020/data/english.txt \ #輸入路徑
4
5 hdfs://hadoop-master:8020/wordcount_output #輸出路徑
執行shell檔案之後,會看到如下的資訊,
上圖中數字1表示輸入分片split的數量,數字2表示map和reduce的進度,數字3表示mapreduce執行成功,數字4表示啟動多少個map任務,數字5表示啟動多少個reduce任務。
自行成功後在hadoop叢集中的hdfs檔案系統中會看到一個wordcount_output的檔案夾。使用“hadoop fs -ls /”指令檢視:
在wordcount_output檔案夾中有兩個檔案,分别是_SUCCESS和part-r-00000,part-r-00000記錄着mapreduce的執行結果,使用hadoop fs -cat /wordcount_output/part-r-00000檢視part-r-00000的内容:
可以每個英文單詞出現的次數。
至此,借助idea 2018工具開發第一個使用java語言編寫的mapreduce程式已經成功執行。下面介紹使用python語言編寫的第一個mapreduce程式,相對于java,python編寫mapreduce會簡單很多,因為hadoop提供streaming,streaming是使用Unix标準流作為Hadoop和應用程式之間的接口,是以可以使用任何語言通過标準輸入輸出來寫MapReduce程式。
Python編寫MapReduce程式
看代碼:
實作了map函數的python程式,命名為map.py:
1 #!/usr/local/bin/python
2
3 import sys #導入sys包
4
5 for line in sys.stdin: #從标準輸入中讀取資料
6 ss = line.strip().split(\' \')#讀取每一行資料,strip()函數過濾掉空格換行的字元,split(\' \')分隔出每個額單詞并存放在數組ss中
7
8 for s in ss: #讀取數組ss中的每個單詞
9 if s.strip() != "":
10 print "%s\t%s" % (s, 1)#構造以單詞為key,1為value的鍵值對,并寫入到标準輸出中。
實作了reduce函數的python程式,命名為reduce.py:
1 import sys
2 cur_word = None
3 sum = 0
4 for line in sys.stdin:
5 ss = line.strip().split(\'\t\')#從标準輸入中讀取資料。
6 if len(ss) != 2:
7 continue
8 word,cnt = ss
9 if cur_word == None:
10 cur_word = word
11 #因為從map流轉到reduce的資料時按照key排好序的,cur_word記錄的是上一個單詞,word記 #錄的是目前讀取的單詞,如果兩個單詞一緻,則将sum+1,否則将word和sum值組成一個鍵值對,##寫入到标準輸出,同時sum指派為0,并且将word指派給cur_word變量。
12 if cur_word != word:
13 print \'\t\'.join([cur_word,str(sum)])
14 cur_word = word
15 sum = 0
16 sum += int(cnt)
17 print \'\t\'.join([cur_word,str(sum)])
map和reduce程式已經編寫完畢,下面編寫shell腳本檔案:
1 HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
2 STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar "
3
4 INPUT_FILE_PATH_1="/data/english.txt"#輸入路徑
5 OUTPUT_PATH="/wordcount_output"#輸出路徑
6 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH#每次執行時都删除輸出路徑,否則會出錯
7
8 $HADOOP_CMD jar $STREAM_JAR_PATH \
9 -input $INPUT_FILE_PATH_1 \#指定輸入路徑
10 -output $OUTPUT_PATH \#指定輸出路徑
11 -mapper "python map.py" \#指定要執行的map程式
12 -reducer "python reduce.py" \#指定要執行reduce程式
13 -file ./map.py \#指定map程式所在的位置
14 -file ./reduce.py#指定reduce程式所在的位置
到此Java和Python編寫第一個MapReduce程式已經完成。