天天看點

Flink Catalog解讀

文章目錄

  • ​​01 引言​​
  • ​​02 Catalog​​
  • ​​2.1 Catalog概述​​
  • ​​2.2 Catalog分類​​
  • ​​2.3 Catalog API​​
  • ​​2.3.1 資料庫操作​​
  • ​​2.3.2 表操作​​
  • ​​2.3.3 視圖操作​​
  • ​​2.3.4 分區操作​​
  • ​​2.3.5 函數操作​​
  • ​​2.4 Catalog 示例(SQL Client的方式)​​
  • ​​03 文末​​

01 引言

我們知道 Flink 有​

​Table​

​​(表)、​

​View​

​​(視圖)、​

​Function​

​​(函數/算子)、​

​Database​

​(資料庫)的概念,這都類似于我們平常使用的關系型資料庫裡面的概念。

相對于關系型資料庫的這些概念,Flink 裡還有一個 ​

​Catalog​

​(目錄) 的概念,本文來講解下。

Flink Catalog解讀

02 Catalog

2.1 Catalog概述

資料處理最關鍵的方面之一是管理中繼資料:

  • 中繼資料可以是臨時的,例如在​

    ​Flink​

    ​​中臨時表、或者通過​

    ​TableEnvironment​

    ​​ 注冊的​

    ​UDF​

    ​;
  • 中繼資料也可以是持久化的,例如​

    ​Hive Metastore​

    ​ 中的中繼資料。

Catalog在Flink中提供了一個統一的API,用于管理中繼資料,并使其可以從 Table API 和 SQL 查詢語句中來通路。​

​Catalog ​

​提供了中繼資料資訊,例如資料庫、表、分區、視圖以及資料庫或其他外部系統中存儲的函數和資訊。

2.2 Catalog分類

Catalog目前分為以下幾類:

分類 描述 缺陷
GenericInMemoryCatalog 基于記憶體實作的 Catalog 所有中繼資料隻在 session 的生命周期内可用
​​JdbcCatalog​​ 可以将 Flink 通過 JDBC 協定連接配接到關系資料庫 JDBC Catalog隻實作了PostgresCatalog
​​HiveCatalog​​ 作為原生 Flink 中繼資料的持久化存儲,以及作為讀寫現有 Hive 中繼資料的接口 Hive Metastore 以小寫形式存儲所有中繼資料對象名稱。而 GenericInMemoryCatalog 區分大小寫。
自定義 Catalog 通過實作 Catalog 接口來開發自定義 Catalog -

2.3 Catalog API

2.3.1 資料庫操作

// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get database
catalog.getDatabase("mydb");

// check if a database exist
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases("mycatalog");      

2.3.2 表操作

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");

// get table
catalog.getTable("mytable");

// check if a table exist or not
catalog.tableExists("mytable");

// list tables in a database
catalog.listTables("mydb");      

2.3.3 視圖操作

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable("myview");

// check if a view exist or not
catalog.tableExists("mytable");

// list views in a database
catalog.listViews("mydb");      

2.3.4 分區操作

// create view
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));      

2.3.5 函數操作

catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction("myfunc");

// check if a function exist or not
catalog.functionExists("myfunc");

// list functions in a database
catalog.listFunctions("mydb");      

2.4 Catalog 示例(SQL Client的方式)

① 首先需要注冊Catalog:使用者可以通路預設建立的記憶體 Catalog default_catalog,這個 Catalog 預設擁有一個預設資料庫 default_database。 使用者也可以注冊其他的 Catalog 到現有的 Flink 會話中,建立方式如下(可以使用Flink裡面的Factory工廠模式動态加載):

tableEnv.registerCatalog(new CustomCatalog("myCatalog"));      

② 指定使用的内容:Flink 始終在目前的 Catalog 和資料庫中尋找表、視圖和 UDF,代碼如下:

Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;      
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;      
-- 列出可用的 Catalog
Flink SQL> show catalogs;

-- 列出可用的資料庫 
Flink SQL> show databases;

-- 列出可用的表
Flink SQL> show tables;