天天看點

探索Hadoop outputFormat

Hadoop常常被用作大型資料處理生态系統中的一部分。它的優勢在于能夠批量地處理大量資料,并将結果以最好的方式與其他系統相內建。從高層次角度來看,整個過程就是Hadoop接收輸入檔案、使用自定義轉換(Map-Reduce步驟)獲得内容流,以及将輸出檔案的結果寫回磁盤。上個月InfoQ展示了怎樣在第一個步驟中,使用InputFormat類來更好地對接收輸入檔案進行控制。而在本文中,我們将同大家一起探讨怎樣自定義最後一個步驟——即怎樣寫入輸出檔案。OutputFormat将Map/Reduce作業的輸出結果轉換為其他應用程式可讀的方式,進而輕松實作與其他系統的互操作。為了展示OutputFormts的實用性,我們将用兩個例子進行讨論:如何拆分作業結果到不同目錄以及如何為提供快速鍵值查找的服務寫入檔案。

OutputFormats是做什麼的?

OutputFormt接口決定了在哪裡以及怎樣持久化作業結果。Hadoop為不同類型的格式提供了一系列的類和接口,實作自定義操作隻要繼承其中的某個類或接口即可。你可能已經熟悉了預設的OutputFormat,也就是TextOutputFormat,它是一種以行分隔,包含制表符界定的鍵值對的文本檔案格式。盡管如此,對多數類型的資料而言,如再常見不過的數字,文本序列化會浪費一些空間,由此帶來的結果是運作時間更長且資源消耗更多。為了避免文本檔案的弊端,Hadoop提供了SequenceFileOutputformat,它将對象表示成二進制形式而不再是文本檔案,并将結果進行壓縮。下面是Hadoop提供的類層次結構:

  • FileOutputFormat(實作OutputFormat接口)—— 所有OutputFormats的基類
    • MapFileOutputFormat —— 一種使用部分索引鍵的格式
    • SequenceFileOutputFormat —— 二進制鍵值資料的壓縮格式
      • SequenceFileAsBinaryOutputFormat —— 原生二進制資料的壓縮格式
    • TextOutputFormat —— 以行分隔、包含制表符定界的鍵值對的文本檔案格式
    • MultipleOutputFormat —— 使用鍵值對參數寫入檔案的抽象類
      • MultipleTextOutputFormat —— 輸出多個以标準行分割、制表符定界格式的檔案
      • MultipleSequenceFileOutputFormat —— 輸出多個壓縮格式的檔案

OutputFormat提供了對RecordWriter的實作,進而指定如何序列化資料。 RecordWriter類可以處理包含單個鍵值對的作業,并将結果寫入到OutputFormat中準備好的位置。RecordWriter的實作主要包括兩個函數:“write”和“close”。“write”函數從Map/Reduce作業中取出鍵值對,并将其位元組寫入磁盤。LineRecordWriter是預設使用的RecordWriter,它是前面提到的TextOutputFormat的一部分。它寫入的内容包括:

  • 鍵(key)的位元組 (由getBytes()函數傳回)
  • 一個用以定界的制表符
  • 值(value)的位元組(同樣由getBytes()函數傳回)
  • 一個換行符

“close”函數會關閉Hadoop到輸出檔案的資料流。

我們已經讨論了輸出資料的格式,下面我們關心的問題是資料存儲在何處?同樣,你或許看到過某個作業的輸出結果會以多個“部分”檔案的方式存儲在輸出目錄中,如下:

|-- output-directory
| |-- part-00000
| |-- part-00001
| |-- part-00002
| |-- part-00003
| |-- part-00004
   '-- part-00005      

預設情況下,當需要寫入資料時,每個程序都會在輸出目錄建立自己的檔案。資料由reducers在作業結束時寫入(如果沒有reducers會由mapper寫入)。即使在本文後面提到的建立自定義輸出目錄時,我們仍會保持寫入“部分”檔案,這麼做可以讓多個程序同時寫入同一個目錄而互不幹擾。

自定義OutputFormat

從前面我們已經看到,OutputFormat類的主要職責是決定資料的存儲位置以及寫入的方式。那麼為什麼要自定義這些行為呢?自定義資料位置的原因之一是為了将Map/Reduce作業輸出分離到不同的目錄。例如,假設需要處理一個包含世界範圍内的搜尋請求的日志檔案,并希望計算出每個國家的搜尋頻度。你想要在不牽涉其他國家的前提下能夠檢視某個特定國家的結果。也許以後在你的資料管道中,會用不同的程序來處理不同的國家,或者想要把某個特定國家的結果複制一份到該國的資料中心去。使用預設的OutputFormat時,所有的資料都會存儲在同一目錄下,這樣在不浏覽的情況下是無從知曉“部分”檔案的内容的。而通過使用自定義的OutputFormat,你可以為每個國家建立一個子目錄的布局,如下:

