天天看點

map/reduce優化的幾點建議

1    選擇Mapper的數量 

    Hadoop處理大量小檔案的性能比較遜色,主要由于生成的每個分片都是一整個檔案,Map操作時隻會處理很少的輸入資料,但是會産生很多Map任務,每個Map任務的運作都包括産生、排程和結束時間,大量的Map任務會造成一定的性能損失。可以通過Java虛拟機(JVM)重用來解決這個問題。hadoop預設每個JVM隻運作一個任務。使用JVM重用後,一個JVM可以順序執行多個任務,減少了啟動時間。控制JVM的屬性是mapred.job.reuse.jvm.num.tasks,它指定每個JVM運作的任務的最大數量,預設為1。可以通過JonConf的setNumTasksToExecutePerJvm()方法設定,若設定為-1則說明同一作業中共享一個JVM任務的數量不受限制。 

    如果輸入的檔案過大,還可以通過将HDFS上的塊大小增大,比如增加到256M或512M,以減少Mapper數量,可以通過運作參數(-Ddfs.block.size = $[256*1024*1024])将塊大小增大到256M。 

2    選擇Reducer的數量 

    Hadoop預設運作一個Reducer,所有的Reduce任務都會放到單一的Reducer去執行,效率非常低下。為了提高性能,可以适當增大Reducer的數量。 

    最優的Reducer數量取決于叢集中可用的Reducer任務槽的數目。Reducer任務槽的數目是叢集中節點個數與mapred.tasktracker.reduce.tasks.maximum(預設為2)的乘積,也可以通過MapReduce的使用者界面獲得。 

    一個普遍的做法是将Reducer數量設定為比Reducer任務槽數目稍微小一些,這會給Reducer任務留有餘地,同時将使得Reducer能夠在同一波中完成任務,并在Reducer階段充分使用叢集。 

    Reducer的數量由mapred.reduce.tasks屬性設定,通常在MapReduce作業的驅動方法中通過setNumReduceTasks(n)調用方法動态設定Reducer的數目為n。 

3    使用Combiner函數 

    Combiner過程是一個可選的優化過程,如果這個過程适合你的作業,Combiner執行個體會在每個運作Map任務的節點上運作,它會接收本節點上Mapper執行個體的輸出作為輸入,然後Combiner的輸出會被發送到Reducer,而不是發送Mapper的輸出。 

    Combiner是一個“迷你Reduce”過程,它是用Reducer接口來定義的,隻對本節點生成的資料進行規約。為了直覺了解Combiner的作用,使用WordCount程式進行說明。在該程式中Map任務會生成很多(“word”,1)對,如果同一分片中“Cat”出現了5次,則會生成5個(“Cat”,1)對被發送至Reducer。通過使用Combiner,這5個key/value對會被規約為一個(“Cat”,5)發送至Reducer。Combiner過程會針對輸入反複運作,但不會影響最終結果。運作Combiner的意義在于使Map輸出更緊湊,進而大大減少了Shuffle過程所需的帶寬,并加速了作業的執行。 

    通過setCombinerClass(Class<? extends Reducer> theClass)方法使用Combiner過程,括号中指定了Combiner所使用的類,如果使用者的Reduce函數可交換并可組合(比如WordCount的Reduce函數),則可以直接在驅動方法中添加如下代碼:conf.setCombinerClass(Reduce.class);否則使用者必須編寫一個第三方類作為作業的Combiner。 

4    壓縮Map的輸出 

    在Map任務完成後對将要寫入磁盤的資料進行壓縮是一種很好的優化方法,它能夠使資料寫入磁盤的速度更快,節省磁盤空間,減少需要傳送到Reducer的資料量,以達到減少MapReduce作業執行時間的目的。Hadoop支援的壓縮格式及相關資訊如表1所示。 

                   表1  Hadoop支援的壓縮格式 

壓縮格式  工具 算法    檔案擴充名 Hadoop壓縮編碼/解碼器 

DEFLATE 無 DEFLATE    .deflate Org.apache.hadoop.io.compress.DefaultCodec 

Gzip gzip DEFLATE    .gz         Org.apache.hadoop.io.compress.GzipCodec 

bzip2 bzip2 bzip2    .bz2         Org.apache.hadoop.io.compress.BZip2Codec 

LZO lzop LZO    .lzo         Com.hadoop.compression.lzo.LzopCodec 

    在使用Map輸出壓縮時需要考慮壓縮格式的速度最優與空間最優的協調。通常來說,Gzip壓縮在空間/時間處理上相對平衡;bzip2壓縮比gzip更有效,但速度較慢;LZO壓縮[19]使用速度最優算法,但壓縮效率稍低。我們需要根據MapReduce作業以及輸入資料的不同進行選擇。 

    MapReduce應用程式的驅動方法中加入如下所示代碼便可以啟用gzip格式來壓縮Map的輸出結果。 

    conf.setCompressMapOutput(true); 

    conf.setMapOutputCompressorClass(GzipCodec.class); 

5    選擇合适的序列化格式 

    序列化指的是将結構化對象轉為位元組流以便于通過網絡進行傳輸或寫入持久存儲的過程。反序列化指的是将位元組流轉為一系列結構化對象的過程。 

    在Hadoop中,節點之間的程序間通信是通過遠端過程調用(RPC)實作的。RPC協定使用序列化将消息編碼為二進制流後發送至遠端節點,然後二進制流被反序列化為原始資訊。Hadoop使用自己的序列化格式Writables,MapReduce程式使用它來序列化key/ value對,它是整個Hadoop的核心。 

    Writeable接口定義了兩個方法:一個用于将其狀态寫入二進制格式的DataOutput,另一個用于從二進制格式的DataInput流讀取其狀态。具體如下所示。 

public interface Writable{ 

    void  write(DataOutput  out)  throws IOException 

    void  readFields(DataInput  in)  throws IOException 

    Hadoop将許多Writable類歸入包org.apache.hadoop.io中,其封裝了Java的基本類,此外還有short和char類型,具體如表2所示。 

          表2  Writable的Java基本類封裝 

Java基本類型 Writable使用 序列化大小(位元組) 

布爾型         BooleanWritable 1 

位元組型         ByteWritable 1 

整型         IntWritable 4 

整型         VIntWritable 1-5 

浮點型         FloatWritable 4 

長整型         LongWritable 8 

長整型         VLongWritable 1-9 

雙精度浮點型 DoubleWritable 8 

    正确選擇合适的Writable類型,能夠減少CPU占用率和存儲空間,提高作業性能。 

    Hadoop自帶一系列Writable實作已經能夠滿足大多數用途。但我們也可以自定義Writable來控制二進制表示和排序順序以應對更複雜的結構,編寫自定義Writable時需要實作RawComparator,通過檢視其序列化表示的方式來比較資料。