天天看點

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

作者:程鶴群(軍長)

文章概述:本文主要包含三部分:第一部分,主要介紹什麼是 Table API,從概念角度進行分析,讓大家有一個感性的認識;第二部分,從代碼的層面介紹怎麼使用 Table API;第三部分,介紹 Table API 近期的動态。文章結構如下:

  • 什麼是 Table API
    • Flink API 總覽
    • Table API 的特性
  • Table API 程式設計
    • WordCount 示例
    • Table API 操作
      • 如何擷取一個 Table
      • 如果輸出一個 Table
      • 如果查詢一個 Table
  • Table API 動态

一、什麼是 Table API

為了更好地了解 Table API,我們先看下 Flink 都提供了哪些 API 供使用者使用。

1.Flink API 總覽

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

如圖,Flink 根據使用的便捷性和表達能力的強弱提供了 3 層 API,由上到下,表達能力逐漸增強,比如 processFunction,是最底層的 API,表達能力最強,我們可以用他來操作 state 和 timer 等複雜功能。Datastream API 相對于 processFunction 來說,又進行了進一步封裝,提供了很多标準的語義算子給大家使用,比如我們常用的 window 算子(包括 Tumble, slide,session 等)。那麼最上面的 SQL 和 Table API 使用最為便捷,具有自身的很多特點,重點歸納如下:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

第一,Table API & SQL 是一種聲明式的 API。使用者隻需關心做什麼,不用關心怎麼做,比如圖中的 WordCount 例子,隻需要關心按什麼次元聚合,做哪種類型的聚合,不需要關心底層的實作。

第二,高性能。Table API & SQL 底層會有優化器對 query 進行優化。舉個例子,假如 WordCount 的例子裡寫了兩個 count 操作,優化器會識别并避免重複的計算,計算的時候隻保留一個 count 操作,輸出的時候再把相同的值輸出兩遍即可,以達到更好的性能。

第三,流批統一。上圖例子可以發現,API 并沒有區分流和批,同一套 query 可以流批複用,對業務開發來說,避免開發兩套代碼。

第四,标準穩定。Table API & SQL 遵循 SQL 标準,不易變動。API 比較穩定的好處是不用考慮 API 相容性問題。

第五,易了解。語義明确,所見即所得。

2.Table API 特性

上一小節介紹了 Table API 和 SQL 一些共有的特性,這個小節重點介紹下 Table API 自身的特性。主要可以歸納為以下兩點:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

第一,Table API 使得多聲明的資料處理寫起來比較容易。

怎麼了解?比如我們有一個 Table(tab),并且需要執行一些過濾操作然後輸出到結果表,對應的實作是:tab.where(“a < 10”).inertInto(“resultTable1”);此外,我們還需要做另外一些篩選,然後也對結果輸出,即 tab.where(“a > 100”).insertInto(“resultTable2”)。你會發現,用 Table API 寫起來會非常簡潔友善,兩行代碼就把功能實作了。

第二,Table API 是 Flink 自身的一套 API,這使得我們更容易地去擴充标準的 SQL。當然,在擴充 SQL 的時候并不是随意的去擴充,需要考慮 API 的語義、原子性和正交性,并且當且僅當需要的時候才去添加。

對比 SQL,我們可以認為 Table API 是 SQL 的超集。SQL 有的操作,Table API 可以有,然而我們又可以從易用性和功能性地角度對 SQL 進行擴充和提升。

二、Table API程式設計

第一章介紹了 Table API 相關的概念。這一章我們來看下如何用 Table API 來程式設計。本章會先從一個 WordCount 的例子出發,讓大家對 Table API 程式設計先有一個大概的認識,然後再具體介紹一下 Table API 的操作,比如,如何擷取一個 Table,如何輸出一個 Table,以及如何對 Table 執行查詢操作。

1.WordCount舉例

這是一個完整的,用 java 編寫的 batch 版本的 WordCount 例子,此外,還有 scala 和 streaming 版本的 WordCount,都統一上傳到了 github 上(

https://github.com/hequn8128/TableApiDemo

),大家可以下載下傳下來嘗試運作或者修改。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {   // line:10

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");  // line:20

        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        tEnv.toDataSet(result, Row.class).print();
    }
}           

