天天看點

matlab提取gps格式有效資料_通過像Parquet這樣的列資料格式進行有效的資料探索目錄通過滾動Elasticsearch批量導出作為步進功能執行資料存儲為列式檔案将所有依賴項打包為Lambda層通過AWS Athena進行資料通路結論附錄Snappy Lambda運作時問題Fastparquet安裝問題

通過Lambda層,Step函數以及通過AWS Athena進行進一步的資料分析,從Elasticsearch到S3上的Apache Parquet檔案的無伺服器大規模資料提取

特征提取和降維是機器學習資料準備中常見的基本任務。 在我們的大多數(Kubernetes)項目中,我們使用通用的EFK堆棧(Elasticsearch + Fluentd + Kibana),該堆棧可用于臨時故障排除,但缺乏大規模的資料分析功能。 為了深入了解更大的資料集,需要使用不同的方法。 在這裡,我探索一種方法,其中将提取的Elasticsearch資料(25m條或更多記錄)作為列資料檔案存儲在S3中,并使用AWS Athena執行以更快的響應時間進行切片和切塊。

列資料格式是BI和資料分析工作負載的關鍵。 它們有助于極大地降低總體磁盤I / O要求,并減少要從磁盤加載的資料量,是以有助于優化分析查詢性能。 最常用的格式是Apache Parquet或Apache ORC。 在本文中,我研究了如何大規模提取Elasticsearch資料以進行進一步的分析和提取。

matlab提取gps格式有效資料_通過像Parquet這樣的列資料格式進行有效的資料探索目錄通過滾動Elasticsearch批量導出作為步進功能執行資料存儲為列式檔案将所有依賴項打包為Lambda層通過AWS Athena進行資料通路結論附錄Snappy Lambda運作時問題Fastparquet安裝問題

目錄

· 通過Lambda層,Step函數以及通過AWS Athena進行進一步的資料分析,從Elasticsearch到S3上的Apache Parquet檔案的無伺服器大規模資料提取

· 通過滾動Elasticsearch批量導出

· 作為步進功能執行

· 資料存儲為列式檔案

· 将所有依賴項打包為Lambda層

· 通過AWS Athena進行資料通路

· 結論

· 附錄

· Snappy Lambda運作時問題

· Fastparquet安裝問題

通過滾動Elasticsearch批量導出

Elasticsearch具有一個基于http的API,可以從搜尋請求中檢索大量結果,其方式與在傳統資料庫中使用遊标的方式幾乎相同。 初始調用在查詢字元串中包含滾動參數,該參數告訴Elasticsearch應該保持"搜尋上下文"有效的時間。

from elasticsearch import Elasticsearches = Elasticsearch("http://myserver:13000/elasticsearch")page = es.search(    index = myindex,    scroll = '2m',    size = 10000,    body = myquery)sid = page['_scroll_id']scroll_size = page['hits']['total']
           

scroll參數(傳遞給搜尋請求,然後傳遞給每個滾動請求)告訴Elasticsearch它應該保持搜尋上下文存活多長時間,并且需要足夠長的時間才能在下一個滾動請求之前處理這批結果。 現在可以疊代和檢索與查詢關聯的資料:

while (scroll_size > 0):    print("Scrolling...", i)    page = es.scroll(scroll_id = sid, scroll = '2m')    # Update the scroll ID    sid = page['_scroll_id']    # Get the number of results that we returned in the last scroll    scroll_size = len(page['hits']['hits'])    print("scroll size: " + str(scroll_size))    # Convert the page into a Pandas dataframe    dfES = Select.from_dict(page).to_pandas()    dfES.drop(columns=['_id', '_index', '_score', '_type'],          inplace=True)
           

應當注意,大小的最大值(每批結果将傳回的命中數)是10000。在我的情況下,這是一個25m記錄的資料集,是以需要進行2500次滾動提取批次。 為了減少疊代次數,在Lambda函數中,一次調用期間執行15個滾動事件,這将産生150,000條記錄。 連同對結果的進一步處理,每次Lambda調用大約需要60秒,是以整個處理時間大約為3小時。 (如果資料集更大,則可以在Elasticsearch叢集的不同節點上并行執行切片滾動)。 整體執行作業作為步進功能運作,每個提取批處理執行一次Lambda執行。

作為步進功能執行