|-- output-directory
|   |-- France
|   |   |-- part-00000
|   |   |-- part-00001
|   |   '-- part-00002
... |
|   '-- Zimbabwe
|       |-- part-00000
|       |-- part-00001
|       '-- part-00002      

其中每個部分檔案都具有鍵值對(“搜尋詞彙”=>頻度)。現在隻要簡單地指定某個國家資料所在的路徑,就可以隻讀取該國家的資料了。下面我們将看到怎樣繼承MultipleTextOutputFormat類,以獲得所需的行為。

自定義OutputFormat還有一些其他的原因,以名為ElephantDB的項目為例, 它将資料以一種面向消費應用程式的“本地”形式進行存儲。這個項目的設立是為了讓Map/Reduece作業結果可以像分布式服務一樣被查詢。ElephantDB寫入的并不是文本檔案,而是使用自定義的OutputFormat将結果寫成BerkeleyDB檔案,其中這些檔案使用作業輸出的鍵進行索引。之後使用某個服務加載BerkeleyDB檔案,可以提供低延滞的任意鍵查找。類似的系統還有HBase和Voldemort,它們可以存儲Hadoop生成的鍵值資料。ElephantDB重點關注的是怎樣與Hadoop批量式更新進行簡易緊密的內建。

多路輸出

為了解決上面的搜尋日志的問題,我們繼承了MultipleTextOutputFormat類,并根據被寫入的鍵值來選擇輸出目錄。我們的Map/Reduce作業将會為搜尋請求所在國家生成一個鍵,并為搜尋詞彙及該搜尋的頻度産生一個值。由于MultipleTextOutputFormat已經知道如何寫入文本檔案,是以并不需要為OutputFormat實作序列化功能。清單1實作了該類:

1 package oddjob.hadoop;
2
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9        /**
10        * Use they key as part of the path for the final output file.
11        */
12       @Override
13       protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14             return new Path(key.toString(), leaf).toString();
15       }
16
17       /**
18        * When actually writing the data, discard the key since it is already in
19        * the file path.
20        */
21       @Override
22       protected Text generateActualKey(Text key, Text value) {
23             return null;
24          }
25 }      

清單1:MultipleTextOutputFormat子類樣例

MultipleTextOutputFormatByKey類的generateActualFileNameForKeyValue方法指定了作業輸出的存儲位置(第13行)。對于每組由Map/Reduce作業生成的鍵值對,該類會把鍵加入到路徑名稱中作為輸出。“leaf”參數就是我們之前看到的“part-0000”,它在每個reducer中都是獨一無二的,這樣可以允許不同程序同時寫入到輸出目錄而互不影響。例如,由第一個reducer産生的鍵為“France”、值為“soccer 5000”的結果會被寫入到“output-directory/France/part-00000”内的某個檔案中。

要使用這個類,需確定Hadoop包含了這個自定義類的jar,并使用完整的類名作為“-outputformat”的參數:

hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
  -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
  -input search-logs \
  -output search-frequency-by-country \
  -mapper parse-logs.py \
  -reducer count-searches.py       

清單1是oddjob項目中某個類的Java實作。oddjob是一個開源庫,提供了多種MultipleTextOutputFormat。雖然這個庫面向的是Hadoop的流特性,但是它也可以用在産生文本鍵值輸出的其他作業中。

為服務準備輸出

在我們的下一個例子中,必須實作兩個接口來自定義資料序列化以及檔案存放的目錄結構,以使結果可被ElephantDB服務加載。正如前面所讨論的,序列化部分會由RecordWriter的實作來處理。在LineRecordWriter類将位元組流寫入輸出檔案的同時,ElephantRecordWriter還包含了專門的邏輯用來選擇要寫入的檔案以及使用第三方庫來格式化磁盤上的資料。

