在使用python做大資料和機器學習處理過程中,首先需要讀取hdfs資料,對于常用格式資料一般比較容易讀取,parquet略微特殊。從hdfs上使用python擷取parquet格式資料的方法(當然也可以先把檔案拉到本地再讀取也可以):
1、安裝anaconda環境。
2、安裝hdfs3。
conda install hdfs3
3、安裝fastparquet。
conda install fastparquet
4、安裝python-snappy。
conda install python-snappy
5、讀取檔案
##namenode mode:
from hdfs3 import HDFileSystem
from fastparquet import ParquetFile
hdfs = HDFileSystem(host=IP, port=8020)
sc = hdfs.open
pf = ParquetFile(filename, open_with=sc)
df = pf.to_pandas()
##傳回pandas的DataFrame類型
##HA mode:
from hdfs3 import HDFileSystem
from fastparquet import ParquetFile
host = "nameservice1"
conf = {
"dfs.nameservices":"nameservice1",
......
}
hdfs = HDFileSystem(host = host, pars = conf)
......
python通路HDFS HA的三種方法
python通路hdfs常用的包有三個,如下:
1、hdfs3
其實從安裝便捷性和使用上來說,并不推薦hdfs3,因為他的系統依賴和網絡要求較高,但是某些情況下使用hdfs3會比較友善,官網資料點這裡。如上面介紹,IP直接通路namenode:
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=namenode, port=8020)
hdfs.ls('/tmp')
HA通路:
host = "nameservice1"
conf = {"dfs.nameservices": "nameservice1",
"dfs.ha.namenodes.nameservice1": "namenode113,namenode188",
"dfs.namenode.rpc-address.nameservice1.namenode113": "hostname_of_server1:8020",
"dfs.namenode.rpc-address.nameservice1.namenode188": "hostname_of_server2:8020",
"dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server1:50070",
"dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server2:50070",
"hadoop.security.authentication": "kerberos"
}
fs = HDFileSystem(host=host, pars=conf)
##或者下面這種配置
host = "ns1"
conf = {
"dfs.nameservices":"ns1",
"dfs.ha.namenodes.ns1":"namenode122,namenode115",
"dfs.namenode.rpc-address.ns1.namenode122":"nnlab01:8020",
"dfs.namenode.servicerpc-address.ns1.namenode122":"nnlab01:8022",
"dfs.namenode.http-address.ns1.namenode122":"nnlab01:50070",
"dfs.namenode.https-address.ns1.namenode122":"nnlab01:50470",
"dfs.namenode.rpc-address.ns1.namenode115":"nnlab02:8020",
"dfs.namenode.servicerpc-address.ns1.namenode115":"nnlab02:8022",
"dfs.namenode.http-address.ns1.namenode115":"nnlab02:50070",
"dfs.namenode.https-address.ns1.namenode115":"nnlab02:50470",
}
hdfs = HDFileSystem(host = host, pars = conf)
2、hdfs
這種方法在使用的時候配置比較簡單,官網資料也比較豐富,但是需要注意的是該API可以模拟使用者通路,權限較大。IP直接通路:
import hdfs
client = hdfs.client.InsecureClient(url="http://namenode:50070", user="hdfs")
client = hdfs.client.InsecureClient(url="http://namenode1:50070;http://namenode2:50070", user="hdfs")
3、pyhdfs
安裝指令:pip install PyHDFS
官網位址,直接通路:
import pyhdfs
client = pyhdfs.HdfsClient(hosts="namenode:50070",user_name="hdfs")
HA通路
client = pyhdfs.HdfsClient(hosts=["namenode1:50070","namenode2:50070"],user_name="hdfs")
補充知識:python spark中parquet檔案寫到hdfs,同時避免太多的小檔案(block小檔案合并)
在pyspark中,使用資料框的檔案寫出函數write.parquet經常會生成太多的小檔案,例如申請了100個block,而每個block中的結果
隻有幾百K,這在機器學習算法的結果輸出中經常出現,這是一種很大的資源浪費,那麼如何同時避免太多的小檔案(block小檔案合并)?
其實有一種簡單方法,該方法需要你對輸出結果的資料量有個大概估計,然後使用Dataframe中的coalesce函數來指定輸出的block數量
即可,具體使用代碼如下:
df.coalesce(2).write.parquet(path,mode)
這裡df是指你要寫出的資料框,coalesce(2)指定了寫到2個block中,一個block預設128M,path是你的寫出路徑,mode是寫出模式,常用的是