Hadoop MapReduce: Inverted Index Example

1. Problem Description

        Count the number of times each word appears in each file; the result is an inverted index mapping each word to the files that contain it, with per-file counts.

2. Given Data

        1. Input data:

           a.txt:               b.txt:

           hello tom            hello jerry
           hello jerry          hello tom
           hello kitty          hello world
           hello world
           hello tom

        2. Output data:

           hello   a.txt->5 b.txt->3
           jerry   a.txt->1 b.txt->1
           tom     a.txt->2 b.txt->1
           world   a.txt->1 b.txt->1
           kitty   a.txt->1

3. Approach (in pseudo-distributed mode)

      1. Map stage

         For each word in a line, the mapper emits <"word->fileName", "1">; the file name is taken from the input split, as sketched below. For the word "hello" this produces:

         context.write("hello->a.txt", "1");
         context.write("hello->a.txt", "1");
         context.write("hello->a.txt", "1");
         context.write("hello->a.txt", "1");
         context.write("hello->a.txt", "1");
         context.write("hello->b.txt", "1");
         context.write("hello->b.txt", "1");
         context.write("hello->b.txt", "1");
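
         How the mapper learns which file a record came from (a minimal sketch; the full code is in section 4):

         FileSplit split = (FileSplit) context.getInputSplit();
         String fileName = split.getPath().getName(); // e.g. "a.txt"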

      2. Combiner stage

         The combiner receives the map output grouped by key:

         <"hello->a.txt", "1">
         <"hello->a.txt", "1">
         <"hello->a.txt", "1">
         <"hello->a.txt", "1">
         <"hello->a.txt", "1">
         <"hello->b.txt", "1">
         <"hello->b.txt", "1">
         <"hello->b.txt", "1">

         It sums the counts and rewrites the key from "word->file" to the bare word:

         context.write("hello", "a.txt->5");
         context.write("hello", "b.txt->3");

         Note that this combiner changes the key, which is normally discouraged: Hadoop may run a combiner zero or more times, and this one only works if it runs exactly once per key. That is acceptable for a small demo, but a production job would do the re-keying in the mapper or in a second job.

      3. Reducer stage

         After the shuffle, the reducer receives all per-file counts for each word at once:

         <"hello", {"a.txt->5", "b.txt->3"}>

         context.write("hello", "a.txt->5 b.txt->3");

4. Implementation

        Class InverseIndex

package edu.jianwei.hadoop.mr.ii;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {

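	// Mapper: for each word in a line, emit <"word->fileName", "1">.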
	static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
		private final Text k = new Text();
		private final Text v = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] words = line.split(" ");
			// Determine which file this split belongs to; getName() returns
			// just the file name (e.g. "a.txt") rather than the full HDFS URI.
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			String path = inputSplit.getPath().getName();
			for (String word : words) {
				k.set(word + "->" + path);
				v.set("1");
				context.write(k, v);
			}

		}
	}

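	// Combiner: sums the "1"s for each "word->file" key on the map side and
	// re-keys the pair as <word, "file->count">.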
	static class IndexCombiner extends Reducer<Text, Text, Text, Text> {

		private final Text key = new Text();
		private final Text value = new Text();

		@Override
		protected void reduce(Text k, Iterable<Text> v2s, Context context)
				throws IOException, InterruptedException {
			String line = k.toString();
			String[] wordAndpath = line.split("->");
			key.set(wordAndpath[0]);
			int counter = 0;
			for (Text v : v2s) {
				counter += Integer.parseInt(v.toString());
			}
			value.set(wordAndpath[1] + "->" + counter);
			context.write(key, value);
		}

	}

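	// Reducer: joins all "file->count" values for a word into a single line.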
	static class IndexReducer extends Reducer<Text, Text, Text, Text> {
		private final Text v = new Text();

		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String value = "";
			for (Text v : values) {
				value += v.toString() + " ";
			}
			v.set(value);
			context.write(key, v);
		}
	}

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(InverseIndex.class);

		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

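		// The combiner consumes the map output types and must produce the
		// reducer's input types (here Text/Text in both cases).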
		job.setCombinerClass(IndexCombiner.class);

		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}

}

5. Testing

        1. Running the job (the arguments are the HDFS input directory and the output directory):

           hadoop jar /root/ii.jar edu.jianwei.hadoop.mr.ii.InverseIndex /inverseIndex /inverseIndex/res

        2. Output results
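
           The reducer's part file can be printed with the HDFS shell; assuming the output path used above, something like:

           hadoop fs -cat /inverseIndex/res/part-r-00000

           With the sample inputs, each line should match the expected output in section 2, e.g. "hello	a.txt->5 b.txt->3" (key and value are tab-separated by default).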
