Requirement: count the total upstream and downstream traffic generated by each phone number.
Phone number    Upstream    Downstream
13795680000 10 20
13795680001 200 456
13795680002 125 1564
13795680003 1564 1465
13795680004 5511 2552
13795680005 1556 3565
13795680006 1564 6565
13795680007 5698 1236
13795680008 8456 4566
13795680009 50 100
13795680010 120 360
13795680000 10 20
13795680001 200 456
13795680002 125 1564
13795680003 1564 1465
13795680004 5511 2552
13795680005 1556 3565
13795680006 1564 6565
13795680007 5698 1236
13795680008 8456 4566
13795680009 50 100
13795680010 120 360
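The job below packs the two traffic values of each record into a custom FlowBean, which serves both as the map output value and as the final output value. The class itself is not listed in this section; what follows is a minimal sketch (a separate file in the same package) of what the code assumes it provides: upflow/dflow carried as a Hadoop Writable, plus a toString() that matches the "upflow: ... dflow: ... sumflow ..." format of the result shown at the end.

package flowcount;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Minimal sketch of the FlowBean used by FlowCount (the real class may differ).
 * It holds the upstream/downstream traffic of one record and implements
 * Writable so it can be serialized between the map and reduce phases.
 */
public class FlowBean implements Writable {

    private long upflow;
    private long dflow;

    // Hadoop instantiates the bean by reflection, so a no-arg constructor is required.
    public FlowBean() {
    }

    public FlowBean(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public long getDflow() {
        return dflow;
    }

    // Serialization; the field order must match readFields() below.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(dflow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        dflow = in.readLong();
    }

    // toString() controls how the reducer output is written to the result file;
    // the format below matches the sample output at the end of this section.
    @Override
    public String toString() {
        return "upflow: " + upflow + " dflow: " + dflow + " sumflow " + (upflow + dflow);
    }
}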
package flowcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCount {
    /**
     * 1. Split each input line to extract the phone number, the upstream traffic and the downstream traffic.
     * 2. Wrap the upstream and downstream traffic in a FlowBean.
     * 3. Emit <phone number, bean> to the reducer.
     */
    public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Fields are separated by a single space: phone number, upstream traffic, downstream traffic
            String[] fields = line.split(" ");
            String phoneNumber = fields[0];
            long upflow = Long.parseLong(fields[1]);
            long dflow = Long.parseLong(fields[2]);
            context.write(new Text(phoneNumber), new FlowBean(upflow, dflow));
        }
    }
    /**
     * 1. Receive the mapper output as <phone number, bean>; values with the same phone number form one group.
     * 2. Iterate over the beans of each group and accumulate the upstream and downstream traffic.
     * 3. Write the result as <phone number, bean>. Because the object is written to a file, its toString()
     *    method is called; FlowBean overrides toString() to produce the configured output format.
     */
    public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        public void reduce(Text key, Iterable<FlowBean> beans, Context context)
                throws IOException, InterruptedException {
            long sumupflow = 0;
            long sumdflow = 0;
            for (FlowBean bean : beans) {
                sumupflow += bean.getUpflow();
                sumdflow += bean.getDflow();
            }
            // Debug output; goes to the task's stdout log
            System.out.println(sumupflow + " " + sumdflow);
            FlowBean resultBean = new FlowBean(sumupflow, sumdflow);
            context.write(key, resultBean);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow");
        // Local path of the jar that contains the job classes (alternative to setJarByClass)
        // job.setJar("/home.hadoop1/wc.jar");
        job.setJarByClass(FlowCount.class);
        // Mapper/Reducer classes used by this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // Final output KV types produced by the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // HDFS directory that holds the input files
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // HDFS directory for the job output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
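After packaging the two classes into a jar, the job can be submitted with something along the lines of hadoop jar flow.jar flowcount.FlowCount <input dir> <output dir>, where the jar name and the two directories are placeholders for your own paths. Running it against the sample input above produces the result below; every phone number appears twice in the input, so each total is exactly double that of a single record.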
13795680000 upflow: 20 dflow: 40 sumflow 60
13795680001 upflow: 400 dflow: 912 sumflow 1312
13795680002 upflow: 250 dflow: 3128 sumflow 3378
13795680003 upflow: 3128 dflow: 2930 sumflow 6058
13795680004 upflow: 11022 dflow: 5104 sumflow 16126
13795680005 upflow: 3112 dflow: 7130 sumflow 10242
13795680006 upflow: 3128 dflow: 13130 sumflow 16258
13795680007 upflow: 11396 dflow: 2472 sumflow 13868
13795680008 upflow: 16912 dflow: 9132 sumflow 26044
13795680009 upflow: 100 dflow: 200 sumflow 300
13795680010 upflow: 240 dflow: 720 sumflow 960