要自動執行滾動Lambda直到提取出最後一頁,Step Function是最佳選擇。

matlab提取gps格式有效資料_通過像Parquet這樣的列資料格式進行有效的資料探索目錄通過滾動Elasticsearch批量導出作為步進功能執行資料存儲為列式檔案将所有依賴項打包為Lambda層通過AWS Athena進行資料通路結論附錄Snappy Lambda運作時問題Fastparquet安裝問題

在此檢查Lambda的輸出,如果它訓示仍然有其他頁面可用,則再次執行Lambda函數。 可以通過Cloudwatch事件觸發Step功能,方法是在給定時間訓示"狀态":"開始",并組織所有資料的提取,兩次運作之間傳遞的資料/事件如下:

{

"state": "Scroll",

"scrollId": "DnF1ZXJ5VGhlbkZldGNoBQA2VjRfUUJRAAAAALckuFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JLxZGOW0tSGw4c1JOS1BZVzI2VjRfUUJRAAAAAAAAApAWbGtqLUVOQWVSZm03NmdxTllqNFhZZwAAAAAALckxFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JMBZGOW0tSGw4c1JOS1BZVzI",

"index": 12879232

}

可能的狀态為開始,滾動和完成。 狀态函數定義如下:

{  "StartAt": "ScrollAgain?",  "States": {    "ScrollAgain?": {      "Type" : "Choice",      "Choices": [        {          "Variable": "$.state",          "StringEquals": "Done",          "Next": "FinalState"        }      ],      "Default": "Scrolling"    },"Scrolling": {      "Type" : "Task",      "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxx:function:MyParquetfunction",      "Next": "ScrollAgain?"    },"FinalState": {      "Type": "Succeed"    }  }}
           

資料存儲為列式檔案

在許多分析用例中,提取的資料可以存儲為CSV檔案,以進行進一步處理。 但是,由于資料集很大,是以這種方法實際上不是可行的選擇。

将資料提取存儲在多個資料檔案中,特别是作為列存儲檔案存儲,具有多個優點,尤其是在将資料與Athena一起使用時。 與基于行的檔案(如CSV)相比,Apache Parquet面向列,旨在提供高效的列式資料存儲。 它具有字典編碼,位打包和遊程長度編碼(RLE)等功能來壓縮資料。 Twitter工程團隊的部落格文章"使用Parquet使Dremel變得簡單"給出了更詳細的概述。

可以對鑲木地闆檔案應用其他壓縮,甚至可以對每列應用不同的壓縮算法。

Snappy是一種輕量級且快速壓縮的編解碼器,不需要太多的CPU使用率,但壓縮效果不如gzip或bzip2那樣好。 在這種情況下,應用快速壓縮會将檔案大小減少5倍,并且比相同的檔案gzip壓縮大2倍。

在Python中,編寫鑲木地闆檔案并将上傳内容內建到S3中非常容易:

import s3fsimport fastparquet as fpimport pandas as pdimport numpy as nps3 = s3fs.S3FileSystem()myopen = s3.opens3bucket = 'mydata-aws-bucket/'# random dataframe for demodf = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))parqKey = s3bucket + "datafile"  + ".parq.snappy"fp.write(parqKey, df ,compression='SNAPPY', open_with=myopen)
           

Pandas和Numpy應該被稱為标準Python資料科學庫。 Fastparquet是Parquet檔案格式的接口,該檔案格式使用Numba Python到LLVM編譯器來提高速度。 它是Dask項目的一個分叉,來自Joe Crobak的python-parquet的原始實作。 S3Fs是S3的Pythonic檔案接口,它建立在boto3之上。

将所有依賴項打包為Lambda層

通過壓縮包打包了我上一篇部落格文章"使用AWS Lambda和Layers進行純無伺服器機器推理的純文字",将要上傳的依賴關系打包。 參見下文,必須相應地設定LD_LIBRARY_PATH,以使Lambda函數通路該層中已編譯的Snappy庫。 同樣使用Pip和附加目标值進行建構會導緻fastparquet安裝出現一些問題,下面提供了更多詳細資訊。

rm -rf python && mkdir -p python

docker run --rm -v $(pwd):/foo -w /foo lambci/lambda:build-python3.7 /foo/build.sh

清除"外部"目标目錄,然後在容器内執行與AWS運作時比對的建構。 build.sh如下所示:

