
Hadoop MapReduce: An Inverted Index Example

I. Problem Description

Count how many times each word appears in each file; in other words, build an inverted index that maps every word to the list of files containing it, together with a per-file count.

II. Sample Data

1. Input data:

a.txt:
hello tom
hello jerry
hello kitty
hello world
hello tom

b.txt:
hello jerry
hello tom
hello world

2. Expected output (note: tom appears twice in a.txt and once in b.txt, and kitty appears only in a.txt):

hello   a.txt->5 b.txt->3
jerry   a.txt->1 b.txt->1
kitty   a.txt->1
tom     a.txt->2 b.txt->1
world   a.txt->1 b.txt->1

III. Approach (in pseudo-distributed mode)

1. Map phase

For every word in a line, the mapper emits <word->file, 1>. For the data above, the "hello" records are:

context.write("hello->a.txt", 1);
context.write("hello->a.txt", 1);
context.write("hello->a.txt", 1);
context.write("hello->a.txt", 1);
context.write("hello->a.txt", 1);
context.write("hello->b.txt", 1);
context.write("hello->b.txt", 1);
context.write("hello->b.txt", 1);

2. Combiner phase

The framework sorts and groups the map output by key, so the combiner receives:

<"hello->a.txt", {1, 1, 1, 1, 1}>
<"hello->b.txt", {1, 1, 1}>

It sums the counts, splits the key back into word and file, and emits:

context.write("hello", "a.txt->5");
context.write("hello", "b.txt->3");

3. Reducer phase

The reducer receives all per-file counts for a word and joins them into a single line:

<"hello", {"a.txt->5", "b.txt->3"}>

context.write("hello", "a.txt->5 b.txt->3");

IV. Code Implementation

Class InverseIndex:

package edu.jianwei.hadoop.mr.ii;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {

	static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
		private final Text k = new Text();
		private final Text v = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] words = line.split(" ");
			// Use only the file name (e.g. "a.txt") so the output matches the
			// expected format; getPath().toString() would yield the full URI,
			// such as hdfs://host/inverseIndex/a.txt.
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			String path = inputSplit.getPath().getName();
			for (String word : words) {
				k.set(word + "->" + path);
				v.set("1");
				context.write(k, v);
			}

		}
	}

	// Sums the counts for each "word->file" key and re-keys the record by the
	// word alone. See the caveat in section III: this is only correct if the
	// combiner runs exactly once per key.
	static class IndexCombiner extends Reducer<Text, Text, Text, Text> {

		private final Text key = new Text();
		private final Text value = new Text();

		@Override
		protected void reduce(Text k, Iterable<Text> v2s, Context context)
				throws IOException, InterruptedException {
			String line = k.toString();
			String[] wordAndpath = line.split("->");
			key.set(wordAndpath[0]);
			int counter = 0;
			for (Text v : v2s) {
				counter += Integer.parseInt(v.toString());
			}
			value.set(wordAndpath[1] + "->" + counter);
			context.write(key, value);
		}

	}

	static class IndexReducer extends Reducer<Text, Text, Text, Text> {
		private final Text v = new Text();

		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String value = "";
			for (Text v : values) {
				value += v.toString() + " ";
			}
			v.set(value);
			context.write(key, v);
		}
	}

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(InverseIndex.class);

		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setCombinerClass(IndexCombiner.class);

		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}

}
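Before deploying to a cluster, the job can be sanity-checked locally. The harness below is a minimal sketch, not part of the original post: the class name InverseIndexLocalTest is hypothetical, it assumes the Hadoop client jars are on the classpath, and it relies on Hadoop falling back to the local job runner when no cluster configuration is present. It stages the sample files from section II in a temporary directory, reuses the driver above, and prints the reducer output.

package edu.jianwei.hadoop.mr.ii;

import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class InverseIndexLocalTest {
	public static void main(String[] args) throws Exception {
		// Stage the sample files from section II in a temporary directory.
		Path in = Files.createTempDirectory("ii-in");
		Files.write(in.resolve("a.txt"),
				"hello tom\nhello jerry\nhello kitty\nhello world\nhello tom\n"
						.getBytes(StandardCharsets.UTF_8));
		Files.write(in.resolve("b.txt"),
				"hello jerry\nhello tom\nhello world\n".getBytes(StandardCharsets.UTF_8));
		Path out = in.resolveSibling("ii-out-" + System.nanoTime());

		// Reuse the driver above. With no cluster configuration on the
		// classpath, Hadoop defaults to the local job runner and the
		// local filesystem.
		InverseIndex.main(new String[] { in.toString(), out.toString() });

		// Print the reducer output files (part-r-00000 etc.).
		try (DirectoryStream<Path> parts = Files.newDirectoryStream(out, "part-r-*")) {
			for (Path part : parts) {
				System.out.write(Files.readAllBytes(part));
			}
		}
		System.out.flush();
	}
}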

V. Testing the Code

1. Running the job:

hadoop jar /root/ii.jar edu.jianwei.hadoop.mr.ii.InverseIndex /inverseIndex /inverseIndex/res
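The input files must already be in HDFS before the job runs. Assuming a.txt and b.txt sit in the current local directory, a typical staging sequence looks like:

hadoop fs -mkdir -p /inverseIndex
hadoop fs -put a.txt b.txt /inverseIndex

The output directory (/inverseIndex/res here) must not exist when the job starts; re-running the job requires deleting it first with hadoop fs -rm -r /inverseIndex/res. A sibling path such as /inverseIndex_res also works and keeps the results out of the input directory.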

2. Output:
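The result file (viewable with hadoop fs -cat /inverseIndex/res/part-r-00000) should match the expected output from section II. Keys come out sorted, while the order of the per-file entries within a line is not guaranteed:

hello	a.txt->5 b.txt->3
jerry	a.txt->1 b.txt->1
kitty	a.txt->1
tom	a.txt->2 b.txt->1
world	a.txt->1 b.txt->1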
