通过Lambda层,Step函数以及通过AWS Athena进行进一步的数据分析,从Elasticsearch到S3上的Apache Parquet文件的无服务器大规模数据提取
特征提取和降维是机器学习数据准备中常见的基本任务。 在我们的大多数(Kubernetes)项目中,我们使用通用的EFK堆栈(Elasticsearch + Fluentd + Kibana),该堆栈可用于临时故障排除,但缺乏大规模的数据分析功能。 为了深入了解更大的数据集,需要使用不同的方法。 在这里,我探索一种方法,其中将提取的Elasticsearch数据(25m条或更多记录)作为列数据文件存储在S3中,并使用AWS Athena执行以更快的响应时间进行切片和切块。
列数据格式是BI和数据分析工作负载的关键。 它们有助于极大地降低总体磁盘I / O要求,并减少要从磁盘加载的数据量,因此有助于优化分析查询性能。 最常用的格式是Apache Parquet或Apache ORC。 在本文中,我研究了如何大规模提取Elasticsearch数据以进行进一步的分析和提取。
目录
· 通过Lambda层,Step函数以及通过AWS Athena进行进一步的数据分析,从Elasticsearch到S3上的Apache Parquet文件的无服务器大规模数据提取
· 通过滚动Elasticsearch批量导出
· 作为步进功能执行
· 数据存储为列式文件
· 将所有依赖项打包为Lambda层
· 通过AWS Athena进行数据访问
· 结论
· 附录
· Snappy Lambda运行时问题
· Fastparquet安装问题
通过滚动Elasticsearch批量导出
Elasticsearch具有一个基于http的API,可以从搜索请求中检索大量结果,其方式与在传统数据库中使用游标的方式几乎相同。 初始调用在查询字符串中包含滚动参数,该参数告诉Elasticsearch应该保持"搜索上下文"有效的时间。
from elasticsearch import Elasticsearches = Elasticsearch("http://myserver:13000/elasticsearch")page = es.search( index = myindex, scroll = '2m', size = 10000, body = myquery)sid = page['_scroll_id']scroll_size = page['hits']['total']
scroll参数(传递给搜索请求,然后传递给每个滚动请求)告诉Elasticsearch它应该保持搜索上下文存活多长时间,并且需要足够长的时间才能在下一个滚动请求之前处理这批结果。 现在可以迭代和检索与查询关联的数据:
while (scroll_size > 0): print("Scrolling...", i) page = es.scroll(scroll_id = sid, scroll = '2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print("scroll size: " + str(scroll_size)) # Convert the page into a Pandas dataframe dfES = Select.from_dict(page).to_pandas() dfES.drop(columns=['_id', '_index', '_score', '_type'], inplace=True)
应当注意,大小的最大值(每批结果将返回的命中数)是10000。在我的情况下,这是一个25m记录的数据集,因此需要进行2500次滚动提取批次。 为了减少迭代次数,在Lambda函数中,一次调用期间执行15个滚动事件,这将产生150,000条记录。 连同对结果的进一步处理,每次Lambda调用大约需要60秒,因此整个处理时间大约为3小时。 (如果数据集更大,则可以在Elasticsearch集群的不同节点上并行执行切片滚动)。 整体执行作业作为步进功能运行,每个提取批处理执行一次Lambda执行。
作为步进功能执行
要自动执行滚动Lambda直到提取出最后一页,Step Function是最佳选择。
在此检查Lambda的输出,如果它指示仍然有其他页面可用,则再次执行Lambda函数。 可以通过Cloudwatch事件触发Step功能,方法是在给定时间指示"状态":"开始",并组织所有数据的提取,两次运行之间传递的数据/事件如下:
{
"state": "Scroll",
"scrollId": "DnF1ZXJ5VGhlbkZldGNoBQA2VjRfUUJRAAAAALckuFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JLxZGOW0tSGw4c1JOS1BZVzI2VjRfUUJRAAAAAAAAApAWbGtqLUVOQWVSZm03NmdxTllqNFhZZwAAAAAALckxFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JMBZGOW0tSGw4c1JOS1BZVzI",
"index": 12879232
}
可能的状态为开始,滚动和完成。 状态函数定义如下:
{ "StartAt": "ScrollAgain?", "States": { "ScrollAgain?": { "Type" : "Choice", "Choices": [ { "Variable": "$.state", "StringEquals": "Done", "Next": "FinalState" } ], "Default": "Scrolling" },"Scrolling": { "Type" : "Task", "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxx:function:MyParquetfunction", "Next": "ScrollAgain?" },"FinalState": { "Type": "Succeed" } }}
数据存储为列式文件
在许多分析用例中,提取的数据可以存储为CSV文件,以进行进一步处理。 但是,由于数据集很大,因此这种方法实际上不是可行的选择。
将数据提取存储在多个数据文件中,特别是作为列存储文件存储,具有多个优点,尤其是在将数据与Athena一起使用时。 与基于行的文件(如CSV)相比,Apache Parquet面向列,旨在提供高效的列式数据存储。 它具有字典编码,位打包和游程长度编码(RLE)等功能来压缩数据。 Twitter工程团队的博客文章"使用Parquet使Dremel变得简单"给出了更详细的概述。
可以对镶木地板文件应用其他压缩,甚至可以对每列应用不同的压缩算法。
Snappy是一种轻量级且快速压缩的编解码器,不需要太多的CPU使用率,但压缩效果不如gzip或bzip2那样好。 在这种情况下,应用快速压缩会将文件大小减少5倍,并且比相同的文件gzip压缩大2倍。
在Python中,编写镶木地板文件并将上传内容集成到S3中非常容易:
import s3fsimport fastparquet as fpimport pandas as pdimport numpy as nps3 = s3fs.S3FileSystem()myopen = s3.opens3bucket = 'mydata-aws-bucket/'# random dataframe for demodf = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))parqKey = s3bucket + "datafile" + ".parq.snappy"fp.write(parqKey, df ,compression='SNAPPY', open_with=myopen)
Pandas和Numpy应该被称为标准Python数据科学库。 Fastparquet是Parquet文件格式的接口,该文件格式使用Numba Python到LLVM编译器来提高速度。 它是Dask项目的一个分叉,来自Joe Crobak的python-parquet的原始实现。 S3Fs是S3的Pythonic文件接口,它建立在boto3之上。
将所有依赖项打包为Lambda层
通过压缩包打包了我上一篇博客文章"使用AWS Lambda和Layers进行纯无服务器机器推理的纯文本",将要上传的依赖关系打包。 参见下文,必须相应地设置LD_LIBRARY_PATH,以使Lambda函数访问该层中已编译的Snappy库。 同样使用Pip和附加目标值进行构建会导致fastparquet安装出现一些问题,下面提供了更多详细信息。
rm -rf python && mkdir -p python
docker run --rm -v $(pwd):/foo -w /foo lambci/lambda:build-python3.7 /foo/build.sh
清除"外部"目标目录,然后在容器内执行与AWS运行时匹配的构建。 build.sh如下所示:
#!/usr/bin/env bash
export PYTHONPATH=$PYTHONPATH:/foo/python
yum install snappy-devel -y
pip install --upgrade pip
pip install -r requirements.txt --no-binary --no-dependencies --no-cache-dir -t python
pip install fastparquet==0.2.1 --no-binary --no-dependencies --no-cache-dir -t python
cp /usr/lib64/libsnappy* /foo/lib64/
如前所述,由于使用了-t python target参数,因此必须从requirements.txt文件中手动删除fastparquet模块并进行第二步安装。 然后可以像下面这样部署该层:
zip -r MyParquet_layer.zip python lib64
aws s3 cp MyParquet_layer.zip s3://mybucket/layers/
aws lambda publish-layer-version --layer-name MyParquet_layer --content S3Bucket=mybucket,S3Key=layers/MyParquet_layer.zip --compatible-runtimes python3.7
通过AWS Athena进行数据访问
为了进一步处理数据并提取一些子集,AWS Athena是完美的工具。 AWS Athena是一种无服务器交互式查询服务,可轻松使用标准SQL在S3中分析大量数据。 它开箱即用地处理诸如Parquet或ORC的列状和压缩数据。
可以通过以下命令轻松创建S3中数据的Athena表:
CREATE EXTERNAL TABLE IF NOT EXISTS myanalytics_parquet (
`column1` string,
`column2` int,
`column3` DOUBLE,
`column4` int,
`column5` string
)
STORED AS PARQUET
LOCATION 's3://mybucket/parquetdata/'
tblproperties ("parquet.compress"="SNAPPY")
此后,数据已经可用,并且可以以惊人的速度和响应时间进行查询。
SELECT column5, count(*) AS cnt FROM myanalytics_parquetGROUP BY column5ORDER BY cnt DESC;
这样的查询只需要几秒钟即可对完整的数据集执行。 结果数据在分配给Athena实例的默认S3存储桶中可用,并且可以提取为CSV文件。
结论
从Elasticsearch中批量提取数据并将其转换为压缩的Parquet文件(存储在S3中)是处理大型数据集的有效方法。 将AWS Athena与这些文件一起使用,可以轻松快速,高效地对数据集进行切片和切块,并允许取出定义的数据子集(降维)以进行进一步处理或特征提取。 与使用Elasticsearch中的原始数据集相比,这无疑是一种更快,更有效的方法。 可以通过滚动功能来完成从Elasticsearch中获取数据的操作,并使用step函数和lambda运行它是一种有效的无服务器解决方案。
附录
将snappy和fastparquet安装到lambda函数中并不是那么容易,因为它们都使用C扩展名和库。
Snappy Lambda运行时问题
关于快速安装,我遇到了以下问题:
ModuleNotFoundError: No module named 'snappy._snappy_cffi'
不幸的是,这是一个掩盖的错误,最初的异常是由于找不到libsnappy.so库而发生的,然后又发生了另一个异常,如上所示,这最初使我走错了路。 这里的问题是Lambda图层都映射在/ opt目录下。 在aws-lambda-container-image-converter存储库中未检测到/ opt下的此问题报告库#12,该线索表明必须扩展LD_LIBRARY_PATH并将其设置为Lambda函数,例如:
$LAMBDA_TASK_ROOT/lib:$LAMBDA_TASK_ROOT/lib64:$LAMBDA_RUNTIME_DIR:$LAMBDA_RUNTIME_DIR/lib:$LAMBDA_TASK_ROOT:/opt/lib:/opt/lib64:/lib64:/usr/lib64
然后将共享库放在可以找到共享库的lib64文件夹中。
Fastparquet安装问题
快速镶木地板的安装也造成了一些麻烦。 如果使用目标参数执行Pip命令,则fastparquet构建无法找到必要的numpy头文件。 解决此问题的一种方法是,从require.txt文件中删除fastparquet并将其安装为第二步。 如果正确设置了PYTHONPATH,则可以找到numpy标头。
(本文翻译自Klaus Seiler的文章《Effective data exploration via columnar data formats like Parquet》,参考:https://medium.com/merapar/effective-data-exploration-via-columnar-data-formats-like-parquet-652466676188)