天天看点

Hudi on Flink 快速上手指南

一、背景

Apache Hudi 是目前最流行的数据湖解决方案之一,Data Lake Analytics[1] 集成了 Hudi 服务高效的数据 MERGE(UPDATE/DELETE)场景;AWS 在 EMR 服务中 预安装[2] 了 Apache Hudi,为用户提供高效的 record-level updates/deletes 和高效的数据查询管理;Uber [3]已经稳定运行 Apache Hudi 服务 4 年多,提供了低延迟的数据库同步和高效率的查询[4]。自 2016 年 8 月上线以来,数据湖存储规模已经超过 100PB[5]。

Apache Flink 作为目前最流行的流计算框架,在流式计算场景有天然的优势,当前,Flink 社区也在积极拥抱 Hudi 社区,发挥自身 streaming 写/读的优势,同时也对 batch 的读写做了支持。

Hudi 和 Fink 在 0.8.0 版本做了大量的集成工作[6]。核心的功能包括:

实现了新的 Flink streaming writer

支持 batch 和 streaming 模式 reader

支持 Flink SQL API

Flink streaming writer 通过 state 实现了高效的 index 方案,同时 Hudi 在 UPDATE/DELETE 上的优秀设计使得 Flink Hudi 成为当前最有潜力的 CDC 数据入湖方案,因为篇幅关系,将在后续的文章中介绍。

本文用 Flink SQL Client 来简单的演示通过 Flink SQL API 的方式实现 Hudi 表的操作,包括 batch 模式的读写和 streaming 模式的读。

二、环境准备

本文使用 Flink Sql Client[7] 作为演示工具,SQL CLI 可以比较方便地执行 SQL 的交互操作。

第一步:下载 Flink jar

Hudi 集成了 Flink 的 1.11 版本。您可以参考这里[8]来设置 Flink 环境。hudi-flink-bundle jar 是一个集成了 Flink 相关的 jar 的 uber jar, 目前推荐使用 scala 2.11 来编译。

第二步:设置 Flink 集群

启动一个 standalone 的 Flink 集群。启动之前,建议将 Flink 的集群配置设置如下:

在 $FLINK_HOME/conf/flink-conf.yaml 中添加配置项 taskmanager.numberOfTaskSlots: 4

在 $FLINK_HOME/conf/workers 中将条目 localhost 设置成 4 行,这里的行数代表了本地启动的 worker 数

启动集群:

第三步:启动 Flink SQL Client

Hudi 的 bundle jar 应该在 Sql Client 启动的时候加载到 CLASSPATH 中。您可以在路径 hudi-source-dir/packaging/hudi-flink-bundle 下手动编译 jar 包或者从 Apache Official Repository [9]下载。

启动 SQL CLI:

备注:

推荐使用 hadoop 2.9.x+ 版本,因为一些对象存储(aliyun-oss)从这个版本开始支持

flink-parquet 和 flink-avro 已经被打进 hudi-flink-bundle jar

您也可以直接将 hudi-flink-bundle jar 拷贝到 $FLINK_HOME/lib 目录下

本文的存储选取了对象存储 aliyun-oss,为了方便,您也可以使用本地路径

演示的工作目录结构如下:

三、Batch 模式的读写

插入数据

使用如下 DDL 语句创建 Hudi 表:

DDL 里申明了表的 path,record key 为默认值 uuid,pre-combine key 为默认值 ts 。

然后通过 VALUES 语句往表中插入数据:

这里看到 Flink 的作业已经成功提交到集群,可以本地打开 web UI 观察作业的执行情况:

Hudi on Flink 快速上手指南

查询数据

作业执行完成后,通过 SELECT 语句查询表结果:

这里执行语句 set execution.result-mode=tableau; 可以让查询结果直接输出到终端。

通过在 WHERE 子句中添加 partition 路径来裁剪 partition:

更新数据

相同的 record key 的数据会自动覆盖,通过 INSERT 相同 key 的数据可以实现数据更新:

可以看到 uuid 为 id1 和 id2 的数据 age 字段值发生了更新。

再次 insert 新数据观察结果:

四、Streaming 读

通过如下语句创建一张新的表并注入数据:

这里将 table option read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据;opiton read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;option table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读。

以上操作发生在一个 terminal 中,我们称之为 terminal_1。

从新的 terminal(我们称之为 terminal_2)再次启动 Sql Client,重新创建 t1 表并查询:

回到 terminal_1,继续执行 batch mode 的 INSERT 操作:

几秒之后,观察 terminal_2 的输出多了一行:

再次在 terminal_1 中执行 INSERT 操作:

观察 terminal_2 的输出变化:

五、总结

通过一些简单的演示,我们发现 HUDI Flink 的集成已经相对完善,读写路径均已覆盖,关于详细的配置,可以参考 Flink SQL Config Options[10]。

Hudi 社区正在积极的推动和 Flink 的深度集成,包括但不限于:

Flink streaming reader 支持 watermark,实现数据湖/仓的中间计算层 pipeline

Flink 基于 Hudi 的物化视图,实现分钟级的增量视图,服务于线上的近实时查询