天天看点

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

本文介绍使用 Data Lake Formation (DLF)服务,实时订阅 Tablestore(原 OTS) 的数据,并以 Delta Lake 的格式投递进入 OSS,构建实时数据湖。

架构介绍

表格存储是一种全托管的云原生数据库,使用表格存储您无需担心软硬件预置、配置、故障、集群扩展、安全等问题。提供高服务可用性的同时极大地减少了管理成本。

表格存储支持多种数据库模型,可以广泛应用于时序数据、时空数据、消息数据、元数据以及大数据等核心数据场景。当海量的数据存储在表格存储中您希望把数据实时汇聚在 OSS 构建内部的大数据数据湖时,可以使用 Data Lake Formation 提供的托管的数据投递功能,把写入表格存储的数据实时 ETL 到 OSS 中,比以 Delta Lake 的格式存储在 OSS 之上。进入 Delta Lake 后的数据可以再做进一步的流计算或者批计算,大体架构如下图所示

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

本文会重点介绍如何操作快速构建上述的架构图。这里我们假设你已经购买并使用 Tablestore 做为你的存储选型,如果还没有使用,可以参考

这里

。购买并开通表格存储服务,使用我们的 SDK,或者各类数据管道,导入工具例如 Datax,数据集成服务,Datahub,DTS,又或者计算引擎 Spark,Flink,把数据写入表格存储。下面我们重点介绍如何把这些数据进行 Delta Lake 的数据构建。

数据湖构建环境准备

1.登录数据湖构建

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

2.创建新的入湖模板

选择实时 OTS,这里 DLF 会使用 Spark Streaming 的方式去订阅 Tablestore 中的数据。Tablestore 的 CDC 因为支持灵活的数据订阅能力,包括全量,全加增,以及增量三种模式,所以可以使用同一的流式入口。

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖
使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

3.注意如果是第一次使用,需要创建配置一下数据湖位置,即上图中的目标数据库,选择一个你的 OSS 路径。配置后点击下一步,如图中所示我们创建了一个 BaseAndStream 类型的通道,使得数据湖中可以包含表格存储的全量数据以及未来的新增实时数据。CU 可以根据你的数据写入量和大小进行动态配置。这里我们设置了10cu。

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

4.创建好入湖模板后,点击运行即可开始数据的实时入湖工作流。

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

5.点击运行,投递任务开始进行,这时候等一小段时间就可以在 OSS 中看到你的 delta 数据,分为 log 路径和data 路径,如下图所示:

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

数据文件使用的也是 parquet。这时候当数据实时汇聚在 delta 后,就可以开始我们的基于 deltalake 的数据分析处理了。

6.除了通过 dlf 的监控查看消费情况,还可以在 Tablestore 控制台查看数据投递进度:

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

数据湖数据分析

这一节来简单介绍下,数据通过 DLF 投递入湖后,如何进行数据的分析。这里我们使用 EMR Spark 来进行数据分析。使用了 DatalakeFormation 进行数据投递后,创建 emr 集群选择数据湖元数据,做为 emr 元数据。此时我们之前投递的 OSS 数据湖可以自动关联外表。

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖

集群创建好后,我们登陆集群启动 Spark SQL,

spark-sql --master yarn --num-executors 4 --executor-memory 8g --executor-cores 4

执行showdatabse,dlf create oss 数据湖会被list出来:
20/12/29 11:26:29 INFO [main] SparkSQLCLIDriver: Spark master: yarn, Application Id: application_1609144808351_0011
spark-sql> show databases;
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: command is called
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212411579 with spark sql successfully.
20/12/29 11:26:51 INFO [main] CodeGenerator: Code generated in 168.938794 ms
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: execution is called
20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212411922 with spark sql successfully.
default
dlftpch1
testlakeformation1
Time taken: 1.344 seconds, Fetched 3 row(s)
20/12/29 11:26:51 INFO [main] SparkSQLCLIDriver: Time taken: 1.344 seconds, Fetched 3 row(s)

进入dlf的数据库dlftpch1,执行show tables; deltalake的数据湖也会被自动关联外表,schema和我们的DLF schema一致。
spark-sql> show tables;
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: command is called
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212498535 with spark sql successfully.
20/12/29 11:28:18 INFO [main] CodeGenerator: Code generated in 9.697932 ms
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: execution is called
20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212498560 with spark sql successfully.
dlftpch1        tpchdata1       false
Time taken: 0.121 seconds, Fetched 1 row(s)
20/12/29 11:28:18 INFO [main] SparkSQLCLIDriver: Time taken: 0.121 seconds, Fetched 1 row(s)

然后我们就可以在这张deltalake表上进行adhoc的sql计算
spark-sql> select count(*) from tpchdata1;
20/12/29 11:29:11 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212551026 with spark sql successfully.
528026067
Time taken: 21.712 seconds, Fetched 1 row(s)
20/12/29 11:29:11 INFO [main] SparkSQLCLIDriver: Time taken: 21.712 seconds, Fetched 1 row(s)

我们可以查看下这张外表的create语句:
spark-sql> show create table tpchdata1;
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: command is called
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212681298 with spark sql successfully.
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: execution is called
20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212681306 with spark sql successfully.
CREATE TABLE `tpchdata1` (`l_orderkey` INT, `l_linenumber` INT, `l_comment` STRING, `l_commitdate` STRING, `l_discount` DOUBLE, `l_extendedprice` DOUBLE, `l_linestatus` STRING, `l_partkey` INT, `l_quantity` DOUBLE)
USING delta
OPTIONS (
  `serialization.format` '1',
  path 'oss://lakeformation1/DLF-tpchdata1'
)
           

总结

本文介绍了使用 DLF (Data Lake Formation)的实时数据湖构建能力,订阅 Tablestore 的全增数据,构建流批一体的数据湖存储格式 Delta Lake。并在实例中使用 EMR Spark 进行构建数据的交互分析。整套架构可以帮助你基于 Tablestore + OSS 两套 Serverless 存储,低成本的构建实时的数据读写和分析。对细节架构感兴趣的同学欢迎加群交流(群号:23307953)。

使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