Part 1: Iceberg Core Features and How They Work
Apache Iceberg
From the official website:
Apache Iceberg is an open table format for huge analytic datasets.
As you can see, the founders position Iceberg as an efficient storage format for large-scale data analytics. For the analytics scenario, much as Hive is a layer over HDFS, what Iceberg solves are essentially the pain points of the data warehouse.
At first, Iceberg did evolve toward being a faster, more usable format for the warehouse scenario, with features such as schema evolution and file-level filter optimization. But once its ecosystem connected with the Flink streaming engine and Delete/Update/Merge semantics appeared, the range of use cases became much more diverse.
Background
In the past the industry mostly used Hive/Spark on HDFS as the storage layer for offline data warehouses. As workloads became more real-time and iterated faster, the following shortcomings were gradually exposed:
- No row-level update: an update requires overwriting the whole Hive table, which is extremely expensive
- No read/write isolation: one user's reads can be affected by another user's writes (especially for streaming reads)
- No version rollback or snapshots, so large amounts of historical data have to be kept around
- No incremental reads: every scan reads all data of the table or partition
- Poor performance: pruning only reaches the Hive partition granularity
- No schema evolution
- .....
Basic Concepts
As shown in the figure above, Iceberg layers the files on HDFS into snapshots, manifest lists, manifests, and data files.
- Snapshot: each user commit (each write, e.g. a Spark job) produces a new snapshot
- Manifest List: tracks all manifests of the current snapshot
- Manifest: tracks all data files under that manifest
- Data File: the file that stores the data. Iceberg later introduced the Delete File to store data to be deleted; in the file layout it sits at the same level as Data Files
Core Features in Detail
Time Travel and Incremental Reads
Time Travel means a user can read the table's data as of any historical point in time. Using Spark code as an example:
// time travel to October 26, 1986 at 01:21:00
spark.read
.option("as-of-timestamp", "499162860000")
.format("iceberg")
.load("path/to/table")
The code above reads the Iceberg table's data as of timestamp=499162860000. So how does this work under the hood?
From the file structure described in "Basic Concepts", every new write produces a snapshot, so Iceberg only needs to store the metadata of each commit, such as its timestamp, to locate the snapshot for a given moment and resolve it down to its Data Files.
Incremental reads work the same way: the snapshots within a start/end range are selected, and all of their Data Files are read as the raw input.
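As an illustration, an incremental read in Spark looks roughly like this. A minimal sketch, assuming the Iceberg Spark runtime is on the classpath; the snapshot IDs are placeholders (start-snapshot-id is exclusive, end-snapshot-id is inclusive):
// Read only the data appended between two snapshots (IDs are placeholders).
spark.read
  .format("iceberg")
  .option("start-snapshot-id", "10963874102873")  // exclusive
  .option("end-snapshot-id", "63874143573109")    // inclusive
  .load("path/to/table")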
Fast Scan & Data Filtering
As mentioned above, one reason Hive queries are slow is that planning can only push down to the partition level, which is far too coarse. Iceberg adds a series of fine-grained optimizations to query planning. When a query enters Iceberg:
- Find the snapshot for the given timestamp (the latest snapshot by default)
- Use the query's partition predicates to filter the matching manifest files out of that snapshot
- From the selected manifest files, collect all Data File entries (metadata only)
- Use several Data File properties, including column-level value counts, null counts, lower bounds, and upper bounds, to filter the data at an even finer granularity
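To make the last step concrete, here is a purely illustrative Scala sketch (not Iceberg's actual API; the case class and field names are made up) of how per-file lower/upper bounds let the planner skip files for a predicate like user_id = 42:
// Hypothetical per-file stats; real Data File metadata keeps these per column.
case class FileStats(path: String, lower: Long, upper: Long)

def mayContain(f: FileStats, v: Long): Boolean = v >= f.lower && v <= f.upper

val files = Seq(
  FileStats("data/00000-a.parquet", lower = 1L,   upper = 100L),
  FileStats("data/00001-b.parquet", lower = 101L, upper = 200L)
)

// Only files whose [lower, upper] range can contain 42 are actually read.
val toScan = files.filter(mayContain(_, 42L))  // keeps only 00000-a.parquet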
How Delete Is Implemented
To bring row-level updates online, Iceberg provides a Delete implementation; combining Delete + Insert achieves an Update. The Delete implementation introduces two concepts:
- Delete File: stores the deleted data (split into position deletes and equality deletes)
- Sequence Number: a property shared by Data Files and Delete Files, used mainly to order Inserts and Deletes relative to each other; without it, data consistency problems would arise
Position & Equality Delete
Iceberg introduces the notion of equality_ids: when creating a table, users can specify the table's equality_ids to identify the key that future Delete operations will match on. In a GDPR scenario, for example, where we need to delete a user's data by user_id at arbitrary times, equality_ids can be set to user_id.
The two kinds of Delete correspond to different Delete Files with different stored fields:
- position delete: three columns, file_path (the Data File containing the rows to delete), pos (the row position), and row (the row data)
- equality delete: the fields listed in equality_ids
Clearly, Delete Files are stored so that they can be joined against the data on the fly at read time. During that join a position delete can pinpoint the exact file and only needs a row-number comparison, so it is clearly more efficient. For this reason, while writing deletes, Iceberg keeps information about the data files currently being written in memory, to route DELETE operations through the position-delete path whenever possible. The diagram below illustrates this:
Three INSERT and DELETE records are written in time order. Suppose the Iceberg writer closes the file after writing the INSERTs for a1 and b1 and opens a new one; the record c1 written next, together with its row number, is then tracked in memory. When the writer later receives the DELETE for user_id=c1, it can find directly in memory that the row with user_id=c1 is the first row of fileA, so it writes a Position Delete File. For the DELETE of user_id=a1, the file has already been closed and there is no record of it in memory, so an Equality Delete File is written instead.
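A purely illustrative Scala sketch of that decision (not Iceberg's writer code; the helper names are made up):
sealed trait DeleteRecord
case class PositionDelete(filePath: String, pos: Long) extends DeleteRecord
case class EqualityDelete(userId: String) extends DeleteRecord

// Row positions of keys written to the data file that is still open.
val openFilePositions = scala.collection.mutable.Map.empty[String, (String, Long)]

def onInsert(userId: String, filePath: String, pos: Long): Unit =
  openFilePositions += userId -> (filePath, pos)

def onDelete(userId: String): DeleteRecord =
  openFilePositions.get(userId) match {
    case Some((file, pos)) => PositionDelete(file, pos) // key is in the still-open file
    case None              => EqualityDelete(userId)    // its file was already closed
  }

onInsert("c1", "fileA", pos = 0L)
onDelete("c1")  // PositionDelete("fileA", 0): c1 is the first row of the open fileA
onDelete("a1")  // EqualityDelete("a1"): a1's file was closed, its position is unknown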
Sequence Number
Once DELETE operations are introduced and merging happens at read time, one question arises: if a user inserts, deletes, and then re-inserts data with the same equality_id, how do we guarantee that the read drops the first insert but returns the second?
The approach is to number Data Files and Delete Files together in write order; at read time, a DELETE only applies to Data Files with a Sequence Number smaller than its own. What about concurrent writes of the same records? This is where Iceberg's own transaction mechanism comes in: before writing, the Iceberg writer checks the relevant metadata and the Sequence Number, and if the state after the write would not match expectations, it retries in an optimistic-locking fashion.
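A minimal sketch of the ordering idea (again not Iceberg's internal API; the exact comparison rules for position vs. equality deletes differ slightly in the real spec):
case class DataFile(path: String, sequenceNumber: Long)
case class DeleteFile(path: String, sequenceNumber: Long)

// A delete only affects data files written before it, i.e. with a smaller sequence number.
def applies(delete: DeleteFile, data: DataFile): Boolean =
  delete.sequenceNumber > data.sequenceNumber

val firstInsert  = DataFile("data-1.parquet", sequenceNumber = 1L)
val delete       = DeleteFile("delete-1.parquet", sequenceNumber = 2L)
val secondInsert = DataFile("data-2.parquet", sequenceNumber = 3L)

applies(delete, firstInsert)   // true:  the first insert is dropped at read time
applies(delete, secondInsert)  // false: the re-inserted row survives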
Schema Evolution
Schema evolution is one of Iceberg's distinguishing features. The following operations are supported:
- Add a column
- Drop a column
- Rename a column
- Update a column (e.g. widen its type)
- Reorder columns
Schema changes also rely on the file structure described above. Every write produces the snapshot -> manifest -> data file hierarchy, and likewise every read starts from a snapshot and is routed down to the corresponding underlying data files. So Iceberg only needs to record the schema in effect in the manifest metadata at write time and apply the corresponding conversion at read time.
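For reference, these operations are usually issued as DDL. A hedged sketch using Spark SQL, assuming Spark 3 with the Iceberg catalog and SQL extensions configured; the table name prod.db.my_user and the column names are placeholders:
// Each statement only changes table metadata; existing data files are not rewritten.
spark.sql("ALTER TABLE prod.db.my_user ADD COLUMN email string")
spark.sql("ALTER TABLE prod.db.my_user RENAME COLUMN user_name TO name")
spark.sql("ALTER TABLE prod.db.my_user ALTER COLUMN age TYPE bigint")  // widen int -> bigint
spark.sql("ALTER TABLE prod.db.my_user DROP COLUMN country")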
Part 2: Setting Up a Flink + Iceberg Environment
1. Configuring Iceberg for the Flink SQL Client
The Flink cluster must be the Scala 2.12 build.
- Download the Iceberg runtime jar into the lib directory on every server of the Flink cluster, then restart Flink
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar 100% 23MB 42.9MB/s 00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar 100% 23MB 35.4MB/s 00:00
[root@flink1 ~]#
Iceberg supports the Hadoop Catalog by default. To use the Hive Catalog, also put flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar into the lib directory on every server of the Flink cluster, then restart Flink.
- After that, start the SQL Client and you are ready to go.
2. Java/Scala pom.xml Configuration
Add the following dependency:
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>0.13.0</version>
<scope>provided</scope>
</dependency>
3. Catalog
3.1 Hive Catalog
Note: during testing, querying the table data from Hive returned nothing, while querying from Trino did return the data.
The Hive metastore stores the metadata, and HDFS stores the data of the databases and tables.
Flink SQL> create catalog hive_catalog with(
> 'type'='iceberg',
> 'catalog-type'='hive',
> 'property-version'='1',
> 'cache-enabled'='true',
> 'uri'='thrift://hive1:9083',
> 'clients'='5',
> 'warehouse'='hdfs://nnha/user/hive/warehouse',
> 'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.
Flink SQL>
- property-version: kept for backward compatibility in case the property format changes; setting it to 1 is fine for now
- cache-enabled: whether to enable catalog caching; enabled by default
- clients: the size of the connection pool the hive_catalog uses to access the Hive metastore; the default is 2
- warehouse: an HDFS path on the HDFS cluster that Flink uses; the location where databases and tables under hive_catalog store their data
- hive-conf-dir: the configuration directory of the Hive cluster. It can only be a local path on the Flink cluster; the HDFS path parsed from its hive-site.xml refers to the HDFS cluster the Flink cluster uses
- warehouse takes precedence over the path derived from hive-conf-dir
- If the database being created already exists in Hive, the created table's path will sit under Hive's warehouse
3.2 HDFS Catalog
HDFS stores both the metadata and the data of the databases and tables. warehouse is an HDFS path on the HDFS cluster that Flink uses.
Flink SQL> create catalog hadoop_catalog with (
> 'type'='iceberg',
> 'catalog-type'='hadoop',
> 'property-version'='1',
> 'cache-enabled'='true',
> 'warehouse'='hdfs://nnha/user/iceberg/warehouse'
> );
[INFO] Execute statement succeed.
Flink SQL>
A permanent catalog can be configured via conf/sql-cli-defaults.yaml, but it did not take effect in my tests:
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
catalogs:
- name: hadoop_catalog
type: iceberg
catalog-type: hadoop
property-version: 1
cache-enabled: true
warehouse: hdfs://nnha/user/iceberg/warehouse
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
In what follows we focus on the Hadoop Catalog as the running example.
4. Database and Table DDL Commands
4.1 Creating a Database
Every catalog comes with a default database named default.
Flink SQL> create database hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Flink SQL>
- This creates an iceberg_db subdirectory under the warehouse directory on HDFS
- Dropping the database removes the iceberg_db subdirectory from HDFS
4.2 Creating a Table (primary key etc. not supported)
Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
> user_id bigint comment '使用者ID',
> user_name string,
> birthday date,
> country string
> ) comment '使用者表'
> partitioned by (birthday, country) with (
> 'write.format.default'='parquet',
> 'write.parquet.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.
Flink SQL>
- Currently the created table does not support computed columns, primary keys, or watermarks
- Computed (hidden/transform) partitions are not supported here either, although Iceberg itself supports them
- Since Iceberg does support primary keys, setting the table properties 'format-version' = '2' and 'write.upsert.enabled' = 'true' and adding a primary key to the table enables upsert, i.e. insert/update/delete behavior (a code sketch appears at the end of this subsection)
- Creating the table produces the following files:
[root@flink1 ~]#
[root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
Found 2 items
-rw-r--r-- 1 root supergroup 2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
-rw-r--r-- 1 root supergroup 1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
[root@flink1 ~]#
Looking at v1.metadata.json, you can see:
"current-snapshot-id" : -1
Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy
> like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.
Flink SQL>
- 複制的表擁有相同的表結構、分區、表屬性
4.3 Altering a Table
Modifying table properties:
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
> set(
> 'write.format.default'='avro',
> 'write.avro.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.
Flink SQL>
- At present Flink only supports modifying Iceberg table properties
Renaming a table:
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
> rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables
Flink SQL>
- Tables in a Hadoop Catalog do not support renaming
4.4 Dropping a Table
Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.
Flink SQL>
會删除HDFS上的my_user_copy子目錄
5.插入資料到表
5.1 insert into
- insert into … values …
- insert into … select …
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(1, 'zhang_san', date '2022-02-01', 'china'),
> (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268
Flink SQL>
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c408e324ca3861b39176c6bd15770aca
Flink SQL>
The resulting HDFS directory contents are:
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
5.2 insert overwrite (batch mode only; the overwrite granularity is the partition)
Only Flink batch mode is supported, not streaming mode.
insert overwrite replaces one or more whole partitions rather than individual rows. If the table is not partitioned, the whole table is replaced, as shown below:
Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.
Flink SQL>
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a
Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 4 | wang_wu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
The data under the birthday=2022-02-02/country=japan partition is shown below; insert overwrite also just adds a new file:
birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e
insert overwrite … partition replaces the specified partition:
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday = '2022-02-02', country = 'japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 5 | zhao_liu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
Flink SQL>
6. Querying Data
Batch mode
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 5 | zhao_liu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
Flink SQL>
Streaming mode
Check the latest snapshot-id:
[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
5
We created the table, ran two inserts, and ran two insert overwrites, so the latest version number is 5. Next, look at the metadata JSON file for that version:
[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
{
"format-version" : 1,
"table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
"location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
"last-updated-ms" : 1644761481488,
"last-column-id" : 4,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "user_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "user_name",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "birthday",
"required" : false,
"type" : "date"
}, {
"id" : 4,
"name" : "country",
"required" : false,
"type" : "string"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "user_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "user_name",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "birthday",
"required" : false,
"type" : "date"
}, {
"id" : 4,
"name" : "country",
"required" : false,
"type" : "string"
} ]
} ],
"partition-spec" : [ {
"name" : "birthday",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "country",
"transform" : "identity",
"source-id" : 4,
"field-id" : 1001
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "birthday",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "country",
"transform" : "identity",
"source-id" : 4,
"field-id" : 1001
} ]
} ],
"last-partition-id" : 1001,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"write.format.default" : "parquet",
"write.parquet.compression-codec" : "gzip"
},
"current-snapshot-id" : 138573494821828246,
"snapshots" : [ {
"snapshot-id" : 8012517928892530314,
"timestamp-ms" : 1644761130111,
"summary" : {
"operation" : "append",
"flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "2",
"added-records" : "2",
"added-files-size" : "2487",
"changed-partition-count" : "2",
"total-records" : "2",
"total-files-size" : "2487",
"total-data-files" : "2",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
"schema-id" : 0
}, {
"snapshot-id" : 453371561664052237,
"parent-snapshot-id" : 8012517928892530314,
"timestamp-ms" : 1644761150082,
"summary" : {
"operation" : "append",
"flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "2",
"added-records" : "2",
"added-files-size" : "2487",
"changed-partition-count" : "2",
"total-records" : "4",
"total-files-size" : "4974",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
"schema-id" : 0
}, {
"snapshot-id" : 6410282459040239217,
"parent-snapshot-id" : 453371561664052237,
"timestamp-ms" : 1644761403566,
"summary" : {
"operation" : "overwrite",
"replace-partitions" : "true",
"flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "1",
"deleted-data-files" : "2",
"added-records" : "1",
"deleted-records" : "2",
"added-files-size" : "1244",
"removed-files-size" : "2459",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "3759",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
"schema-id" : 0
}, {
"snapshot-id" : 138573494821828246,
"parent-snapshot-id" : 6410282459040239217,
"timestamp-ms" : 1644761481488,
"summary" : {
"operation" : "overwrite",
"replace-partitions" : "true",
"flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "1",
"deleted-data-files" : "1",
"added-records" : "1",
"deleted-records" : "1",
"added-files-size" : "1251",
"removed-files-size" : "1244",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "3766",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
"schema-id" : 0
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1644761130111,
"snapshot-id" : 8012517928892530314
}, {
"timestamp-ms" : 1644761150082,
"snapshot-id" : 453371561664052237
}, {
"timestamp-ms" : 1644761403566,
"snapshot-id" : 6410282459040239217
}, {
"timestamp-ms" : 1644761481488,
"snapshot-id" : 138573494821828246
} ],
"metadata-log" : [ {
"timestamp-ms" : 1644760911017,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"
}, {
"timestamp-ms" : 1644761130111,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"
}, {
"timestamp-ms" : 1644761150082,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"
}, {
"timestamp-ms" : 1644761403566,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"
} ]
}[root@flink1 ~]#
You can see
"current-snapshot-id" : 138573494821828246,
which indicates the current snapshot-id.
Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.
Flink SQL>
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='5s'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 5 | zhao_liu | 2022-02-02 | japan |
| +I | 2 | zhang_san | 2022-02-01 | china |
| +I | 1 | zhang_san | 2022-02-01 | china |
The output shows the data of the latest snapshot.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='5s',
> 'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
Here you can only specify the snapshot id of the last insert overwrite operation, or a snapshot id after it; otherwise the job throws the following exception in the background and stays in a restarting state:
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(6, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(7, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='30s',
> 'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 6 | zhang_san | 2022-02-01 | china |
| +I | 7 | zhang_san | 2022-02-01 | china |
- Streaming mode supports reading incremental snapshot data
- If start-snapshot-id is not specified, the full data of the current snapshot is read first, followed by the increments. If start-snapshot-id is specified, only the incremental data after that snapshot-id is read, i.e. the data of that snapshot-id itself is not read
- monitor-interval: the interval for monitoring newly committed data files; the default is 1s
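For completeness, the same streaming incremental read can also be issued from the DataStream API instead of the SQL client. A hedged sketch against iceberg-flink-runtime 0.13; the builder calls follow the Iceberg Flink documentation as I recall it and may differ between versions, and the table path and snapshot id reuse the ones above:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.iceberg.flink.TableLoader
import org.apache.iceberg.flink.source.FlinkSource

val env = StreamExecutionEnvironment.getExecutionEnvironment
val loader = TableLoader.fromHadoopTable("hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user")

// streaming(true) keeps monitoring the table for newly committed snapshots;
// startSnapshotId makes the read incremental, like 'start-snapshot-id' above.
val rows = FlinkSource.forRowData()
  .env(env)
  .tableLoader(loader)
  .streaming(true)
  .startSnapshotId(138573494821828246L)
  .build()

rows.print()
env.execute("iceberg-streaming-read")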