需求统计每一个号码产生的上行流行 和下行流量
电话 上行 下行
13795680000 10 20
13795680001 200 456
13795680002 125 1564
13795680003 1564 1465
13795680004 5511 2552
13795680005 1556 3565
13795680006 1564 6565
13795680007 5698 1236
13795680008 8456 4566
13795680009 50 100
13795680010 120 360
13795680000 10 20
13795680001 200 456
13795680002 125 1564
13795680003 1564 1465
13795680004 5511 2552
13795680005 1556 3565
13795680006 1564 6565
13795680007 5698 1236
13795680008 8456 4566
13795680009 50 100
13795680010 120 360
package flowcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCount {
/**
* 1 .对读入的文件进行分割 分别获取 电话号码 ,上行流量 下行流量
* 2 .将上行流量和下行流量 封装到bean里
* 3 .以 <电话号码,bean>的形式 输出到reducer
*/
public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");
String phoneNumber = fields[0];
long upflow = Long.parseLong(fields[1]);
long dflow = Long.parseLong(fields[2]);
context.write(new Text(phoneNumber), new FlowBean(upflow, dflow));
}
}
/**
* 1 .接受 mapper 阶段 的结果 以 <电话号码,bean> 的形式进行聚合,相同的电话号码 为一组。
* 2 .依次遍历每组的beans 取出 上行和下行流量进行累加
* 3 .将结果以<电话号码 ,bean.toString()>的形式进行输入 ,因为要将对象写入文件,会调用toString()方法,
* 在bean中重写了toString()方法 按照设定的格式输出。
*/
public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
public void reduce(Text key, Iterable<FlowBean> beans, Context context)
throws IOException, InterruptedException {
long sumupflow = 0;
long sumdflow = 0;
for (FlowBean bean : beans) {
sumupflow += bean.getUpflow();
sumdflow += bean.getDflow();
}
System.out.println(sumupflow + " " + sumdflow);
FlowBean resultBean = new FlowBean(sumupflow, sumdflow);
context.write(key, resultBean);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "flow");
// 设置job所需的jar包的本地路径
// job.setJar("/home.hadoop1/wc.jar");
job.setJarByClass(FlowCount.class);
// 指定业务job要使用的 mapper/reduce 业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 指定reduce 完成后最终的数据KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 指定输入文件的hdfs的所在目录
FileInputFormat.addInputPath(job, new Path(args[0]));
// 指定输出结果的hdfs所在目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
13795680000 upflow: 20 dflow: 40 sumflow 60
13795680001 upflow: 400 dflow: 912 sumflow 1312
13795680002 upflow: 250 dflow: 3128 sumflow 3378
13795680003 upflow: 3128 dflow: 2930 sumflow 6058
13795680004 upflow: 11022 dflow: 5104 sumflow 16126
13795680005 upflow: 3112 dflow: 7130 sumflow 10242
13795680006 upflow: 3128 dflow: 13130 sumflow 16258
13795680007 upflow: 11396 dflow: 2472 sumflow 13868
13795680008 upflow: 16912 dflow: 9132 sumflow 26044
13795680009 upflow: 100 dflow: 200 sumflow 300
13795680010 upflow: 240 dflow: 720 sumflow 960