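OutputFormat: data output
The implementations of the interface are as follows:
Custom OutputFormat: use cases and steps
Hands-on example:
Goal: filter the input file, writing every line that contains caocao to caocao.txt and all other lines to other.txt.
Input data: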
caocao shi wei wu di
liubei shi shu zhao lie di
liubei shi shu guo de
caocao shi wei guo de
caozhi shi caocao de er zi
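The Mapper is as follows:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DefinedOutputFormatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Text k = new Text();
    String line = "\r\n";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String keyStr = value.toString();
        // The custom RecordWriter writes the key bytes as-is, so without this terminator the records would not be separated by line breaks
        k.set(keyStr + line);
        context.write(k, NullWritable.get());
    }
}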
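The Reducer is as follows:
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DefinedOutputFormatReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Write the key once per value, so duplicate input lines are kept instead of being collapsed into one
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}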
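To see why the loop matters: the shuffle groups identical keys, so a line that occurs n times in the input reaches reduce() once, with n values. The plain-Java sketch below imitates that grouping without Hadoop. The names DuplicateSketch, group and emitOncePerValue are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the shuffle and reduce steps; no Hadoop classes are involved.
final class DuplicateSketch {

    // Imitates the shuffle: identical lines become one sorted key with a count of values.
    static Map<String, Integer> group(List<String> lines) {
        Map<String, Integer> grouped = new TreeMap<>();
        for (String line : lines) {
            grouped.merge(line, 1, Integer::sum);
        }
        return grouped;
    }

    // Imitates the reducer: emit the key once for every grouped value.
    static List<String> emitOncePerValue(Map<String, Integer> grouped) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : grouped.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                out.add(entry.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("b line", "a line", "b line");
        // prints [a line, b line, b line]
        System.out.println(emitOncePerValue(group(input)));
    }
}
```

Writing the key a single time outside the loop would instead yield one copy of each distinct line.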
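The custom OutputFormat implementation is as follows:
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DefinedOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new DefinedOutputRecordWriter(context);
    }
}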
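The custom RecordWriter is as follows:
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DefinedOutputRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream fosCaocao;
    FSDataOutputStream fosOther;

    // Propagate IOException instead of swallowing it; otherwise a failed create()
    // would leave the streams null and write() would fail later with a NullPointerException
    public DefinedOutputRecordWriter(TaskAttemptContext context) throws IOException {
        // 1. Get the file system
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        // 2. Create the output stream for caocao.txt
        fosCaocao = fileSystem.create(new Path("/home/lxj/hadoop-data/output/definedOutput/caocao.txt"));
        // 3. Create the output stream for other.txt
        fosOther = fileSystem.create(new Path("/home/lxj/hadoop-data/output/definedOutput/other.txt"));
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // If the key contains caocao, write it to caocao.txt; otherwise write it to other.txt
        byte[] bytes = text.toString().getBytes(StandardCharsets.UTF_8);
        if (text.toString().contains("caocao")) {
            fosCaocao.write(bytes);
        } else {
            fosOther.write(bytes);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(fosCaocao);
        IOUtils.closeStream(fosOther);
    }
}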
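The write-and-route logic can be tried out without a cluster. The sketch below is a plain-Java analogue, not Hadoop code: TwoWaySink is a hypothetical class that accepts any two java.io.OutputStream instances in place of the FSDataOutputStream fields, and it encodes records explicitly as UTF-8.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical analogue of the RecordWriter: routes each record to one of two streams.
final class TwoWaySink implements Closeable {
    private final OutputStream matched;
    private final OutputStream other;
    private final String keyword;

    TwoWaySink(OutputStream matched, OutputStream other, String keyword) {
        this.matched = matched;
        this.other = other;
        this.keyword = keyword;
    }

    // Records containing the keyword go to the first stream, everything else to the second.
    void write(String record) throws IOException {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        if (record.contains(keyword)) {
            matched.write(bytes);
        } else {
            other.write(bytes);
        }
    }

    // Close both streams; the second is closed even if closing the first throws.
    @Override
    public void close() throws IOException {
        try {
            matched.close();
        } finally {
            other.close();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream caocao = new ByteArrayOutputStream();
        ByteArrayOutputStream rest = new ByteArrayOutputStream();
        try (TwoWaySink sink = new TwoWaySink(caocao, rest, "caocao")) {
            sink.write("caozhi shi caocao de er zi\n");
            sink.write("liubei shi shu guo de\n");
        }
        System.out.print(new String(caocao.toByteArray(), StandardCharsets.UTF_8));
        System.out.print(new String(rest.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Passing the streams in through the constructor is a choice made for this sketch so the routing can be checked against in-memory buffers; the RecordWriter in this post opens its own streams from fixed paths.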
Add the following to the Driver:
// Set the OutputFormat
job.setOutputFormatClass(DefinedOutputFormat.class);
The results are as follows:
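As a cross-check on what each file should contain, the plain-Java sketch below replays the sample input through the same logic: sort the lines the way the shuffle sorts keys, then route on the substring caocao. It assumes a single reduce task, and ExpectedOutputSketch is a hypothetical name. String ordering agrees with the byte ordering of Text here only because the sample data is ASCII.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical local replay of the job on the sample input; no Hadoop classes are involved.
final class ExpectedOutputSketch {
    static final List<String> INPUT = Arrays.asList(
            "caocao shi wei wu di",
            "liubei shi shu zhao lie di",
            "liubei shi shu guo de",
            "caocao shi wei guo de",
            "caozhi shi caocao de er zi");

    // Returns two lists: index 0 holds the lines for caocao.txt, index 1 those for other.txt.
    static List<List<String>> route(List<String> lines) {
        List<String> sorted = new ArrayList<>(lines);
        Collections.sort(sorted); // stands in for the key sort done by the shuffle
        List<String> caocao = new ArrayList<>();
        List<String> other = new ArrayList<>();
        for (String line : sorted) {
            if (line.contains("caocao")) {
                caocao.add(line);
            } else {
                other.add(line);
            }
        }
        return Arrays.asList(caocao, other);
    }

    public static void main(String[] args) {
        List<List<String>> result = route(INPUT);
        System.out.println("caocao.txt: " + result.get(0));
        System.out.println("other.txt: " + result.get(1));
    }
}
```

Under those assumptions, caocao.txt would hold three lines (caocao shi wei guo de, caocao shi wei wu di, caozhi shi caocao de er zi) and other.txt two (liubei shi shu guo de, liubei shi shu zhao lie di), each ending in the \r\n appended by the Mapper.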