天天看點

java 編寫MR程式之 - 流量統計

需求統計每一個号碼産生的上行流行 和下行流量

電話 上行 下行

13795680000 10 20 

13795680001 200 456 

13795680002 125 1564 

13795680003 1564 1465

13795680004 5511 2552 

13795680005 1556 3565 

13795680006 1564 6565 

13795680007 5698 1236 

13795680008 8456 4566 

13795680009 50 100 

13795680010 120 360 

13795680000 10 20 

13795680001 200 456 

13795680002 125 1564 

13795680003 1564 1465

13795680004 5511 2552 

13795680005 1556 3565 

13795680006 1564 6565 

13795680007 5698 1236 

13795680008 8456 4566 

13795680009 50 100 

13795680010 120 360 

package flowcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCount {
	/**
	 * 1 .對讀入的檔案進行分割 分别擷取 電話号碼 ,上行流量 下行流量
	 * 2 .将上行流量和下行流量 封裝到bean裡
	 * 3 .以 <電話号碼,bean>的形式 輸出到reducer
	 */
	public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split(" ");
			String phoneNumber = fields[0];
			long upflow = Long.parseLong(fields[1]);
			long dflow = Long.parseLong(fields[2]);
			context.write(new Text(phoneNumber), new FlowBean(upflow, dflow));

		}

	}

	/**
	 * 1 .接受 mapper 階段 的結果 以 <電話号碼,bean> 的形式進行聚合,相同的電話号碼 為一組。 
	 * 2 .依次周遊每組的beans 取出 上行和下行流量進行累加 
	 * 3 .将結果以<電話号碼 ,bean.toString()>的形式進行輸入 ,因為要将對象寫入檔案,會調用toString()方法,
	 *    在bean中重寫了toString()方法 按照設定的格式輸出。
	 */
	public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		public void reduce(Text key, Iterable<FlowBean> beans, Context context)
				throws IOException, InterruptedException {
			long sumupflow = 0;
			long sumdflow = 0;
			for (FlowBean bean : beans) {
				sumupflow += bean.getUpflow();
				sumdflow += bean.getDflow();
			}
			System.out.println(sumupflow + " " + sumdflow);
			FlowBean resultBean = new FlowBean(sumupflow, sumdflow);
			context.write(key, resultBean);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "flow");
		// 設定job所需的jar包的本地路徑
		// job.setJar("/home.hadoop1/wc.jar");
		job.setJarByClass(FlowCount.class);

		// 指定業務job要使用的 mapper/reduce 業務類
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 指定reduce 完成後最終的資料KV類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 指定輸入檔案的hdfs的所在目錄
		FileInputFormat.addInputPath(job, new Path(args[0]));

		// 指定輸出結果的hdfs所在目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
           

13795680000    upflow: 20  dflow: 40   sumflow 60

13795680001    upflow: 400  dflow: 912   sumflow 1312

13795680002    upflow: 250  dflow: 3128   sumflow 3378

13795680003    upflow: 3128  dflow: 2930   sumflow 6058

13795680004    upflow: 11022  dflow: 5104   sumflow 16126

13795680005    upflow: 3112  dflow: 7130   sumflow 10242

13795680006    upflow: 3128  dflow: 13130   sumflow 16258

13795680007    upflow: 11396  dflow: 2472   sumflow 13868

13795680008    upflow: 16912  dflow: 9132   sumflow 26044

13795680009    upflow: 100  dflow: 200   sumflow 300

13795680010    upflow: 240  dflow: 720   sumflow 960