通過Lambda層,Step函數以及通過AWS Athena進行進一步的資料分析,從Elasticsearch到S3上的Apache Parquet檔案的無伺服器大規模資料提取
特征提取和降維是機器學習資料準備中常見的基本任務。 在我們的大多數(Kubernetes)項目中,我們使用通用的EFK堆棧(Elasticsearch + Fluentd + Kibana),該堆棧可用于臨時故障排除,但缺乏大規模的資料分析功能。 為了深入了解更大的資料集,需要使用不同的方法。 在這裡,我探索一種方法,其中将提取的Elasticsearch資料(25m條或更多記錄)作為列資料檔案存儲在S3中,并使用AWS Athena執行以更快的響應時間進行切片和切塊。
列資料格式是BI和資料分析工作負載的關鍵。 它們有助于極大地降低總體磁盤I / O要求,并減少要從磁盤加載的資料量,是以有助于優化分析查詢性能。 最常用的格式是Apache Parquet或Apache ORC。 在本文中,我研究了如何大規模提取Elasticsearch資料以進行進一步的分析和提取。
目錄
· 通過Lambda層,Step函數以及通過AWS Athena進行進一步的資料分析,從Elasticsearch到S3上的Apache Parquet檔案的無伺服器大規模資料提取
· 通過滾動Elasticsearch批量導出
· 作為步進功能執行
· 資料存儲為列式檔案
· 将所有依賴項打包為Lambda層
· 通過AWS Athena進行資料通路
· 結論
· 附錄
· Snappy Lambda運作時問題
· Fastparquet安裝問題
通過滾動Elasticsearch批量導出
Elasticsearch具有一個基于http的API,可以從搜尋請求中檢索大量結果,其方式與在傳統資料庫中使用遊标的方式幾乎相同。 初始調用在查詢字元串中包含滾動參數,該參數告訴Elasticsearch應該保持"搜尋上下文"有效的時間。
from elasticsearch import Elasticsearches = Elasticsearch("http://myserver:13000/elasticsearch")page = es.search( index = myindex, scroll = '2m', size = 10000, body = myquery)sid = page['_scroll_id']scroll_size = page['hits']['total']
scroll參數(傳遞給搜尋請求,然後傳遞給每個滾動請求)告訴Elasticsearch它應該保持搜尋上下文存活多長時間,并且需要足夠長的時間才能在下一個滾動請求之前處理這批結果。 現在可以疊代和檢索與查詢關聯的資料:
while (scroll_size > 0): print("Scrolling...", i) page = es.scroll(scroll_id = sid, scroll = '2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print("scroll size: " + str(scroll_size)) # Convert the page into a Pandas dataframe dfES = Select.from_dict(page).to_pandas() dfES.drop(columns=['_id', '_index', '_score', '_type'], inplace=True)
應當注意,大小的最大值(每批結果将傳回的命中數)是10000。在我的情況下,這是一個25m記錄的資料集,是以需要進行2500次滾動提取批次。 為了減少疊代次數,在Lambda函數中,一次調用期間執行15個滾動事件,這将産生150,000條記錄。 連同對結果的進一步處理,每次Lambda調用大約需要60秒,是以整個處理時間大約為3小時。 (如果資料集更大,則可以在Elasticsearch叢集的不同節點上并行執行切片滾動)。 整體執行作業作為步進功能運作,每個提取批處理執行一次Lambda執行。
作為步進功能執行
要自動執行滾動Lambda直到提取出最後一頁,Step Function是最佳選擇。
在此檢查Lambda的輸出,如果它訓示仍然有其他頁面可用,則再次執行Lambda函數。 可以通過Cloudwatch事件觸發Step功能,方法是在給定時間訓示"狀态":"開始",并組織所有資料的提取,兩次運作之間傳遞的資料/事件如下:
{
"state": "Scroll",
"scrollId": "DnF1ZXJ5VGhlbkZldGNoBQA2VjRfUUJRAAAAALckuFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JLxZGOW0tSGw4c1JOS1BZVzI2VjRfUUJRAAAAAAAAApAWbGtqLUVOQWVSZm03NmdxTllqNFhZZwAAAAAALckxFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JMBZGOW0tSGw4c1JOS1BZVzI",
"index": 12879232
}
可能的狀态為開始,滾動和完成。 狀态函數定義如下:
{ "StartAt": "ScrollAgain?", "States": { "ScrollAgain?": { "Type" : "Choice", "Choices": [ { "Variable": "$.state", "StringEquals": "Done", "Next": "FinalState" } ], "Default": "Scrolling" },"Scrolling": { "Type" : "Task", "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxx:function:MyParquetfunction", "Next": "ScrollAgain?" },"FinalState": { "Type": "Succeed" } }}
資料存儲為列式檔案
在許多分析用例中,提取的資料可以存儲為CSV檔案,以進行進一步處理。 但是,由于資料集很大,是以這種方法實際上不是可行的選擇。
将資料提取存儲在多個資料檔案中,特别是作為列存儲檔案存儲,具有多個優點,尤其是在将資料與Athena一起使用時。 與基于行的檔案(如CSV)相比,Apache Parquet面向列,旨在提供高效的列式資料存儲。 它具有字典編碼,位打包和遊程長度編碼(RLE)等功能來壓縮資料。 Twitter工程團隊的部落格文章"使用Parquet使Dremel變得簡單"給出了更詳細的概述。
可以對鑲木地闆檔案應用其他壓縮,甚至可以對每列應用不同的壓縮算法。
Snappy是一種輕量級且快速壓縮的編解碼器,不需要太多的CPU使用率,但壓縮效果不如gzip或bzip2那樣好。 在這種情況下,應用快速壓縮會将檔案大小減少5倍,并且比相同的檔案gzip壓縮大2倍。
在Python中,編寫鑲木地闆檔案并将上傳内容內建到S3中非常容易:
import s3fsimport fastparquet as fpimport pandas as pdimport numpy as nps3 = s3fs.S3FileSystem()myopen = s3.opens3bucket = 'mydata-aws-bucket/'# random dataframe for demodf = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))parqKey = s3bucket + "datafile" + ".parq.snappy"fp.write(parqKey, df ,compression='SNAPPY', open_with=myopen)
Pandas和Numpy應該被稱為标準Python資料科學庫。 Fastparquet是Parquet檔案格式的接口,該檔案格式使用Numba Python到LLVM編譯器來提高速度。 它是Dask項目的一個分叉,來自Joe Crobak的python-parquet的原始實作。 S3Fs是S3的Pythonic檔案接口,它建立在boto3之上。
将所有依賴項打包為Lambda層
通過壓縮包打包了我上一篇部落格文章"使用AWS Lambda和Layers進行純無伺服器機器推理的純文字",将要上傳的依賴關系打包。 參見下文,必須相應地設定LD_LIBRARY_PATH,以使Lambda函數通路該層中已編譯的Snappy庫。 同樣使用Pip和附加目标值進行建構會導緻fastparquet安裝出現一些問題,下面提供了更多詳細資訊。
rm -rf python && mkdir -p python
docker run --rm -v $(pwd):/foo -w /foo lambci/lambda:build-python3.7 /foo/build.sh
清除"外部"目标目錄,然後在容器内執行與AWS運作時比對的建構。 build.sh如下所示:
#!/usr/bin/env bash
export PYTHONPATH=$PYTHONPATH:/foo/python
yum install snappy-devel -y
pip install --upgrade pip
pip install -r requirements.txt --no-binary --no-dependencies --no-cache-dir -t python
pip install fastparquet==0.2.1 --no-binary --no-dependencies --no-cache-dir -t python
cp /usr/lib64/libsnappy* /foo/lib64/
如前所述,由于使用了-t python target參數,是以必須從requirements.txt檔案中手動删除fastparquet子產品并進行第二步安裝。 然後可以像下面這樣部署該層:
zip -r MyParquet_layer.zip python lib64
aws s3 cp MyParquet_layer.zip s3://mybucket/layers/
aws lambda publish-layer-version --layer-name MyParquet_layer --content S3Bucket=mybucket,S3Key=layers/MyParquet_layer.zip --compatible-runtimes python3.7
通過AWS Athena進行資料通路
為了進一步處理資料并提取一些子集,AWS Athena是完美的工具。 AWS Athena是一種無伺服器互動式查詢服務,可輕松使用标準SQL在S3中分析大量資料。 它開箱即用地處理諸如Parquet或ORC的列狀和壓縮資料。
可以通過以下指令輕松建立S3中資料的Athena表:
CREATE EXTERNAL TABLE IF NOT EXISTS myanalytics_parquet (
`column1` string,
`column2` int,
`column3` DOUBLE,
`column4` int,
`column5` string
)
STORED AS PARQUET
LOCATION 's3://mybucket/parquetdata/'
tblproperties ("parquet.compress"="SNAPPY")
此後,資料已經可用,并且可以以驚人的速度和響應時間進行查詢。
SELECT column5, count(*) AS cnt FROM myanalytics_parquetGROUP BY column5ORDER BY cnt DESC;
這樣的查詢隻需要幾秒鐘即可對完整的資料集執行。 結果資料在配置設定給Athena執行個體的預設S3存儲桶中可用,并且可以提取為CSV檔案。
結論
從Elasticsearch中批量提取資料并将其轉換為壓縮的Parquet檔案(存儲在S3中)是處理大型資料集的有效方法。 将AWS Athena與這些檔案一起使用,可以輕松快速,高效地對資料集進行切片和切塊,并允許取出定義的資料子集(降維)以進行進一步處理或特征提取。 與使用Elasticsearch中的原始資料集相比,這無疑是一種更快,更有效的方法。 可以通過滾動功能來完成從Elasticsearch中擷取資料的操作,并使用step函數和lambda運作它是一種有效的無伺服器解決方案。
附錄
将snappy和fastparquet安裝到lambda函數中并不是那麼容易,因為它們都使用C擴充名和庫。
Snappy Lambda運作時問題
關于快速安裝,我遇到了以下問題:
ModuleNotFoundError: No module named 'snappy._snappy_cffi'
不幸的是,這是一個掩蓋的錯誤,最初的異常是由于找不到libsnappy.so庫而發生的,然後又發生了另一個異常,如上所示,這最初使我走錯了路。 這裡的問題是Lambda圖層都映射在/ opt目錄下。 在aws-lambda-container-image-converter存儲庫中未檢測到/ opt下的此問題報告庫#12,該線索表明必須擴充LD_LIBRARY_PATH并将其設定為Lambda函數,例如:
$LAMBDA_TASK_ROOT/lib:$LAMBDA_TASK_ROOT/lib64:$LAMBDA_RUNTIME_DIR:$LAMBDA_RUNTIME_DIR/lib:$LAMBDA_TASK_ROOT:/opt/lib:/opt/lib64:/lib64:/usr/lib64
然後将共享庫放在可以找到共享庫的lib64檔案夾中。
Fastparquet安裝問題
快速鑲木地闆的安裝也造成了一些麻煩。 如果使用目标參數執行Pip指令,則fastparquet建構無法找到必要的numpy頭檔案。 解決此問題的一種方法是,從require.txt檔案中删除fastparquet并将其安裝為第二步。 如果正确設定了PYTHONPATH,則可以找到numpy标頭。
(本文翻譯自Klaus Seiler的文章《Effective data exploration via columnar data formats like Parquet》,參考:https://medium.com/merapar/effective-data-exploration-via-columnar-data-formats-like-parquet-652466676188)