
[Hadoop] MapReduce Multiple Outputs

Copyright notice: this is an original post by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/SunnyYoona/article/details/53486203

Files produced by FileOutputFormat and its subclasses are placed in the output directory, one file per reducer, named by partition number: part-r-00000, part-r-00001, and so on. Sometimes you need to control the output file names, or you want each reducer to produce more than one file. MapReduce provides the MultipleOutputs class for this.

MultipleOutputs can write data to multiple files whose names are derived from the output keys and values, or from arbitrary strings. This allows each reducer (or each mapper, in a map-only job) to create more than one file. File names of the form name-m-nnnnn are used for map output and name-r-nnnnn for reduce output, where name is an arbitrary name set by the program and nnnnn is an integer (starting from 0) designating the part number. The part number ensures that outputs written from different parts (mappers or reducers) with the same name do not collide.
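
For the map-side case, here is a minimal sketch of a mapper in a map-only job (numReduceTasks set to 0) that writes through MultipleOutputs; the TaggingMapper class and its ERROR/normal split are hypothetical and only illustrate the name-m-nnnnn naming:

// Hypothetical mapper for a map-only job: records are written through
// MultipleOutputs, so the output files are named <name>-m-nnnnn.
public static class TaggingMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Derive the file name from the record itself, e.g. "error" or "normal".
        String name = value.toString().contains("ERROR") ? "error" : "normal";
        multipleOutputs.write(NullWritable.get(), value, name);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}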

1. Redefining Output File Names

We can control the names of the output files. Consider this requirement: split vacation order data by gender. This means running a job whose output is one file per gender, each file containing all records for that gender.

This requirement can be implemented with MultipleOutputs:

package com.sjf.open.test;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by xiaosi on 16-11-7.
 */
public class VacationOrderBySex extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new VacationOrderBySex(), args);
        System.exit(status);
    }

    public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {

        public String fInputPath = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (fInputPath.contains("vacation_hot_country_order")) {
                String[] params = line.split("\t");
                String sex = params[2];
                if (StringUtils.isBlank(sex)) {
                    return;
                }
                // Emit the gender as the key so the reducer can use it as the file name.
                context.write(new Text(sex.toLowerCase()), value);
            }
        }
    }

    public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // Write through MultipleOutputs; the gender key becomes the file name (name-r-nnnnn).
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("./run <input> <output>");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        int numReduceTasks = 16;
        Configuration conf = this.getConf();
        conf.setBoolean("mapred.output.compress", true);
        conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        Job job = Job.getInstance(conf);
        job.setJobName("vacation_order_by_jifeng.si");
        job.setJarByClass(VacationOrderBySex.class);
        job.setMapperClass(VacationOrderBySexMapper.class);
        job.setReducerClass(VacationOrderBySexReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setNumReduceTasks(numReduceTasks);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}

In the reducer that produces the output, an instance of MultipleOutputs is constructed in the setup() method and assigned to an instance field. In the reduce() method the output is written through the MultipleOutputs instance rather than through the context. Its write() method takes the key, the value, and a name. Here the gender value is used as the name, so the resulting output files take the form <sex>-r-nnnnn (f-r-nnnnn and m-r-nnnnn):

-rw-r--r--   3 wirelessdev wirelessdev      0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--   3 wirelessdev wirelessdev  88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
-rw-r--r--   3 wirelessdev wirelessdev  60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz

Notice that besides the files we wanted, the output also contains files of the form part-r-nnnnn; they hold no data and are the job's default output files. Therefore, when choosing the name in name-r-nnnnn, do not use part, because it is already taken by the default output.
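
As an aside, MultipleOutputs also supports pre-declared named outputs, which the example above does not use: each name is registered in the driver with addNamedOutput() (names must be alphanumeric) and the reducer then writes to it by name. A minimal sketch, assuming the same driver and reducer as above:

// In run(), declare one named output per gender value; TextOutputFormat here is
// org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.
MultipleOutputs.addNamedOutput(job, "f", TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "m", TextOutputFormat.class, NullWritable.class, Text.class);

// In reduce(), write to the named output instead of passing a base path;
// this also produces files of the form f-r-nnnnn and m-r-nnnnn.
multipleOutputs.write(key.toString(), NullWritable.get(), value);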

2. Multi-Directory Output

The base path given to the write() method of MultipleOutputs is interpreted relative to the output directory, and since it may contain the file path separator (/), it can create subdirectories of arbitrary depth. For example, let's change the requirement above: split the vacation order data by gender, with each gender's data placed in its own subdirectory (for example, sex=f/part-r-00000).

public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            String basePath = String.format("sex=%s/part", key.toString());
            multipleOutputs.write(NullWritable.get(), value, basePath);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

The resulting output paths take the form sex=f/part-r-nnnnn or sex=m/part-r-nnnnn:

-rw-r--r--   3 wirelessdev wirelessdev      0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
-rw-r--r--   3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
drwxr-xr-x   - wirelessdev wirelessdev      0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f
drwxr-xr-x   - wirelessdev wirelessdev      0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m
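
Because the base path may contain more than one separator, the same mechanism nests to arbitrary depth. Here is a hypothetical variant of the reduce() loop that also partitions by year; the assumption that the fourth tab-separated field holds the order year is for illustration only:

for (Text value : values) {
    // Hypothetical: assume the 4th tab-separated field is the order year,
    // producing paths such as sex=f/year=2016/part-r-nnnnn.
    String year = value.toString().split("\t")[3];
    String basePath = String.format("sex=%s/year=%s/part", key.toString(), year);
    multipleOutputs.write(NullWritable.get(), value, basePath);
}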

3. Lazy Output

Subclasses of FileOutputFormat create output files (part-r-nnnnn) even when those files are empty. When these empty files are unwanted, LazyOutputFormat can be used. It is a wrapper output format that ensures the output file is created only when the first record for a given partition is written. To use it, call its setOutputFormatClass() method with the Job and the underlying output format as arguments:

Configuration conf = this.getConf();
Job job = Job.getInstance(conf);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
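
For reference, a minimal sketch of where this goes in the run() method of the first example; only the two imports and the LazyOutputFormat line are new relative to the original driver:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Inside VacationOrderBySex.run(), after creating the Job:
Job job = Job.getInstance(conf);
job.setJarByClass(VacationOrderBySex.class);
// TextOutputFormat remains the actual output format, but a reducer's default
// part-r-nnnnn file is only created once something is written to it.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);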

Checking our output files again (for the first example):

sudo -uwirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/
Found 3 items
-rw-r--r--   3 wirelessdev wirelessdev      0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--   3 wirelessdev wirelessdev  88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
-rw-r--r--   3 wirelessdev wirelessdev  60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz


