
Hadoop MapReduce (FlowCount) Java Programming

Write a PhoneFlow program that computes each phone's upstream traffic, downstream traffic, and total traffic. The sample data is as follows:

 13685295623 122  201 

 13985295600 102  11 

 13885295622 22   101 

 13785295633 120  20 
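Each line holds three fields for one phone: the phone number, the upstream traffic, and the downstream traffic (units are not stated in the original post). The mapper below splits on the tab character, so the columns are assumed to be tab-separated.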

1、FlowMapper:

package com.hadoop.flow;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * Input format (one record per line, tab-separated):
     * 13685295623	122	201
     * 13985295600	102	11
     * 13885295622	22	101
     * 13785295633	120	20
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into phone number, upstream traffic, downstream traffic
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");

        String phoneNB = fields[0];
        long up_flow = Long.valueOf(fields[1]);
        long d_flow = Long.valueOf(fields[2]);

        // Key on the phone number so all of its records reach the same reducer
        context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, d_flow));
    }
}
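For the first sample line, this mapper emits the phone number 13685295623 as the key and a FlowBean carrying up_flow=122 and d_flow=201 as the value. Keying on the phone number guarantees that all records for one phone arrive at the same reduce call.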

2、FlowReducer:

package com.hadoop.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long up_flow_sum = 0;
        long d_flow_sum = 0;

        // Sum the upstream and downstream traffic of every record
        // belonging to this phone number
        for (FlowBean bean : values) {
            up_flow_sum += bean.getUp_flow();
            d_flow_sum += bean.getD_flow();
        }

        // Emit a single aggregated record per phone number, after the loop
        context.write(key, new FlowBean(key.toString(), up_flow_sum, d_flow_sum));
    }
}
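Two details are worth noting here: context.write must sit after the loop so each phone number produces exactly one aggregated record, and the counters are copied into local variables because Hadoop reuses the FlowBean instance handed out by the values iterator. With the sample data each phone appears only once, so the sums simply equal the input values.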

3、FlowRunner:

package com.hadoop.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowRunner.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // args[0] = input path, args[1] = output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new FlowRunner(), args));
    }
}
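Assuming the four classes are packaged into a jar (the jar name and HDFS paths below are illustrative, not from the original post), the job can be submitted along these lines:

    hadoop fs -mkdir -p /flow/input
    hadoop fs -put flow.txt /flow/input
    hadoop jar flow.jar com.hadoop.flow.FlowRunner /flow/input /flow/output

The result lands in /flow/output/part-r-00000, one line per phone number: the phone number (the Text key) followed by FlowBean.toString(), i.e. upstream, downstream, and total traffic.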

4、FlowBean:

package com.hadoop.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // A no-arg constructor is required so Hadoop can instantiate
    // the bean by reflection during deserialization
    public FlowBean() {
    }

    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Serialization: the field order must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialization: read the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return up_flow + "\t" + d_flow + "\t" + s_flow;
    }
}
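As a quick local sanity check of the Writable implementation, the bean can be run through a write()/readFields() round trip. The FlowBeanTest class below is a hypothetical addition for illustration, not part of the original post:

package com.hadoop.flow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanTest {

    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13685295623", 122, 201);

        // Serialize with write()
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean with readFields()
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Expected output: 122	201	323
        System.out.println(copy);
    }
}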

This article was reposted from lzf05303774's 51CTO blog. Original link: http://blog.51cto.com/8757576/1839299. Please contact the original author before reprinting.