一般來說,計算機處理的資料都存在一些備援度,同時資料中間,尤其是相鄰資料間存在着相關性,是以可以通過一些有别于原始編碼的特殊編碼方式來儲存資料,使資料占用的存儲空間比較小,這個過程一般叫壓縮。和壓縮對應的概念是解壓縮,就是将被壓縮的資料從特殊編碼方式還原為原始資料的過程。
壓縮廣泛應用于海量資料進行中,對資料檔案進行壓縮,可以有效減少存儲檔案所需的空間,并加快資料在網絡上或者到磁盤上的傳輸速度。在hadoop中,壓縮應用于檔案存儲、map階段到reduce階段的資料交換(需要打開相關的選項)等情景。
資料壓縮的方式非常多,不同特點的資料有不同的資料壓縮方式:如對聲音和圖像等特殊資料的壓縮,就可以采用有損的壓縮方法,允許壓縮過程中損失一定
的資訊,換取比較大的壓縮比;而對音樂資料的壓縮,由于資料有自己比較特殊的編碼方式,是以也可以采用一些針對這些特殊編碼的專用資料壓縮算法。
hadoop作為一個較通用的海量資料處理平台,在使用壓縮方式方面,主要考慮壓縮速度和壓縮檔案的可分割性。
所有的壓縮算法都會考慮時間和空間的權衡,更快的壓縮和解壓縮速度通常會耗費更多的空間(壓縮比較低)。例如,通過gzip指令壓縮資料時,使用者可以設定不同的選項來選擇速度優先或空間優先,選項–1表示優先考慮速度,選項–9表示空間最優,可以獲得最大的壓縮比。
需要注意的是,有些壓縮算法的壓縮和解壓縮速度會有比較大的差别:gzip和zip是通用的壓縮工具,在時間/空間處理上相對平衡,gzip2壓縮比gzip和zip更有效,但速度較慢,而且bzip2的解壓縮速度快于它的壓縮速度。
當使用mapreduce處理壓縮檔案時,需要考慮壓縮檔案的可分割性。考
慮我們需要對保持在hdfs上的一個大小為1gb的文本檔案進行處理,目前hdfs的資料塊大小為64mb的情況下,該檔案被存儲為16塊,對應的
mapreduce作業将會将該檔案分為16個輸入分片,提供給16個獨立的map任務進行處理。但如果該檔案是一個gzip格式的壓縮檔案(大小不
變),這時,mapreduce作業不能夠将該檔案分為16個分片,因為不可能從gzip資料流中的某個點開始,進行資料解壓。但是,如果該檔案是一個
bzip2格式的壓縮檔案,那麼,mapreduce作業可以通過bzip2格式壓縮檔案中的塊,将輸入劃分為若幹輸入分片,并從塊開始處開始解壓縮數
據。bzip2格式壓縮檔案中,塊與塊間提供了一個48位的同步标記,是以,bzip2支援資料分割。
hadoop支援的壓縮格式:
壓縮格式
unix工具
算法
檔案擴充名
支援多檔案
可分割
deflate
無
.deflate
否
gzip
.gz
zip
.zip
是
bzip
bzip2
.bz2
lzo
lzop
.lzo
為了支援多種壓縮解壓縮算法,hadoop引入了編碼/解碼器。與hadoop序列化架構類似,編碼/解碼器也是使用抽象工廠的設計模式。目前,hadoop支援的編碼/解碼器如下
壓縮算法及其編碼/解碼器:
對應的編碼/解碼器
org.apache.hadoop.io.compress.defaultcodec
org.apache.hadoop.io.compress.gzipcodec
org.apache.hadoop.io.compress.bzip2codec
snappy
org.apache.hadoop.io.compress.snappycodec
同一個壓縮方法對應的壓縮、解壓縮相關工具,都可以通過相應的編碼/解碼器獲得。
下面将介紹使用編碼/解碼器的典型執行個體(代碼在org.hadoopinternal.compress包中)。其中,compress()方法接
受一個字元串參數,用于指定編碼/解碼器,并用對應的壓縮算法對文本檔案readme.txt進行壓縮。字元串參數使用java的反射機制建立對應的編碼
/解碼器對象,通過compressioncodec對象,進一步使用它的createoutputstream()方法構造一個
compressionoutputstream流,未壓縮的資料通過ioutils.copybytes()方法,從輸入檔案流中複制寫入
compressionoutputstream流,最終以壓縮格式寫入底層的輸出流中。
在本執行個體中,底層使用的是檔案輸出流fileoutputstream,它關聯檔案的檔案名,是在原有檔案名的基礎上添加壓縮算法相應的擴充名生
成。該擴充名可以通過compressioncodec對象的getdefaultextension()方法獲得。相關代碼如下:
需要解壓縮檔案時,通常通過其擴充名來推斷它對應的編碼/解碼器,進而用相應的解碼流對資料進行解碼,如擴充名為gz的檔案可以使用gzipcodec閱讀。
compressioncodecfactory提供了getcodec()方法,用于将檔案擴充名映射到對應的編碼/解碼器,如下面的例子。有了
compressioncodec對象,就可以使用和壓縮類似的過程,通過對象的createinputstream()方法獲得
compressioninputstream對象,解碼資料。相關代碼如下:
hadoop通過以編碼/解碼器為基礎的抽象工廠方法,提供了一個可擴充的架構,支援多種壓縮方法。下面就來研究hadoop壓縮架構的實作。
1. 編碼/解碼器
前面已經提過,compressioncodec接口實作了編碼/解碼器,使用的是抽象工廠的設計模式。compressioncodec提供了一系列方法,用于建立特定壓縮算法的相關設施,其類圖如圖所示:
compressioncodec中的方法很對稱,一個壓縮功能總對應着一個解壓縮功能。其中,與壓縮有關的方法包括:
createoutputstream()用于通過底層輸出流建立對應壓縮算法的壓縮流,重載的createoutputstream()方法可使用壓縮器建立壓縮流;
createcompressor()方法用于建立壓縮算法對應的壓縮器。後續會繼續介紹壓縮流compressionoutputstream和壓縮器compressor。解壓縮也有對應的方法和類。
compressioncodec中還提供了擷取對應檔案擴充名的方法getdefaultextension(),如對于
org.apache.hadoop.io.compress.bzip2codec,該方法傳回字元串“.bz2”,注意字元串的第一個字元。相關代碼
如下:
compressioncodecfactory是hadoop壓縮架構中的另一個類,它應用了工廠方法,使用者可以通過它提供的方法獲得compressioncodec。
注意:抽象工廠方法和工廠方法這兩個設計模式有很大的差別,抽象工廠方法用于建立一
系列相關或互相依賴的對象,如compressioncodec可以獲得和某一個壓縮算法相關的對象,包括壓縮流和解壓縮流等。而工廠方法(嚴格來
說,compressioncodecfactory是參數化工廠方法),用于建立多種産品,如通過compressioncodecfactory的
getcodec()方法,可以建立gzipcodec對象或bzip2codec對象。
在前面的執行個體中已經使用過getcodec()方法,為某一個壓縮檔案尋找對應的compressioncodec。為了分析該方法,需要了解compressioncodec類中儲存檔案擴充名和compressioncodec映射關系的成員變量codecs。
codecs是一個有序映射表,即它本身是一個map,同時它對map的鍵排序,下面是codecs中儲存的一個可能的映射關系:
可以看到,map中的鍵是排序的。
getcodec()方法的輸入是path對象,儲存着檔案路徑,如執行個體中的“readme.txt.bz2”。
首先通過擷取path對象對應的檔案名并逆轉該字元串得到“2zb.txt.emdaer”,然後通過有序映射sortedmap的
headmap()方法,查找最接近上述逆轉字元串的有序映射的部分視圖,如輸入“2zb.txt.emdaer”的查找結果submap,隻包含
“2zb.”對應的那個鍵–值對,如果輸入是“zg.txt.emdaer”,則submap會包含成員變量codecs中儲存的所有鍵–值對。然後,簡
單地擷取submap最後一個元素的鍵,如果該鍵是逆轉檔案名的字首,那麼就找到了檔案對應的編碼/解碼器,否則傳回空。實作代碼如下:
compressioncodecfactory.getcodec()方法的代碼看似複雜,但通過靈活使用有序映射sortedmap,實作其實還是非常簡單的。
2. 壓縮器和解壓器
壓縮器(compressor)和解壓器(decompressor)是hadoop壓縮架構中的一對重要概念。
compressor可以插入壓縮輸出流的實作中,提供具體的壓縮功能;相反,decompressor提供具體的解壓功能并插入
compressioninputstream中。compressor和decompressor的這種設計,最初是在java的zlib壓縮程式庫中
引入的,對應的實作分别是java.util.zip.deflater和java.util.zip.inflater。下面以compressor為
例介紹這對元件。
compressor的用法相對複雜,compressor通過setinput()方法接收資料到内部緩沖區,自然可以多次調用
setinput()方法,但内部緩沖區總是會被寫滿。如何判斷壓縮器内部緩沖區是否已滿呢?可以通過needsinput()的傳回值,如果是
false,表明緩沖區已經滿,這時必須通過compress()方法擷取壓縮後的資料,釋放緩沖區空間。
為了提高壓縮效率,并不是每次使用者調用setinput()方法,壓縮器就會立即工作,是以,為了通知壓縮器所有資料已經寫入,必須使用
finish()方法。finish()調用結束後,壓縮器緩沖區中保持的已經壓縮的資料,可以繼續通過compress()方法獲得。至于要判斷壓縮器
中是否還有未讀取的壓縮資料,則需要利用finished()方法來判斷。
注意:finished()和finish()的作用不同,finish()結束資料輸入的過程,而finished()傳回false,表明壓縮器中還有未讀取的壓縮資料,可以繼續通過compress()方法讀取。
compressor接口源代碼如下:
使用compressor的一個典型執行個體如下:
以上代碼實作了setinput()、needsinput()、finish()、compress()和finished()的配合過程。将輸
入inbuf分成幾個部分,通過setinput()方法送入壓縮器,而在finish()調用結束後,通過finished()循序判斷壓縮器是否還有
未讀取的資料,并使用compress()方法擷取資料。
在壓縮的過程中,compressor可以通過getbytesread()和getbyteswritten()方法獲得compressor輸
入未壓縮位元組的總數和輸出壓縮位元組的總數,如執行個體中最後一行的輸出語句。compressor和decompressor的類圖如圖所示。
compressor.end()方法用于關閉解壓縮器并放棄所有未處理的輸入;reset()方法用于重置壓縮器,以處理新的輸入資料集合;reinit()方法更進一步允許使用hadoop的配置系統,重置并重新配置壓縮器。
3. 壓縮流和解壓縮流
java最初版本的輸入/輸出系統是基于流的,流抽象了任何有能力産出資料的資料源,或者是有能力接收資料的接收端。一般來說,通過設計模式裝飾,
可以為流添加一些額外的功能,如前面提及的序列化流objectinputstream和objectoutputstream。
壓縮流(compressionoutputstream)和解壓縮流(compressioninputstream)是hadoop壓縮架構中
的另一對重要概念,它提供了基于流的壓縮解壓縮能力。如圖3-7所示是從java.io.inputstream和
java.io.outputstream開始的類圖。
這裡隻分析和壓縮相關的代碼,即compressionoutputstream及其子類。
outputstream是一個抽象類,提供了進行流輸出的基本方法,它包含三個write成員函數,分别用于往流中寫入一個位元組、一個位元組數組或一個位元組數組的一部分(需要提供起始偏移量和長度)。
注意:流實作中一般需要支援的close()和flush()方法,是java.io包中的相應接口的成員函數,不是outputstream的成員函數。
compressionoutputstream繼承自outputstream,也是個抽象類。如前面提到的
objectoutputstream、compressionoutputstream為其他流添加了附加額外的壓縮功能,其他流儲存在類的成員變量
out中,并在構造的時候被指派。
compressionoutputstream實作了outputstream的close()方法和flush()方法,但用于輸出資料的
write()方法、用于結束壓縮過程并将輸入寫到底層流的finish()方法和重置壓縮狀态的resetstate()方法還是抽象方法,需要
compressionoutputstream的子類實作。相關代碼如下:
compressionoutputstream規定了壓縮流的對外接口,如果已經有了一個壓縮器的實作,能否提供一個通用的、使用壓縮器的壓縮流實作呢?答案是肯定的,compressorstream使用壓縮器實作了一個通用的壓縮流,其主要代碼如下:
compressorstream提供了幾個不同的構造函數,用于初始化相關的成員變量。上述代碼片段中保留了參數最多的構造函數,其
中,compressorstream需要的底層輸出流out和壓縮時使用的壓縮器,都作為參數傳入構造函數。另一個參數是
compressorstream工作時使用的緩沖區buffer的大小,構造時會利用這個參數配置設定該緩沖區。
compressorstream.write()方法用于将待壓縮的資料寫入流中。待壓縮的資料在進行一番檢查後,最終調用壓縮器的
setinput()方法進入壓縮器。setinput()方法調用結束後,通過compressor.needsinput()判斷是否需要調用
compress()方法,擷取壓縮後的輸出資料。上一節已經讨論了這個問題,如果内部緩沖區已滿,則需要通過compress()方法提取資料,提取後
的資料直接通過底層流的write()方法輸出。
當finish()被調用(往往是compressorstream被關閉),這時compressorstream流調用壓縮器的finish()方法通知輸入已經結束,然後進入另一個循環,該循環不斷讀取壓縮器中未讀取的資料,然後輸出到底層流out中。
compressorstream中的其他方法,如resetstate()和close()都比較簡單,不再一一介紹了。
compressorstream利用壓縮器compressor實作了一個通用的壓縮流,在hadoop中引入一個新的壓縮算法,如果沒有特殊的
考慮,一般隻需要實作相關的壓縮器和解壓器,然後通過compressorstream和decompressorstream,就實作相關壓縮算法的輸
入/輸出流了。
compressorstream的實作并不複雜,隻需要注意壓縮器幾個方法間的配合,下圖給出了這些方法的一個典型調用順序:
4. java本地方法
資料壓縮往往是計算密集型的操作,考慮到性能,建議使用本地庫(native library)來壓縮和解壓。在某個測試中,與java實作的内置gzip壓縮相比,使用本地gzip壓縮庫可以将解壓時間減少50%,而壓縮時間大概減少10%。
hadoop的deflate、gzip和snappy都支援算法的本地實作,其中apache發行版中還包含了deflate和gzip的32位
和64位linux本地壓縮庫(cloudera發行版還包括snappy壓縮方法)。預設情況下,hadoop會在它運作的平台上查找本地庫。
假設有一個c
函數,它實作了某些功能,同時因為某種原因(如效率),使得使用者不希望用java語言重新實作該功能,那麼java本地方法(native
method)就是一個不錯的選擇。java提供了一些鈎子函數,使得調用本地方法成為可能,同時,jdk也提供了一些工具,協助使用者減輕程式設計負擔。
java語言中的關鍵字native用于表示某個方法為本地方法,顯然,本地方法是類的成員方法。下
面是一個本地方法的例子,代碼片段來自cloudera的snappy壓縮實作,在
org.apache.hadoop.io.compress.snappy包中。其中,靜态方法initids()和方法
compressbytesdirect()用關鍵字native修飾,表明這是一個java本地方法。相關代碼如下: