天天看点

matlab提取gps格式有效数据_通过像Parquet这样的列数据格式进行有效的数据探索目录通过滚动Elasticsearch批量导出作为步进功能执行数据存储为列式文件将所有依赖项打包为Lambda层通过AWS Athena进行数据访问结论附录Snappy Lambda运行时问题Fastparquet安装问题

通过Lambda层,Step函数以及通过AWS Athena进行进一步的数据分析,从Elasticsearch到S3上的Apache Parquet文件的无服务器大规模数据提取

特征提取和降维是机器学习数据准备中常见的基本任务。 在我们的大多数(Kubernetes)项目中,我们使用通用的EFK堆栈(Elasticsearch + Fluentd + Kibana),该堆栈可用于临时故障排除,但缺乏大规模的数据分析功能。 为了深入了解更大的数据集,需要使用不同的方法。 在这里,我探索一种方法,其中将提取的Elasticsearch数据(25m条或更多记录)作为列数据文件存储在S3中,并使用AWS Athena执行以更快的响应时间进行切片和切块。

列数据格式是BI和数据分析工作负载的关键。 它们有助于极大地降低总体磁盘I / O要求,并减少要从磁盘加载的数据量,因此有助于优化分析查询性能。 最常用的格式是Apache Parquet或Apache ORC。 在本文中,我研究了如何大规模提取Elasticsearch数据以进行进一步的分析和提取。

matlab提取gps格式有效数据_通过像Parquet这样的列数据格式进行有效的数据探索目录通过滚动Elasticsearch批量导出作为步进功能执行数据存储为列式文件将所有依赖项打包为Lambda层通过AWS Athena进行数据访问结论附录Snappy Lambda运行时问题Fastparquet安装问题

目录

· 通过Lambda层,Step函数以及通过AWS Athena进行进一步的数据分析,从Elasticsearch到S3上的Apache Parquet文件的无服务器大规模数据提取

· 通过滚动Elasticsearch批量导出

· 作为步进功能执行

· 数据存储为列式文件

· 将所有依赖项打包为Lambda层

· 通过AWS Athena进行数据访问

· 结论

· 附录

· Snappy Lambda运行时问题

· Fastparquet安装问题

通过滚动Elasticsearch批量导出

Elasticsearch具有一个基于http的API,可以从搜索请求中检索大量结果,其方式与在传统数据库中使用游标的方式几乎相同。 初始调用在查询字符串中包含滚动参数,该参数告诉Elasticsearch应该保持"搜索上下文"有效的时间。

from elasticsearch import Elasticsearches = Elasticsearch("http://myserver:13000/elasticsearch")page = es.search(    index = myindex,    scroll = '2m',    size = 10000,    body = myquery)sid = page['_scroll_id']scroll_size = page['hits']['total']
           

scroll参数(传递给搜索请求,然后传递给每个滚动请求)告诉Elasticsearch它应该保持搜索上下文存活多长时间,并且需要足够长的时间才能在下一个滚动请求之前处理这批结果。 现在可以迭代和检索与查询关联的数据:

while (scroll_size > 0):    print("Scrolling...", i)    page = es.scroll(scroll_id = sid, scroll = '2m')    # Update the scroll ID    sid = page['_scroll_id']    # Get the number of results that we returned in the last scroll    scroll_size = len(page['hits']['hits'])    print("scroll size: " + str(scroll_size))    # Convert the page into a Pandas dataframe    dfES = Select.from_dict(page).to_pandas()    dfES.drop(columns=['_id', '_index', '_score', '_type'],          inplace=True)
           

应当注意,大小的最大值(每批结果将返回的命中数)是10000。在我的情况下,这是一个25m记录的数据集,因此需要进行2500次滚动提取批次。 为了减少迭代次数,在Lambda函数中,一次调用期间执行15个滚动事件,这将产生150,000条记录。 连同对结果的进一步处理,每次Lambda调用大约需要60秒,因此整个处理时间大约为3小时。 (如果数据集更大,则可以在Elasticsearch集群的不同节点上并行执行切片滚动)。 整体执行作业作为步进功能运行,每个提取批处理执行一次Lambda执行。

作为步进功能执行

要自动执行滚动Lambda直到提取出最后一页,Step Function是最佳选择。

matlab提取gps格式有效数据_通过像Parquet这样的列数据格式进行有效的数据探索目录通过滚动Elasticsearch批量导出作为步进功能执行数据存储为列式文件将所有依赖项打包为Lambda层通过AWS Athena进行数据访问结论附录Snappy Lambda运行时问题Fastparquet安装问题

在此检查Lambda的输出,如果它指示仍然有其他页面可用,则再次执行Lambda函数。 可以通过Cloudwatch事件触发Step功能,方法是在给定时间指示"状态":"开始",并组织所有数据的提取,两次运行之间传递的数据/事件如下:

{

"state": "Scroll",

"scrollId": "DnF1ZXJ5VGhlbkZldGNoBQA2VjRfUUJRAAAAALckuFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JLxZGOW0tSGw4c1JOS1BZVzI2VjRfUUJRAAAAAAAAApAWbGtqLUVOQWVSZm03NmdxTllqNFhZZwAAAAAALckxFkY5bS1IbDhzUk5LUFlXMjZWNF9RQlEAAAAAAC3JMBZGOW0tSGw4c1JOS1BZVzI",

"index": 12879232

}

可能的状态为开始,滚动和完成。 状态函数定义如下:

{  "StartAt": "ScrollAgain?",  "States": {    "ScrollAgain?": {      "Type" : "Choice",      "Choices": [        {          "Variable": "$.state",          "StringEquals": "Done",          "Next": "FinalState"        }      ],      "Default": "Scrolling"    },"Scrolling": {      "Type" : "Task",      "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxx:function:MyParquetfunction",      "Next": "ScrollAgain?"    },"FinalState": {      "Type": "Succeed"    }  }}
           

数据存储为列式文件

在许多分析用例中,提取的数据可以存储为CSV文件,以进行进一步处理。 但是,由于数据集很大,因此这种方法实际上不是可行的选择。

将数据提取存储在多个数据文件中,特别是作为列存储文件存储,具有多个优点,尤其是在将数据与Athena一起使用时。 与基于行的文件(如CSV)相比,Apache Parquet面向列,旨在提供高效的列式数据存储。 它具有字典编码,位打包和游程长度编码(RLE)等功能来压缩数据。 Twitter工程团队的博客文章"使用Parquet使Dremel变得简单"给出了更详细的概述。

可以对镶木地板文件应用其他压缩,甚至可以对每列应用不同的压缩算法。

Snappy是一种轻量级且快速压缩的编解码器,不需要太多的CPU使用率,但压缩效果不如gzip或bzip2那样好。 在这种情况下,应用快速压缩会将文件大小减少5倍,并且比相同的文件gzip压缩大2倍。

在Python中,编写镶木地板文件并将上传内容集成到S3中非常容易:

import s3fsimport fastparquet as fpimport pandas as pdimport numpy as nps3 = s3fs.S3FileSystem()myopen = s3.opens3bucket = 'mydata-aws-bucket/'# random dataframe for demodf = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))parqKey = s3bucket + "datafile"  + ".parq.snappy"fp.write(parqKey, df ,compression='SNAPPY', open_with=myopen)
           

Pandas和Numpy应该被称为标准Python数据科学库。 Fastparquet是Parquet文件格式的接口,该文件格式使用Numba Python到LLVM编译器来提高速度。 它是Dask项目的一个分叉,来自Joe Crobak的python-parquet的原始实现。 S3Fs是S3的Pythonic文件接口,它建立在boto3之上。

将所有依赖项打包为Lambda层

通过压缩包打包了我上一篇博客文章"使用AWS Lambda和Layers进行纯无服务器机器推理的纯文本",将要上传的依赖关系打包。 参见下文,必须相应地设置LD_LIBRARY_PATH,以使Lambda函数访问该层中已编译的Snappy库。 同样使用Pip和附加目标值进行构建会导致fastparquet安装出现一些问题,下面提供了更多详细信息。

rm -rf python && mkdir -p python

docker run --rm -v $(pwd):/foo -w /foo lambci/lambda:build-python3.7 /foo/build.sh

清除"外部"目标目录,然后在容器内执行与AWS运行时匹配的构建。 build.sh如下所示:

