天天看点

java 编写MR程序之 - 流量统计

需求统计每一个号码产生的上行流行 和下行流量

电话 上行 下行

13795680000 10 20 

13795680001 200 456 

13795680002 125 1564 

13795680003 1564 1465

13795680004 5511 2552 

13795680005 1556 3565 

13795680006 1564 6565 

13795680007 5698 1236 

13795680008 8456 4566 

13795680009 50 100 

13795680010 120 360 

13795680000 10 20 

13795680001 200 456 

13795680002 125 1564 

13795680003 1564 1465

13795680004 5511 2552 

13795680005 1556 3565 

13795680006 1564 6565 

13795680007 5698 1236 

13795680008 8456 4566 

13795680009 50 100 

13795680010 120 360 

package flowcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCount {
	/**
	 * 1 .对读入的文件进行分割 分别获取 电话号码 ,上行流量 下行流量
	 * 2 .将上行流量和下行流量 封装到bean里
	 * 3 .以 <电话号码,bean>的形式 输出到reducer
	 */
	public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split(" ");
			String phoneNumber = fields[0];
			long upflow = Long.parseLong(fields[1]);
			long dflow = Long.parseLong(fields[2]);
			context.write(new Text(phoneNumber), new FlowBean(upflow, dflow));

		}

	}

	/**
	 * 1 .接受 mapper 阶段 的结果 以 <电话号码,bean> 的形式进行聚合,相同的电话号码 为一组。 
	 * 2 .依次遍历每组的beans 取出 上行和下行流量进行累加 
	 * 3 .将结果以<电话号码 ,bean.toString()>的形式进行输入 ,因为要将对象写入文件,会调用toString()方法,
	 *    在bean中重写了toString()方法 按照设定的格式输出。
	 */
	public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		public void reduce(Text key, Iterable<FlowBean> beans, Context context)
				throws IOException, InterruptedException {
			long sumupflow = 0;
			long sumdflow = 0;
			for (FlowBean bean : beans) {
				sumupflow += bean.getUpflow();
				sumdflow += bean.getDflow();
			}
			System.out.println(sumupflow + " " + sumdflow);
			FlowBean resultBean = new FlowBean(sumupflow, sumdflow);
			context.write(key, resultBean);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "flow");
		// 设置job所需的jar包的本地路径
		// job.setJar("/home.hadoop1/wc.jar");
		job.setJarByClass(FlowCount.class);

		// 指定业务job要使用的 mapper/reduce 业务类
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 指定reduce 完成后最终的数据KV类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 指定输入文件的hdfs的所在目录
		FileInputFormat.addInputPath(job, new Path(args[0]));

		// 指定输出结果的hdfs所在目录
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
           

13795680000    upflow: 20  dflow: 40   sumflow 60

13795680001    upflow: 400  dflow: 912   sumflow 1312

13795680002    upflow: 250  dflow: 3128   sumflow 3378

13795680003    upflow: 3128  dflow: 2930   sumflow 6058

13795680004    upflow: 11022  dflow: 5104   sumflow 16126

13795680005    upflow: 3112  dflow: 7130   sumflow 10242

13795680006    upflow: 3128  dflow: 13130   sumflow 16258

13795680007    upflow: 11396  dflow: 2472   sumflow 13868

13795680008    upflow: 16912  dflow: 9132   sumflow 26044

13795680009    upflow: 100  dflow: 200   sumflow 300

13795680010    upflow: 240  dflow: 720   sumflow 960