天天看點

python讀取hdfs上的parquet檔案方式

在使用python做大資料和機器學習處理過程中,首先需要讀取hdfs資料,對于常用格式資料一般比較容易讀取,parquet略微特殊。從hdfs上使用python擷取parquet格式資料的方法(當然也可以先把檔案拉到本地再讀取也可以):

1、安裝anaconda環境。

2、安裝hdfs3。

conda install hdfs3

3、安裝fastparquet。

conda install fastparquet

4、安裝python-snappy。

conda install python-snappy

5、讀取檔案

##namenode mode:
from hdfs3 import HDFileSystem
from fastparquet import ParquetFile
  
hdfs = HDFileSystem(host=IP, port=8020)
sc = hdfs.open
  
pf = ParquetFile(filename, open_with=sc)
df = pf.to_pandas()
  
##傳回pandas的DataFrame類型
  
##HA mode:
from hdfs3 import HDFileSystem
from fastparquet import ParquetFile
  
host = "nameservice1"
conf = {
 "dfs.nameservices":"nameservice1",
 ......
}
hdfs = HDFileSystem(host = host, pars = conf)
......      

python通路HDFS HA的三種方法

python通路hdfs常用的包有三個,如下:

1、hdfs3

其實從安裝便捷性和使用上來說,并不推薦hdfs3,因為他的系統依賴和網絡要求較高,但是某些情況下使用hdfs3會比較友善,官網資料點這裡。如上面介紹,IP直接通路namenode:

from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=namenode, port=8020)
hdfs.ls('/tmp')      

HA通路:

host = "nameservice1"
conf = {"dfs.nameservices": "nameservice1",
 "dfs.ha.namenodes.nameservice1": "namenode113,namenode188",
 "dfs.namenode.rpc-address.nameservice1.namenode113": "hostname_of_server1:8020",
 "dfs.namenode.rpc-address.nameservice1.namenode188": "hostname_of_server2:8020",
 "dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server1:50070",
 "dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server2:50070",
 "hadoop.security.authentication": "kerberos"
}
fs = HDFileSystem(host=host, pars=conf)
  
##或者下面這種配置
host = "ns1"
conf = {
 "dfs.nameservices":"ns1",
 "dfs.ha.namenodes.ns1":"namenode122,namenode115",
 "dfs.namenode.rpc-address.ns1.namenode122":"nnlab01:8020",
 "dfs.namenode.servicerpc-address.ns1.namenode122":"nnlab01:8022",
 "dfs.namenode.http-address.ns1.namenode122":"nnlab01:50070",
 "dfs.namenode.https-address.ns1.namenode122":"nnlab01:50470",
 "dfs.namenode.rpc-address.ns1.namenode115":"nnlab02:8020",
 "dfs.namenode.servicerpc-address.ns1.namenode115":"nnlab02:8022",
 "dfs.namenode.http-address.ns1.namenode115":"nnlab02:50070",
 "dfs.namenode.https-address.ns1.namenode115":"nnlab02:50470",
}
hdfs = HDFileSystem(host = host, pars = conf)      

2、hdfs

這種方法在使用的時候配置比較簡單,官網資料也比較豐富,但是需要注意的是該API可以模拟使用者通路,權限較大。IP直接通路:

import hdfs

client = hdfs.client.InsecureClient(url="http://namenode:50070", user="hdfs")

client = hdfs.client.InsecureClient(url="http://namenode1:50070;http://namenode2:50070", user="hdfs")

3、pyhdfs

安裝指令:pip install PyHDFS

官網位址,直接通路:

import pyhdfs

client = pyhdfs.HdfsClient(hosts="namenode:50070",user_name="hdfs")

HA通路

client = pyhdfs.HdfsClient(hosts=["namenode1:50070","namenode2:50070"],user_name="hdfs")

補充知識:python spark中parquet檔案寫到hdfs,同時避免太多的小檔案(block小檔案合并)

在pyspark中,使用資料框的檔案寫出函數write.parquet經常會生成太多的小檔案,例如申請了100個block,而每個block中的結果

隻有幾百K,這在機器學習算法的結果輸出中經常出現,這是一種很大的資源浪費,那麼如何同時避免太多的小檔案(block小檔案合并)?

其實有一種簡單方法,該方法需要你對輸出結果的資料量有個大概估計,然後使用Dataframe中的coalesce函數來指定輸出的block數量

即可,具體使用代碼如下:

df.coalesce(2).write.parquet(path,mode)

這裡df是指你要寫出的資料框,coalesce(2)指定了寫到2個block中,一個block預設128M,path是你的寫出路徑,mode是寫出模式,常用的是