#!/usr/bin/env bash

export PYTHONPATH=$PYTHONPATH:/foo/python

yum install snappy-devel -y

pip install --upgrade pip

pip install -r requirements.txt --no-binary --no-dependencies --no-cache-dir -t python

pip install fastparquet==0.2.1 --no-binary --no-dependencies --no-cache-dir -t python

cp /usr/lib64/libsnappy* /foo/lib64/

如前所述,由于使用了-t python target参数,因此必须从requirements.txt文件中手动删除fastparquet模块并进行第二步安装。 然后可以像下面这样部署该层:

zip -r MyParquet_layer.zip python lib64

aws s3 cp MyParquet_layer.zip s3://mybucket/layers/

aws lambda publish-layer-version --layer-name MyParquet_layer --content S3Bucket=mybucket,S3Key=layers/MyParquet_layer.zip --compatible-runtimes python3.7

通过AWS Athena进行数据访问

为了进一步处理数据并提取一些子集,AWS Athena是完美的工具。 AWS Athena是一种无服务器交互式查询服务,可轻松使用标准SQL在S3中分析大量数据。 它开箱即用地处理诸如Parquet或ORC的列状和压缩数据。

可以通过以下命令轻松创建S3中数据的Athena表:

CREATE EXTERNAL TABLE IF NOT EXISTS myanalytics_parquet (

`column1` string,

`column2` int,

`column3` DOUBLE,

`column4` int,

`column5` string

)

STORED AS PARQUET

LOCATION 's3://mybucket/parquetdata/'

tblproperties ("parquet.compress"="SNAPPY")

此后,数据已经可用,并且可以以惊人的速度和响应时间进行查询。

SELECT column5, count(*) AS cnt FROM myanalytics_parquetGROUP BY column5ORDER BY cnt DESC;
           

这样的查询只需要几秒钟即可对完整的数据集执行。 结果数据在分配给Athena实例的默认S3存储桶中可用,并且可以提取为CSV文件。

结论

从Elasticsearch中批量提取数据并将其转换为压缩的Parquet文件(存储在S3中)是处理大型数据集的有效方法。 将AWS Athena与这些文件一起使用,可以轻松快速,高效地对数据集进行切片和切块,并允许取出定义的数据子集(降维)以进行进一步处理或特征提取。 与使用Elasticsearch中的原始数据集相比,这无疑是一种更快,更有效的方法。 可以通过滚动功能来完成从Elasticsearch中获取数据的操作,并使用step函数和lambda运行它是一种有效的无服务器解决方案。

附录

将snappy和fastparquet安装到lambda函数中并不是那么容易,因为它们都使用C扩展名和库。

Snappy Lambda运行时问题

关于快速安装,我遇到了以下问题:

ModuleNotFoundError: No module named 'snappy._snappy_cffi'

不幸的是,这是一个掩盖的错误,最初的异常是由于找不到libsnappy.so库而发生的,然后又发生了另一个异常,如上所示,这最初使我走错了路。 这里的问题是Lambda图层都映射在/ opt目录下。 在aws-lambda-container-image-converter存储库中未检测到/ opt下的此问题报告库#12,该线索表明必须扩展LD_LIBRARY_PATH并将其设置为Lambda函数,例如:

$LAMBDA_TASK_ROOT/lib:$LAMBDA_TASK_ROOT/lib64:$LAMBDA_RUNTIME_DIR:$LAMBDA_RUNTIME_DIR/lib:$LAMBDA_TASK_ROOT:/opt/lib:/opt/lib64:/lib64:/usr/lib64

然后将共享库放在可以找到共享库的lib64文件夹中。

Fastparquet安装问题

快速镶木地板的安装也造成了一些麻烦。 如果使用目标参数执行Pip命令,则fastparquet构建无法找到必要的numpy头文件。 解决此问题的一种方法是,从require.txt文件中删除fastparquet并将其安装为第二步。 如果正确设置了PYTHONPATH,则可以找到numpy标头。

(本文翻译自Klaus Seiler的文章《Effective data exploration via columnar data formats like Parquet》,参考:https://medium.com/merapar/effective-data-exploration-via-columnar-data-formats-like-parquet-652466676188)