package flow.partitioner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import flow.pojo.Flow;
public class FlowPartitionerMR {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
Job job = Job.getInstance(conf);
job.setJarByClass(FlowPartitionerMR.class);
job.setMapperClass(FlowPartitionerMRMapper.class);
job.setMapOutputKeyClass(Flow.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FlowPartitionerMRReducer.class);
job.setOutputKeyClass(Flow.class);
job.setOutputValueClass(NullWritable.class);
/**
* 在MapReduce编程模型中,Partitioner的默认实现是HashPartitioner.如果
* 不能满足我们的需求,按照HashPartitioner的实现方式自定义一个Partitioner组件即可
*/
job.setPartitionerClass(ProvincePartitioner.class);
// job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(9);
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
FileInputFormat.setInputPaths(job, inputPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outputPath)){
fs.delete(outputPath,true);
}
FileOutputFormat.setOutputPath(job,outputPath);
boolean isDone = job.waitForCompletion(true);
System.exit(isDone ? 0 : 1);
}
/**
* Mapper阶段的业务逻辑
*
*/
private static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{
Flow flow = new Flow();
/**
* 13480253104 2494800 2494800 4989600
*
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
String phone = split[0];
long upFlow = Long.parseLong(split[1]);
long downFlow = Long.parseLong(split[2]);
long sumFlow = Long.parseLong(split[3]);
/**
* map方法每调用一次,那么就给flow对象中的对应属性重新设置值,达到减少JVM垃圾回收的目的
*/
flow.setPhone(phone);
flow.setUpFlow(upFlow);
flow.setDownFlow(downFlow);
flow.setSumFlow(sumFlow);
/**
* 当前这个做法,是让整个MapTask公用一个Flow对象。为什么可以这么用?
*
* 序列化的工作机制
* 每次通过context.write(key,value)其实是把key和value当中的属性值已经序列化了到其他的比如流或者内存或者磁盘文件
*
*/
context.write(flow, NullWritable.get());
}
}
/**
* Reducer阶段的业务逻辑
*/
private static class FlowPartitionerMRReducer extends Reducer<Flow, NullWritable, Flow, NullWritable>{
@Override
protected void reduce(Flow key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
/**
* 如果ReducerTask什么逻辑都不用做,仅仅只是作原样输出:
*
* 两钟实现方式:
*
* 1、在当前的reduce方法中,直接遍历原样输出
*/
for(NullWritable value : values){
context.write(key, value);
}
/**
* 2、直接不用定义Reducer,直接使用默认实现
*/
}
}
}
按照省份分区(ProvincePartitioner.java):
package flow.partitioner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import flow.pojo.Flow;
/**
 * Routes each map output record to a reduce task according to the province its phone
 * number belongs to, so that the traffic totals of different provinces end up in
 * different result files.
 *
 * <p>Notes on type parameters:
 * <ul>
 *   <li>The Partitioner's key/value types are the Mapper's output key/value types.</li>
 *   <li>A Combiner's input key/value types are also the Mapper's output key/value types.</li>
 *   <li>If the Combiner ran before the Partitioner, the Partitioner's input types would
 *       have to be the Combiner's output types. The Partitioner runs before the
 *       Combiner, however.</li>
 * </ul>
 *
 * <p>A real implementation needs a dictionary table mapping phone numbers to their home
 * province (typically stored in MySQL). This is a simulation keyed on the first three
 * digits: 134 is treated as Beijing (0), 135 as Shanghai (1), 136/137/138 map to
 * 2/3/4, and anything else -- including a null or shorter-than-three-character
 * number -- maps to 5.
 */
public class ProvincePartitioner extends Partitioner<Flow, NullWritable> {

    /** Number of distinct province ids produced by this partitioner (0..5). */
    public static final int NUM_PROVINCES = 6;

    /** Province id for phone numbers whose prefix is not recognized. */
    private static final int OTHER_PROVINCE = NUM_PROVINCES - 1;

    /** Length of the phone-number prefix that selects the province. */
    private static final int PREFIX_LENGTH = 3;

    /**
     * Decides which reduce task the given record is sent to.
     *
     * @param key the map output key; its phone number selects the province
     * @param value unused
     * @param numPartitions number of reduce tasks configured for the job
     * @return a partition in {@code [0, numPartitions)}; equal to the province id whenever
     *     {@code numPartitions >= NUM_PROVINCES}
     */
    @Override
    public int getPartition(Flow key, NullWritable value, int numPartitions) {
        // The Partitioner contract requires a result in [0, numPartitions). With fewer
        // reduce tasks than provinces, fold the extra provinces into existing partitions
        // instead of returning an out-of-range id that the framework would reject.
        return getProvinceNumber(key) % numPartitions;
    }

    /** Maps the key's phone number to a province id in {@code [0, NUM_PROVINCES)}. */
    private static int getProvinceNumber(Flow key) {
        String phone = key.getPhone();
        if (phone == null || phone.length() < PREFIX_LENGTH) {
            return OTHER_PROVINCE;
        }
        switch (phone.substring(0, PREFIX_LENGTH)) {
            case "134":
                return 0;
            case "135":
                return 1;
            case "136":
                return 2;
            case "137":
                return 3;
            case "138":
                return 4;
            default:
                return OTHER_PROVINCE;
        }
    }
}