文章目錄
- 01 引言
- 02 Catalog
- 2.1 Catalog概述
- 2.2 Catalog分類
- 2.3 Catalog API
- 2.3.1 資料庫操作
- 2.3.2 表操作
- 2.3.3 視圖操作
- 2.3.4 分區操作
- 2.3.5 函數操作
- 2.4 Catalog 示例(SQL Client的方式)
- 03 文末
01 引言
我們知道 Flink 有
Table
(表)、
View
(視圖)、
Function
(函數/算子)、
Database
(資料庫)的概念,這都類似于我們平常使用的關系型資料庫裡面的概念。
相對于關系型資料庫的這些概念,Flink 裡還有一個
Catalog
(目錄) 的概念,本文來講解下。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CNwkTO3U2NjN2NkRzMkFGZyYzX5QTM0ADMzIzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
02 Catalog
2.1 Catalog概述
資料處理最關鍵的方面之一是管理中繼資料:
- 中繼資料可以是臨時的,例如在
中臨時表、或者通過Flink
注冊的TableEnvironment
;UDF
- 中繼資料也可以是持久化的,例如
中的中繼資料。Hive Metastore
Catalog在Flink中提供了一個統一的API,用于管理中繼資料,并使其可以從 Table API 和 SQL 查詢語句中來通路。
Catalog
提供了中繼資料資訊,例如資料庫、表、分區、視圖以及資料庫或其他外部系統中存儲的函數和資訊。
2.2 Catalog分類
Catalog目前分為以下幾類:
分類 | 描述 | 缺陷 |
GenericInMemoryCatalog | 基于記憶體實作的 Catalog | 所有中繼資料隻在 session 的生命周期内可用 |
JdbcCatalog | 可以将 Flink 通過 JDBC 協定連接配接到關系資料庫 | JDBC Catalog隻實作了PostgresCatalog |
HiveCatalog | 作為原生 Flink 中繼資料的持久化存儲,以及作為讀寫現有 Hive 中繼資料的接口 | Hive Metastore 以小寫形式存儲所有中繼資料對象名稱。而 GenericInMemoryCatalog 區分大小寫。 |
自定義 Catalog | 通過實作 Catalog 接口來開發自定義 Catalog | - |
2.3 Catalog API
2.3.1 資料庫操作
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get database
catalog.getDatabase("mydb");
// check if a database exist
catalog.databaseExists("mydb");
// list databases in a catalog
catalog.listDatabases("mycatalog");
2.3.2 表操作
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
// get table
catalog.getTable("mytable");
// check if a table exist or not
catalog.tableExists("mytable");
// list tables in a database
catalog.listTables("mydb");
2.3.3 視圖操作
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// get view
catalog.getTable("myview");
// check if a view exist or not
catalog.tableExists("mytable");
// list views in a database
catalog.listViews("mydb");
2.3.4 分區操作
// create view
catalog.createPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
2.3.5 函數操作
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction("myfunc");
// check if a function exist or not
catalog.functionExists("myfunc");
// list functions in a database
catalog.listFunctions("mydb");
2.4 Catalog 示例(SQL Client的方式)
① 首先需要注冊Catalog:使用者可以通路預設建立的記憶體 Catalog default_catalog,這個 Catalog 預設擁有一個預設資料庫 default_database。 使用者也可以注冊其他的 Catalog 到現有的 Flink 會話中,建立方式如下(可以使用Flink裡面的Factory工廠模式動态加載):
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
② 指定使用的内容:Flink 始終在目前的 Catalog 和資料庫中尋找表、視圖和 UDF,代碼如下:
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
-- 列出可用的 Catalog
Flink SQL> show catalogs;
-- 列出可用的資料庫
Flink SQL> show databases;
-- 列出可用的表
Flink SQL> show tables;