1   public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3       FileSystem _fs;
4       Args _args;
5       Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6       Progressable _progressable;
7       LocalElephantManager _localManager;
8
9       int _numWritten = 0;
10      long _lastCheckpoint = System.currentTimeMillis();
11
12      public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
13         _fs = Utils.getFS(args.outputDirHdfs, conf);
14         _args = args;
15         _progressable = progressable;
16         _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
17      }
18
19      private String remoteUpdateDirForShard(int shard) {
20          if(_args.updateDirHdfs==null) return null;
21          else return _args.updateDirHdfs + "/" + shard;
22      }
23
24      public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25          LocalPersistence lp = null;
26          LocalPersistenceFactory fact = _args.spec.getLPFactory();
27          Map<String, Object> options = _args.persistenceOptions;
28          if(_lps.containsKey(shard.get())) {
29             lp = _lps.get(shard.get());
30          } else {
31             String updateDir = remoteUpdateDirForShard(shard.get());
32             String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33             lp = fact.openPersistenceForAppend(localShard, options);
34             _lps.put(shard.get(), lp);
35             progress();
36          }
37
38          _args.updater.updateElephant(lp, record.key, record.val);
39
40          _numWritten++;
41          if(_numWritten % 25000 == 0) {
42             long now = System.currentTimeMillis();
43             long delta = now - _lastCheckpoint;
44             _lastCheckpoint = now;
45             LOG.info("Wrote last 25000 records in " + delta + " ms");
46             _localManager.progress();
47          }
48      }
49
50      public void close(Reporter reporter) throws IOException {
51          for(Integer shard: _lps.keySet()) {
52             String lpDir = _localManager.localTmpDir("" + shard);
53             LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54             _lps.get(shard).close();
55             LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56             progress();
57             String remoteDir = _args.outputDirHdfs + "/" + shard;
58             if(_fs.exists(new Path(remoteDir))) {
59                 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60                 _fs.delete(new Path(remoteDir), true);
61                 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62             }
63             LOG.info("Copying " + lpDir + " to " + remoteDir);
64             _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65             LOG.info("Copied " + lpDir + " to " + remoteDir);
66             progress();
67          }
68          _localManager.cleanup();
69      }
70
71      private void progress() {
72           if(_progressable!=null) _progressable.progress();
73      }
74   }      

清單2:從ElephantDB中摘錄的某個RecordWriter子類

ElephantDB的工作方式是通過跨越若幹個LocalPersistence對象(BerkeleyDB檔案)來對資料進行分片(劃分)。ElephantRecordWriter類中的write函數拿到分片ID,并檢查該分片是否已經打開(第28行),如果沒有則打開并建立一個新的本地檔案(第33行)。第38行的updateElephant調用将作業輸出的鍵值對寫入到BerkeleyDB檔案。

當關閉ElephantRecordWriter時,該類在第64行會複制BerkeleyDB檔案到HDFS中,且可以随意選擇是否覆寫舊檔案。接下去的progress方法調用會通知Hadoop目前的RecordWriter正在按計劃進行,這有點類似于真實Map/Reduce作業中的狀态或計數器更新。

下一步是利用ElephantRecordWriter來實作OutputFormat。要了解此清單中的代碼,重點是了解Hadoop JobConf對象封裝了什麼。顧名思義,JobConf對象包含了某項作業的全部設定,包括輸入輸出目錄,作業名稱以及mapper和reducer類。清單3展示了兩個自定義類是如何共同工作的:

1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2     public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4     public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
5         return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6     }
7
8     public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9         Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10         fs = Utils.getFS(args.outputDirHdfs, conf);
11         if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12             throw new InvalidJobConfException("Speculative execution should be false");
13         }
14         if(fs.exists(new Path(args.outputDirHdfs))) {
15             throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16         }
17         if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18             throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19         }
20     }
21   }      

清單3:從ElephantDB中摘錄的某個OutputFormat實作

正如前面所看到的,OutputFormat有兩個職責,分别是決定資料的存儲位置以及資料寫入的方式。ElephantOutputFormat的資料存儲位置是通過檢查JobConf以及在第14和17行檢查確定該位置是一個合法目标位置後來決定的。至于資料的寫入方式,則是由getRecordWriter函數處理,它的傳回結果是清單2中的ElephantRecordWriter對象。

從Hadoop的角度來看,當Map/Reduce作業結束并且每個reducer産生了鍵值對流的時候,這些類會派上用場。Hadoop會以作業配置為參數調用checkOutputSpecs。如果函數運作沒有抛出異常,它會接下去調用getRecordWriter以傳回可以寫入流資料的對象。當所有的鍵值對都被寫入後,Hadoop會調用writer中的close函數,将資料送出到HDFS并結束該reducer的職責。

總結

OutputFormat是Hadoop架構中的重要組成部分。它們通過為目标消費應用程式産生合适的輸出來提供與其他系統和服務間的互操作。自定義作業輸出位置可以簡化并加速資料工作流;而自定義結果輸出方式可以讓其快速地工作于其他不同的環境下。雖然實作OutputFormat和覆寫幾個方法一樣簡單,但是它足夠靈活可以支援全新的磁盤上的資料格式。

關于作者

探索Hadoop outputFormat

Jim Blomo (@jimblomo)熱衷于開發強勁優雅的系統來操控資料。在Yelp公司中,他負責管理一個日益增長的資料挖掘團隊,該團隊使用Hadoop,mrjob以及oddjob處理TB級的資料。在加入Yelp公司前,他曾經為創業公司和Amazon建構基礎設施。他喜愛美食,文化,以及同妻子一道漫步在舊金山灣區的戶外。

檢視英文原文:Exploring Hadoop OutputFormat