我們具體看下這個 WordCount 的例子。首先,第13、14行,是對 environment 的一些初始化,先通過 ExecutionEnvironment 的 getExecutionEnvironment 方法拿到執行環境,然後再通過 BatchTableEnvironment 的 create 拿到對應的 Table 環境,拿到環境後,我們可以注冊 TableSource、TableSink 或執行一些其他操作。

這裡需要注意的是,ExecutionEnvironment 跟 BatchTableEnvironment 都是對應 Java 的版本,對于 scala 程式,這裡需要是一個對應 scala 版本的 environment。這也是初學者一開始可能會遇到的問題,因為 environent 有很多且容易混淆。為了讓大家更好區分這些 environment,下面對 environment 進行了一些歸納。

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

這裡從 batch/stream,還有 Java/scala,對 environment 進行了分類,對于這些 environment 使用時需要特别注意,不要 import 錯了。environment 的問題,社群已經進行了一些讨論,如上圖下方的

連結

,這裡不再具體展開。

我們再回到剛剛的 WordCount 的例子,拿到 environment 後,需要做的第二件事情是注冊對應的TableSource。

tEnv.connect(new FileSystem().path(path))
    .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
    .withSchema(new Schema().field("word", Types.STRING))
    .registerTableSource("fileSource");           

使用起來也非常友善,首先,因為我們要讀一個檔案,需要指定讀取檔案的路徑,指定了之後,我們需要再描述檔案内容的格式,比如他是 csv 的檔案并且行分割符是什麼。還有就是指定這個檔案對應的 Schema 是什麼,比如隻有一列單詞,并且類型是 String。最後,我們需要把 TableSource 注冊到 environment 裡去。

Table result = tEnv.scan("fileSource")
    .groupBy("word")
    .select("word, count(1) as count");

tEnv.toDataSet(result, Row.class).print();           

通過 scan 剛才注冊好的 TableSource,我們可以拿到一個 Table 對象,并執行相應的一些操作,比如 GroupBy,count。最後,可以把 Table 按 DataSet 的方式進行輸出。

以上便是一個 Table API 的 WordCount 完整例子。涉及 Table 的擷取,Table 的操作,以及 Table 的輸出。接下來會具體介紹如何擷取 Table、輸出 Table 和執行 Table 操作。

2.如何擷取一個Table

擷取 Table 大體可以分為兩步,第一步,注冊對應的 TableSource;第二步,調用 Table environement 的 scan 方法擷取 Table 對象。注冊 Table Source 又有3種方法:通過 Table descriptor 來注冊,通過自定義 source 來注冊,或者通過 DataStream 來注冊。具體的注冊方式如下圖所示:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

3.如何輸出一個Table

對應輸出 Table,我們也有類似的3種方法:Table descriptor, 自定義 Table sink 以及輸出成一個 DataStream。如下圖所示:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

4.如何操作一個Table

4.1 Table 操作總覽

第2、3節介紹了如何擷取和輸出一個 Table,本節主要介紹如何對 Table 進行操作。Table 上有很多操作,比如一些 projection 操作 select、filter、where;聚合操作,如 groupBy、flatAggrgate;還有join操作,等等。我們以一個具體的例子來介紹下 Table 上各操作的轉換流程。

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

如上圖,當我們拿到一個 Table 後,調用 groupBy 會傳回一個 GroupedTable。GroupedTable 裡隻有 select 方法,對 GroupedTable 調用 select 方法會傳回一個 Table。拿到這個 Table 後,我們可以再調用 Table 上的方法。圖中其他 Table,如 OverWindowedTable 也是類似的流程。值得注意的是,引入各個類型的 Table 是為了保證 API 的合法性和便利性,比如 groupBy 之後隻有 select 操作是有意義的,在編輯器上可以直接點出來。

前面我們提到,可以将 Table API 看成是 SQL 的超集,是以我們也可以對 Table 裡的操作按此進行分類,大緻分為三類,如下圖所示:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

