
Partitioner in MapReduce

package flow.partitioner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import flow.pojo.Flow;

public class FlowPartitionerMR {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowPartitionerMR.class);
		
		job.setMapperClass(FlowPartitionerMRMapper.class);
		job.setMapOutputKeyClass(Flow.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setReducerClass(FlowPartitionerMRReducer.class);
		job.setOutputKeyClass(Flow.class);
		job.setOutputValueClass(NullWritable.class);
		
		/**
		 * In the MapReduce programming model, the default Partitioner
		 * implementation is HashPartitioner. When it does not meet our
		 * needs, we simply write a custom Partitioner component modeled on
		 * the way HashPartitioner itself is implemented.
		 */
		job.setPartitionerClass(ProvincePartitioner.class);

//		job.setPartitionerClass(HashPartitioner.class);
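		/*
		 * For reference, the default HashPartitioner
		 * (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner)
		 * boils down to a single line:
		 *
		 *   public int getPartition(K key, V value, int numReduceTasks) {
		 *       return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
		 *   }
		 *
		 * The mask clears the sign bit so that a negative hashCode() can
		 * never produce a negative partition number.
		 */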
		
		
		/**
		 * ProvincePartitioner returns partition numbers 0-5, so at least 6
		 * reduce tasks are required here. With 9, partitions 6-8 receive no
		 * records and simply produce empty output files.
		 */
		job.setNumReduceTasks(9);
		
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);
		FileInputFormat.setInputPaths(job, inputPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job,outputPath);
		
		boolean isDone = job.waitForCompletion(true);
		System.exit(isDone ? 0 : 1);
		
	}
	
	/**
	 * Business logic for the Map phase
	 */
	private static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{
		Flow flow = new Flow();
		/**
		 * Sample input line (phone, upFlow, downFlow, sumFlow, tab-separated):
		 * 13480253104	2494800	2494800	4989600
		 */

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			String phone = split[0];
			long upFlow = Long.parseLong(split[1]);
			long downFlow = Long.parseLong(split[2]);
			long sumFlow = Long.parseLong(split[3]);
			
			/**
			 * Each call to map() resets the fields of the same flow object.
			 * Reusing one instance instead of allocating a new object per
			 * record reduces JVM garbage-collection pressure.
			 */
			flow.setPhone(phone);
			flow.setUpFlow(upFlow);
			flow.setDownFlow(downFlow);
			flow.setSumFlow(sumFlow);
		
			/**
			 * This means the whole MapTask shares a single Flow object.
			 * Why is that safe?
			 *
			 * Because of how serialization works: each call to
			 * context.write(key, value) immediately serializes the current
			 * field values of key and value into the framework's output
			 * buffer (and eventually to disk), so mutating the object on
			 * the next call cannot corrupt records already written.
			 */
			context.write(flow, NullWritable.get());
			
		}
		
	}
	
	/**
	 * Business logic for the Reduce phase
	 */
	private static class FlowPartitionerMRReducer extends Reducer<Flow, NullWritable, Flow, NullWritable>{

		@Override
		protected void reduce(Flow key, Iterable<NullWritable> values, Context context) 
				throws IOException, InterruptedException {

			/**
			 * If the ReduceTask has no real work to do and should just pass
			 * records through unchanged, there are two ways to implement it:
			 *
			 * 1. In this reduce method, iterate over the values and write
			 *    them out as-is (done below).
			 */
			for(NullWritable value : values){
				context.write(key, value);
			}
			
			/**
			 * 2. Do not define a Reducer at all and let the framework fall
			 *    back to the default identity Reducer.
			 */

		}
		
		
		
	}
	
	

}
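
The driver and the partitioner below both depend on flow.pojo.Flow, which the original post does not show. Any map output key must implement WritableComparable, so the class presumably looks roughly like the following sketch. The field names are taken from the setter calls in the Mapper and the getPhone() call in the partitioner; the serialization order, the compareTo() ordering, and the toString() format are assumptions for illustration.

package flow.pojo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Flow implements WritableComparable<Flow> {

	private String phone;
	private long upFlow;
	private long downFlow;
	private long sumFlow;

	public String getPhone() { return phone; }
	public void setPhone(String phone) { this.phone = phone; }
	public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
	public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
	public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

	// Serialization: the field order here must match readFields() exactly.
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		phone = in.readUTF();
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	// Assumed ordering: descending total flow. The original class may sort
	// differently; any total ordering satisfies the shuffle's requirements.
	@Override
	public int compareTo(Flow other) {
		return Long.compare(other.sumFlow, this.sumFlow);
	}

	// toString() determines what TextOutputFormat writes for the key.
	@Override
	public String toString() {
		return phone + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
	}
}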

Partitioning by province:

package flow.partitioner;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import flow.pojo.Flow;
/**
 * The Partitioner's key-value types are the Mapper's output key-value types.
 *
 * The Combiner's input key-value types are also the Mapper's output
 * key-value types.
 *
 * If the Combiner ran before the Partitioner, the Partitioner's input
 * key-value types would have to be the Combiner's output types. In fact the
 * Partitioner runs before the Combiner: each record is assigned a partition
 * number as the map output is collected, and the Combiner runs afterwards,
 * per partition, when the buffer is sorted and spilled.
 */

public class ProvincePartitioner  extends Partitioner<Flow, NullWritable> {

	/**
	 * Purpose: decide which ReduceTask each input key-value pair is sent to.
	 *
	 * The logic of this custom Partitioner: route each user's traffic
	 * summary to a different output file according to the province the
	 * phone number belongs to.
	 *
	 * Implementing this for real requires a dictionary table mapping phone
	 * numbers to provinces, typically stored in MySQL.
	 *
	 * Here we simulate it and simply assume:
	 *   all numbers starting with 134 belong to Beijing,
	 *   all numbers starting with 135 belong to Shanghai,
	 *   ...
	 */
	@Override
	public int getPartition(Flow key, NullWritable value, int numPartitions) {
		
		return getProvinceNumber(key);
	}
	
	private static int getProvinceNumber(Flow key){
		String phonePrefix = key.getPhone().substring(0, 3);
		
		switch (phonePrefix) {
			case "134": return 0;
			case "135": return 1;
			case "136": return 2;
			case "137": return 3;
			case "138": return 4;
			default:    return 5;
		}
	}
	
	

}
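
The comment above notes that a real deployment would key off a phone-to-province dictionary stored in MySQL. A Partitioner has no setup() hook, but Hadoop instantiates it through ReflectionUtils, which calls setConf() on any class that implements Configurable, and that is a natural place to load such a dictionary. Below is a minimal sketch under that assumption; the file name province.dict and its tab-separated "prefix, partition" format are made up for illustration.

package flow.partitioner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import flow.pojo.Flow;

public class DictProvincePartitioner extends Partitioner<Flow, NullWritable>
		implements Configurable {

	private Configuration conf;
	private final Map<String, Integer> prefixToPartition = new HashMap<String, Integer>();

	// Called by ReflectionUtils when the framework creates the partitioner.
	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
		// Hypothetical dictionary file, e.g. shipped to every node via the
		// distributed cache: one "prefix<TAB>partition" pair per line.
		try (BufferedReader reader = new BufferedReader(new FileReader("province.dict"))) {
			String line;
			while ((line = reader.readLine()) != null) {
				String[] fields = line.split("\t");
				prefixToPartition.put(fields[0], Integer.parseInt(fields[1]));
			}
		} catch (IOException e) {
			throw new RuntimeException("failed to load province dictionary", e);
		}
	}

	@Override
	public Configuration getConf() {
		return conf;
	}

	@Override
	public int getPartition(Flow key, NullWritable value, int numPartitions) {
		Integer partition = prefixToPartition.get(key.getPhone().substring(0, 3));
		// Unknown prefixes fall into the last, catch-all partition.
		return partition != null ? partition : numPartitions - 1;
	}
}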
