本節書摘來自華章計算機《深入了解大資料:大資料處理與程式設計實踐》一書中的第3章,第3.5節,作者 主 編:黃宜華(南京大學)副主編:苗凱翔(英特爾公司),更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。
除了上一節提到的指令之外,hadoop提供了可用于讀寫、操作檔案的api,這樣可以讓程式員通過程式設計實作自己的hdfs檔案操作。
hadoop提供的大部分檔案操作api都位于org.apache.hadoop.fs這個包中。基本的檔案操作包括打開、讀取、寫入、關閉等。為了保證能跨檔案系統交換資料,hadoop的api也可以對部分非hdfs的檔案系統提供支援;也就是說,用這些api來操作本地檔案系統的檔案也是可行的。
3.5.1 hdfs程式設計基礎知識
在hadoop中,基本上所有的檔案api都來自filesystem類。filesystem是一個用來與檔案系統互動的抽象類,可以通過實作filesystem的子類來處理具體的檔案系統,比如hdfs或者其他檔案系統。通過factory方法filesystem.get(configuration conf),可以獲得所需的檔案系統執行個體(factory方法是軟體開發的一種設計模式,指:基類定義接口,但是由子類執行個體化之;在這裡filesystem定義get接口,但是由filesytem的子類(比如filterfilesystem)實作)。configuration類比較特殊,這個類通過鍵值對的方式儲存了一些配置參數。這些配置預設情況下來自對應檔案系統的資源配置。我們可以通過如下方式獲得具體的filesystem執行個體:
conf?iguration conf = new conf?iguration();
filesystem hdfs = filesystem.get(conf);
如果要獲得本地檔案系統對應的filesystem執行個體,則可以通過factory方法filesystem.getlocal(configuration conf)實作:
filesystem local = filesystem.getlocal(conf);
hadoop中,使用path類的對象來編碼目錄或者檔案的路徑,使用後面會提到的filestatus類來存放目錄和檔案的資訊。在java的檔案api中,檔案名都是string類型的字元串,在這裡則是path類型的對象。
3.5.2 hdfs基本檔案操作api
接下來看一下具體的檔案操作。我們按照“建立、打開、擷取檔案資訊、擷取目錄資訊、讀取、寫入、關閉、删除”的順序講解hadoop提供的檔案操作的api。
以下接口的實際内容可以在hadoop api和hadoop源代碼中進一步了解。
1.?建立檔案
filesystem.create方法有很多種定義形式,參數最多的一個是:
那些參數較少的create隻不過是将其中一部分參數用預設值代替,最終還是要調用這個函數。其中各項的含義如下:
f:檔案名
overwrite:如果已存在同名檔案,overwrite=true覆寫之,否則抛出錯誤;預設為true。
buffersize:檔案緩存大小。預設值:configuration中io.file.buffer.size的值,如果configuration中未顯式設定該值,則是4096。
replication:建立的副本個數,預設值為1。
blocksize:檔案的block大小,預設值:configuration中fs.local.block.size的值,如果configuration中未顯式設定該值,則是32m。
permission和progress的值與具體檔案系統實作有關。
但是大部分情況下,隻需要用到最簡單的幾個版本:
publicfsdataoutputstream create(path?f);
publicfsdataoutputstream create(path?f,boolean?overwrite);
publicfsdataoutputstream create(path?f,boolean?overwrite,int?buffersize);
2.?打開檔案
filesystem.open方法有2個,參數最多的一個定義如下:
public abstract fsdatainputstream open(path f, intbuffersize) throws ioexception
其中各項的含義如下:
buffersize:檔案緩存大小。預設值:configuration中io.file.buffer.size的值,如果configur
ation中未顯式設定該值,則是4096。
3.?擷取檔案資訊
filesystem.getfilestatus方法格式如下:
public abstract filestatus getfilestatus(path f) throws ioexception;
這一函數會傳回一個filestatus對象。通過閱讀源代碼可知,filestatus儲存了檔案的很多資訊,包括:
path:檔案路徑
length:檔案長度
isdir:是否為目錄
block_replication:資料塊副本因子
blocksize:檔案長度(資料塊數)
modification_time:最近一次修改時間
access_time:最近一次通路時間
owner:檔案所屬使用者
group:檔案所屬組
如果想了解檔案的這些資訊,可以在獲得檔案的filestatus執行個體之後,調用相應的getxxx方法(比如,filestatus.getmodificationtime()獲得最近修改時間)。
4.?擷取目錄資訊
擷取目錄資訊,不僅是目錄本身,還有目錄之下的檔案和子目錄資訊,如下所述。filestatus.liststatus方法格式如下:
public filestatus[] liststatus(path f) throws ioexception;
如果f是目錄,那麼将目錄之下的每個目錄或檔案資訊儲存在filestatus數組中傳回。如果f是檔案,和getfilestatus功能一緻。
另外,liststatus還有參數為path[]的版本的接口定義以及參數帶路徑過濾器pathfilter的接口定義,參數為path[]的liststatus就是對這個數組中的每個path都調用上面的參數為path的liststatus。參數中的pathfilter則是一個接口,實作接口的accept方法可以自定義檔案過濾規則。
另外,hdfs還可以通過正規表達式比對檔案名來提取需要的檔案,這個方法是:
public filestatus[] globstatus(path pathpattern) throws ioexception;
參數pathpattern中,可以像正規表達式一樣,使用通配符來表示比對規則:
?:表示任意的單個字元。
:表示任意長度的任意字元,可以用來表示字首字尾,比如.java表示所有java檔案。
[abc]:表示比對a,b,c中的單個字元。
[a-b]:表示比對a-b範圍之間的單個字元。
c:表示取消特殊字元的轉義,比如*的結果是*而不是随意比對。
{ab,cd}:表示比對ab或者cd中的一個串。
{ab,c{de,fh}}:表示比對ab或者cde或者cfh中的一個串
5.?讀取
3.3.2節提到,調用open打開檔案之後,使用了一個fsdatainputstream對象來負責資料的讀取。通過fsdatainputstream進行檔案讀取時,提供的api就是fsdatainputstream.read方法:
public int read(long?position, byte[] buffer, int?offset, int?length) throws ioexception
函數的意義是:從檔案的指定位置position開始,讀取最多length位元組的資料,儲存到buffer中從offset個元素開始的空間中;傳回值為實際讀取的位元組數。此函數不改變檔案目前offset值。不過,使用更多的還有一種簡化版本:
public f?inal int read(byte[]?b)throws ioexception
從檔案目前位置讀取最多長度為b.len的資料儲存到b中,傳回值為實際讀取的位元組數。
6.?寫入
從接口定義可以看出,調用create建立檔案以後,使用了一個fsdataoutputstream對象來負責資料的寫入。通過fsdataoutputstream進行檔案寫入時,最常用的api就是write方法:
public void write(byte[]?b,int?off,int?len) throws ioexception
函數的意義是:将b中從off開始的最多len個位元組的資料寫入檔案目前位置。傳回值為實際寫入的位元組數。
7.?關閉
關閉為打開的逆過程,filesystem.close定義如下:
不需要其他操作而關閉檔案。釋放所有持有的鎖。
8.?删除
删除過程filesystem.delete定義如下:
其中各項含義如下:
f:待删除檔案名。
recursive:如果recursive為true,并且f是目錄,那麼會遞歸删除f下所有檔案;如果f是檔案,recursive為true還是false無影響。
另外,類似java中file的接口deleteonexit,如果某些檔案需要删除,但是目前不能被删;或者說當時删除代價太大,想留到退出時再删除的話,filesystem中也提供了一個deleteonexit接口:
public boolean deleteonexit(path?f) throws ioexception
标記檔案f,當檔案系統關閉時才真正删除此檔案。但是這個檔案f在檔案系統關閉前必須存在。
3.5.3 hdfs基本程式設計執行個體
本節介紹使用hdfs的api程式設計的簡單示例。
下面的程式可以實作如下功能:在輸入檔案目錄下的所有檔案中,檢索某一特定字元串所出現的行,将這些行的内容輸出到本地檔案系統的輸出檔案夾中。這一功能在分析mapreduce作業的reduce輸出時很有用。
這個程式假定隻有第一層目錄下的檔案才有效,而且,假定檔案都是文本檔案。當然,如果輸入檔案夾是reduce結果的輸出,那麼一般情況下,上述條件都能滿足。為了防止單個的輸出檔案過大,這裡還加了一個檔案最大行數限制,當檔案行數達到最大值時,便關閉此檔案,建立另外的檔案繼續儲存。儲存的結果檔案名為1,2,3,4,…,以此類推。
如上所述,這個程式可以用來分析mapreduce的結果,是以稱為resultfilter。
程式:result filter
輸入參數:此程式接收4個指令行輸入參數,參數含義如下:
程式的編譯指令:
javac *.java
運作指令:
參數和含義如下:
:hdfs上的路徑
:本地路徑
:待查找的字元串
:結果的每個檔案的行數
上述程式的邏輯很簡單,擷取該目錄下所有檔案的資訊,對每一個檔案,打開檔案、循環讀取資料、寫入目标位置,然後關閉檔案,最後關閉輸出檔案。這裡粗體列印的幾個函數上面都有介紹,不再贅述。
我們在自己機器上預裝的hadoop-1.0.4上簡單試驗了這個程式,在hadoop源碼中拷貝了幾個檔案,然後上傳到hdfs中,檔案如下(見圖3-17):
然後,編譯運作一下該示例程式,顯示一下目标檔案内容,結果如圖3-18所示,其中,将出現“java”字元串的每一行都輸出到檔案中。