第一類,是跟 SQL 對齊的一些操作,比如 select, filter, join 等。第二類,是一些提升 Table API 易用性的操作。第三類,是增強 Table API 功能的一些操作。第一類操作由于和 SQL 類似,比較容易了解,其次,也可以檢視官方的文檔,了解具體的方法,是以這裡不再展開介紹。下面的章節會重點介紹後兩類操作,這些操作也是 Table API 獨有的。

4.2 提升易用性相關操作

介紹易用性之前,我們先來看一個問題。假設我們有一張很大的表,裡面有一百列,此時需要去掉一列,那麼SQL怎麼寫?我們需要 select 剩下的 99 列!顯然這會給使用者帶來不小的代價。為了解決這個問題,我們在Table上引入了一個 dropColumns 方法。利用 dropColumns 方法,我們便可以隻寫去掉的列。與此對應,還引入了 addColumns, addOrReplaceColumns 和 renameColumns 方法,如下圖所示:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

解決了剛才的問題後,我們再看下面另一個問題:假設還是一張100列的表,我們需要選第20到第80列,那麼我們如何操作呢?為了解決這個問題,我們又引入了 withColumns 和 withoutColumns 方法。對于剛才的問題,我們可以簡單地寫成 table.select(“withColumns(20 to 80)”)。

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

4.3 增強功能相關操作

該小節會介紹下 TableAggregateFunction 的功能和用法。在引入 TableAggregateFunction 之前,Flink 裡有三種自定義函數:ScalarFunction,TableFunction 和 AggregateFunction。我們可以從輸入和輸出的次元對這些自定義函數進行分類。如下圖所示,ScalarFunction 是輸入一行,輸出一行;TableFunction 是輸入一行,輸出多行;AggregateFunction 是輸入多行輸出一行。為了讓語義更加完整,Table API 新加了 TableAggregateFunction,它可以接收和輸出多行。TableAggregateFunction 添加後,Table API 的功能可以得到很大的擴充,某種程度上可以用它來實作自定義 operator。比如,我們可以用 TableAggregateFunction 來實作 TopN。

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

TableAggregateFunction 使用也很簡單,方法簽名和用法如下圖所示:

Apache Flink 零基礎入門(七):Table API 程式設計一、什麼是 Table API二、Table API程式設計三、Table API 動态

用法上,我們隻需要調用 table.flatAggregate(),然後傳入一個 TableAggregateFunction 執行個體即可。使用者可以繼承 TableAggregateFunction 來實作自定義的函數。繼承的時候,需要先定義一個 Accumulator,用來存取狀态,此外自定義的 TableAggregateFunction 需要實作 accumulate 和 emitValue 方法。accumulate 方法用來處理輸入的資料,而 emitValue 方法負責根據 accumulator 裡的狀态輸出結果。

三、Table API 動态

最後介紹下 Table API 近期的動态:

1.Flip-29

主要是 Table API 功能和易用性的增強。比如剛剛介紹的 columns 相關操作,還有 TableAggregateFunction。

社群對應的 jira 是:

https://issues.apache.org/jira/browse/FLINK-10972

2.Python Table API

希望在 Table API 上增加 python 語言的支援。這個應該是 Python 使用者的福音。

https://issues.apache.org/jira/browse/FLINK-12308

3.Interactive Programming(互動式程式設計)

即 Table 上會提供一個 cache 算子,執行 cache 操作可以緩存 table 的結果,并在這個結果上做其他操作。社群對應 jira 是:

https://issues.apache.org/jira/browse/FLINK-11199

4.Iterative Processing(疊代計算)

Table 上會支援一個 iterator 的算子,該算子可以用來執行疊代計算。比如疊代 100 次,或者指定一個收斂的條件,在機器學習領域使用比較廣泛。社群對應 jira 是:

▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,目前正在征集議題,限量早鳥票優惠ing。了解 Flink Forward Asia 2019 的更多資訊,請檢視:

https://developer.aliyun.com/special/ffa2019

首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:

https://tianchi.aliyun.com/markets/tianchi/flink2019