
MapReduce: multiple inputs

1. Multiple input paths

1) Call FileInputFormat.addInputPath several times, once for each path to load

FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"));

FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path2"));

2) Call FileInputFormat.addInputPaths once, passing all the paths as a single comma-separated string

FileInputFormat.addInputPaths(job, "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2");
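To make the equivalence of the two calls concrete, the sketch below breaks a comma-separated string into the individual paths that repeated addInputPath calls would register one by one. It is plain Java with no Hadoop on the classpath. The class InputPathsSketch and the method splitPaths are hypothetical names, and the plain String.split (plus trimming) used here is a simplification chosen for this sketch, not Hadoop's own parsing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: shows how one comma-separated
// string corresponds to several individual input paths.
class InputPathsSketch {

    // Split on commas, trim surrounding whitespace, and skip empty entries.
    static List<String> splitPaths(String commaSeparated) {
        List<String> paths = new ArrayList<>();
        for (String p : commaSeparated.split(",")) {
            String trimmed = p.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        String arg = "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2";
        // Each printed line is one path that a separate addInputPath call would take.
        for (String p : splitPaths(arg)) {
            System.out.println(p);
        }
    }
}
```

Either style ends with the same set of input paths on the job; the single-string form is convenient when the path list arrives as one command-line argument.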

2. Multiple input types

MultipleInputs can load input files from different paths, and each path can be given its own input format and its own mapper

MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);

MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);
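The idea of binding a different mapper to each path can be illustrated without Hadoop. The sketch below is only a plain-Java analogy: PerPathMapperSketch, its addInputPath and map methods, and splitOn are hypothetical names, and the ";" delimiter for the second path is an assumption made for the demonstration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

// Plain-Java illustration (no Hadoop): each input path is bound to its own
// parsing function, mirroring how MultipleInputs binds a mapper per path.
class PerPathMapperSketch {

    // path -> function that turns one input line into a (key, value) pair
    private final Map<String, Function<String, Map.Entry<String, String>>> mappers =
            new LinkedHashMap<>();

    void addInputPath(String path, Function<String, Map.Entry<String, String>> mapper) {
        mappers.put(path, mapper);
    }

    // Parse a line with the mapper registered for the path it came from.
    Map.Entry<String, String> map(String path, String line) {
        Function<String, Map.Entry<String, String>> mapper = mappers.get(path);
        if (mapper == null) {
            throw new IllegalArgumentException("no mapper registered for " + path);
        }
        return mapper.apply(line);
    }

    // Build a parser that splits on a literal delimiter and keeps the first two fields.
    static Function<String, Map.Entry<String, String>> splitOn(String delimiter) {
        return line -> {
            String[] f = line.split(Pattern.quote(delimiter));
            return new SimpleEntry<String, String>(f[0], f[1]);
        };
    }

    public static void main(String[] args) {
        PerPathMapperSketch job = new PerPathMapperSketch();
        job.addInputPath("/cs/path1", splitOn("|"));
        job.addInputPath("/cs/path3", splitOn(";")); // ";" is an assumed delimiter
        System.out.println(job.map("/cs/path1", "k1|v1"));
        System.out.println(job.map("/cs/path3", "k1;v3"));
    }
}
```

Both parsers emit pairs of the same type, which is also what the real job needs: mappers attached to different paths must agree on their output key and value classes so that a single reducer can consume them.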

Example:

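package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Multi-type file input
 * @author lijl
 */
public class MultiTypeFileInputMR {

    // Mapper for path1: each record is pipe-delimited, key|value
    public static class MultiTypeFileInput1Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split() takes a regular expression, so the pipe must be escaped
            String[] str = value.toString().split("\\|");
            if (str.length >= 2) { // skip malformed records
                context.write(new Text(str[0]), new Text(str[1]));
            }
        }
    }

    // Mapper for path3: same logic, different record layout
    public static class MultiTypeFileInput3Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The delimiter for the path3 files is empty in this listing; with ""
            // split() returns one element per character. Substitute the actual
            // field separator of those files.
            String[] str = value.toString().split("");
            if (str.length >= 2) { // skip malformed records
                context.write(new Text(str[0]), new Text(str[1]));
            }
        }
    }

    // Identity reducer: writes every (key, value) pair it receives
    public static class MultiTypeFileInputReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Separator between key and value in the text output (newer Hadoop
        // releases name this key mapreduce.output.textoutputformat.separator)
        conf.set("mapred.textoutputformat.separator", ",");

        // Newer Hadoop releases prefer Job.getInstance(conf, name)
        Job job = new Job(conf, "MultiPathFileInput");
        job.setJarByClass(MultiTypeFileInputMR.class);

        // Bind each input path to its own input format and mapper
        MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"),
                TextInputFormat.class, MultiTypeFileInput1Mapper.class);
        MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"),
                TextInputFormat.class, MultiTypeFileInput3Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://RS5-112:9000/cs/path6"));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setReducerClass(MultiTypeFileInputReducer.class);
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}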
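The first mapper splits on "\\|" rather than "|" because String.split takes a regular expression, where an unescaped | means alternation and does not split on the pipe character. The sketch below is a plain-Java dry run of that parsing step followed by a group-by-key and an identity reduce, printing key and value joined by the "," separator that the job configures. It runs without Hadoop; the class LocalDryRun and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java dry run (no Hadoop): parse pipe-delimited lines the way the
// first mapper does, group by key, and produce "key,value" lines.
class LocalDryRun {

    // String.split takes a regex, so the pipe must be escaped as "\\|".
    static String[] parse(String line) {
        return line.split("\\|");
    }

    // Group values by key in sorted key order, then emit one line per value.
    static List<String> run(List<String> lines, String separator) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] f = parse(line);
            if (f.length < 2) {
                continue; // skip malformed records
            }
            grouped.computeIfAbsent(f[0], k -> new ArrayList<>()).add(f[1]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            for (String v : e.getValue()) {
                out.add(e.getKey() + separator + v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("b|2", "a|1", "a|3", "broken");
        // Prints: a,1  a,3  b,2 (one per line)
        run(lines, ",").forEach(System.out::println);
    }
}
```

In this sketch values keep their input order within a key; a real MapReduce job sorts keys but gives no such guarantee for the order of values under one key.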