#!/usr/bin/env bash

export PYTHONPATH=$PYTHONPATH:/foo/python

yum install snappy-devel -y

pip install --upgrade pip

pip install -r requirements.txt --no-binary --no-dependencies --no-cache-dir -t python

pip install fastparquet==0.2.1 --no-binary --no-dependencies --no-cache-dir -t python

cp /usr/lib64/libsnappy* /foo/lib64/

如前所述,由于使用了-t python target參數,是以必須從requirements.txt檔案中手動删除fastparquet子產品并進行第二步安裝。 然後可以像下面這樣部署該層:

zip -r MyParquet_layer.zip python lib64

aws s3 cp MyParquet_layer.zip s3://mybucket/layers/

aws lambda publish-layer-version --layer-name MyParquet_layer --content S3Bucket=mybucket,S3Key=layers/MyParquet_layer.zip --compatible-runtimes python3.7

通過AWS Athena進行資料通路

為了進一步處理資料并提取一些子集,AWS Athena是完美的工具。 AWS Athena是一種無伺服器互動式查詢服務,可輕松使用标準SQL在S3中分析大量資料。 它開箱即用地處理諸如Parquet或ORC的列狀和壓縮資料。

可以通過以下指令輕松建立S3中資料的Athena表:

CREATE EXTERNAL TABLE IF NOT EXISTS myanalytics_parquet (

`column1` string,

`column2` int,

`column3` DOUBLE,

`column4` int,

`column5` string

)

STORED AS PARQUET

LOCATION 's3://mybucket/parquetdata/'

tblproperties ("parquet.compress"="SNAPPY")

此後,資料已經可用,并且可以以驚人的速度和響應時間進行查詢。

SELECT column5, count(*) AS cnt FROM myanalytics_parquetGROUP BY column5ORDER BY cnt DESC;
           

這樣的查詢隻需要幾秒鐘即可對完整的資料集執行。 結果資料在配置設定給Athena執行個體的預設S3存儲桶中可用,并且可以提取為CSV檔案。

結論

從Elasticsearch中批量提取資料并将其轉換為壓縮的Parquet檔案(存儲在S3中)是處理大型資料集的有效方法。 将AWS Athena與這些檔案一起使用,可以輕松快速,高效地對資料集進行切片和切塊,并允許取出定義的資料子集(降維)以進行進一步處理或特征提取。 與使用Elasticsearch中的原始資料集相比,這無疑是一種更快,更有效的方法。 可以通過滾動功能來完成從Elasticsearch中擷取資料的操作,并使用step函數和lambda運作它是一種有效的無伺服器解決方案。

附錄

将snappy和fastparquet安裝到lambda函數中并不是那麼容易,因為它們都使用C擴充名和庫。

Snappy Lambda運作時問題

關于快速安裝,我遇到了以下問題:

ModuleNotFoundError: No module named 'snappy._snappy_cffi'

不幸的是,這是一個掩蓋的錯誤,最初的異常是由于找不到libsnappy.so庫而發生的,然後又發生了另一個異常,如上所示,這最初使我走錯了路。 這裡的問題是Lambda圖層都映射在/ opt目錄下。 在aws-lambda-container-image-converter存儲庫中未檢測到/ opt下的此問題報告庫#12,該線索表明必須擴充LD_LIBRARY_PATH并将其設定為Lambda函數,例如:

$LAMBDA_TASK_ROOT/lib:$LAMBDA_TASK_ROOT/lib64:$LAMBDA_RUNTIME_DIR:$LAMBDA_RUNTIME_DIR/lib:$LAMBDA_TASK_ROOT:/opt/lib:/opt/lib64:/lib64:/usr/lib64

然後将共享庫放在可以找到共享庫的lib64檔案夾中。

Fastparquet安裝問題

快速鑲木地闆的安裝也造成了一些麻煩。 如果使用目标參數執行Pip指令,則fastparquet建構無法找到必要的numpy頭檔案。 解決此問題的一種方法是,從require.txt檔案中删除fastparquet并将其安裝為第二步。 如果正确設定了PYTHONPATH,則可以找到numpy标頭。

(本文翻譯自Klaus Seiler的文章《Effective data exploration via columnar data formats like Parquet》,參考:https://medium.com/merapar/effective-data-exploration-via-columnar-data-formats-like-parquet-652466676188)