天天看點

Flink 1.11 與 Hive 批流一體數倉實踐

導讀:Flink 從 1.9.0 開始提供與 Hive 內建的功能,随着幾個版本的疊代,在最新的 Flink 1.11 中,與 Hive 內建的功能進一步深化,并且開始嘗試将流計算場景與Hive 進行整合。

本文主要分享在 Flink 1.11 中對接 Hive 的新特性,以及如何利用 Flink 對 Hive 數倉進行實時化改造,進而實作批流一體的目标。主要内容包括:

· Flink 與 Hive 內建的背景介紹

· Flink 1.11中的新特性

· 打造 Hive 批流一體數倉

一、 Flink 與 Hive 內建背景

為什麼要做 Flink 和 Hive 內建的功能呢?最早的初衷是我們希望挖掘 Flink 在批處理方面的能力。衆所周知,Flink 在流計算方面已經是成功的引擎了,使用的使用者也非常多。在 Flink 的設計理念當中,批計算是流進行中的一個特例。也就意味着,如果 Flink 在流計算方面做好,其實它的架構也能很好的支援批計算的場景。在批計算的場景中,SQL 是一個很重要的切入點。因為做資料分析的同學,他們更習慣使用SQL 進行開發,而不是去寫 DataStream 或者 DataSet 這樣的程式。

Hadoop 生态圈的 SQL 引擎,Hive 是一個事實上的标準。大部分的使用者環境中都會使用到了 Hive 的一些功能,來搭建數倉。一些比較新的 SQL 的引擎,例如 Spark SQL、Impala ,它們其實都提供了與 Hive 內建的能力。為了友善的能夠對接上目前使用者已有的使用場景,是以我們認為對 Flink 而言,對接 Hive 也是不可缺少的功能。

是以,我們在 Flink 1.9 當中,就開始提供了與 Hive 內建的功能。當然在 1.9 版本裡面,這個功能是作為試用版釋出的。到了 Flink 1.10 版本,與 Hive 內建的功能就達到了生産可用。同時在 Flink 1.10 釋出的時候,我們用 10TB 的 TPC-DS 測試集,對 Flink 和 Hive on MapReduce 進行了對比,對比結果如下:

Flink 1.11 與 Hive 批流一體數倉實踐

藍色的方框表示 Flink 用的時間,桔紅色的方框表示 Hive on MapReduce 用的時間。最終的結果是 Flink 對于 Hive on MapReduce 大概提升了 7 倍左右的性能。是以驗證了 Flink SQL 可以很好的支援批計算的場景。

接下來介紹下 Flink 對接 Hive 的設計架構。對接 Hive 的時候需要幾個層面,分别是:

· 能夠通路 Hive 的中繼資料;

· 讀寫 Hive 表資料;

· Production Ready ;

1. 通路 Hive 中繼資料

使用過 Hive 的同學應該都知道,Hive 的中繼資料是通過 Hive Metastore 來管理的。是以意味着 Flink 需要打通與 Hive Metastore 的通信。為了更好的通路 Hive 中繼資料,在 Flink 這邊是提出了一套全新設計的 Catalog API 。

Flink 1.11 與 Hive 批流一體數倉實踐

這個全新的接口是一個通用化的設計。它并不隻是為了對接 Hive 中繼資料,理論上是它可以對接不同外部系統的中繼資料。

而且在一個 Flink Session 當中,是可以建立多個 Catalog ,每一個 Catalog 對應于一個外部系統。使用者可以在 Flink Table API 或者如果使用的是 SQL Client 的話,可以在 Yaml 檔案裡指定定義哪些 Catalog 。然後在 SQL Client 建立 TableEnvironment 的時候,就會把這些 Catalog 加載起來。TableEnvironment 通過CatalogManager 來管理這些不同的 Catalog 的執行個體。這樣 SQL Client 在後續的送出 SQL 語句的過程中,就可以使用這些 Catalog 去通路外部系統的中繼資料了。

上面這張圖裡列出了 2 個 Catalog 的實作。一個是 GenericlnMemoryCatalog ,把所有的中繼資料都儲存在 Flink Client 端的記憶體裡。它的行為是類似于 Catalog 接口出現之前 Flink 的行為。也就是所有的中繼資料的生命周期跟 SQL Client 的 Session 周期是一樣的。當 Session 結束,在 Session 裡面建立的中繼資料也就自動的丢失了。

另一個是對接 Hive 着重介紹的 HiveCatalog 。HiveCatalog 背後對接的是 Hive Metastore 的執行個體,要與 Hive Metastore 進行通信來做中繼資料的讀寫。為了支援多個版本的 Hive,不同版本的 Hive Metastore 的API可能存在不相容。是以在 HiveCatalog 和 Hive Metastore 之間又加了一個 HiveShim ,通過 HiveShim 可以支援不同版本的 Hive 。

這裡的 HiveCatalog 一方面可以讓 Flink 去通路 Hive 自身有的中繼資料,另一方面它也為 Flink 提供了持久化中繼資料的能力。也就是 HiveCatalog 既可以用來存儲 Hive的中繼資料,也可以存 Flink 使用的中繼資料。例如,在 Flink 中建立一張 Kafka 的表,那麼這張表也是可以存到 HiveCatalog 裡的。這樣也就是為 Flink 提供了持久化中繼資料的能力。在沒有 HiveCatalog 之前,是沒有持久化能力的。

2. 讀寫 Hive 表資料

有了通路 Hive 中繼資料的能力後,另一個重要的方面是讀寫 Hive 表資料。Hive 的表是存在 Hadoop 的 file system 裡的,這個 file system 是一個 HDFS ,也可能是其他檔案系統。隻要是實作了 Hadoop 的 file system 接口的,理論上都可以存儲Hive 的表。

在 Flink 當中:

· 讀資料時實作了 HiveTableSource

· 寫資料時實作了 HiveTableSink

而且設計的一個原則是:希望盡可能去複用 Hive 原有的 Input/Output Format、SerDe 等,來讀寫 Hive 的資料。這樣做的好處主要是 2 點,一個是複用可以減少開發的工作量。另一個是複用好處是盡可能與 Hive 保證寫入資料的相容性。目标是Flink 寫入的資料,Hive 必須可以正常的讀取。反之, Hive 寫入的資料,Flink 也可以正常讀取。

3. Production Ready

在 Flink 1.10 中,對接 Hive 的功能已經實作了 Production Ready 。實作 Production Ready 主要是認為在功能上已經完備了。具體實作的功能如下:

Flink 1.11 與 Hive 批流一體數倉實踐

二、Flink 1.11 中的新特性

下面将介紹下,在 Flink 1.11 版本中,對接 Hive 的一些新特性。

1. 簡化的依賴管理

首先做的是簡化使用 Hive connector 的依賴管理。Hive connector 的一個痛點是需要添加若幹個 jar 包的依賴,而且使用的 Hive 版本的不同,所需添加的 jar 包就不同。例如下圖:

Flink 1.11 與 Hive 批流一體數倉實踐
Flink 1.11 與 Hive 批流一體數倉實踐

第一張圖是使用的 Hive 1.0.0 版本需要添加的 jar 包。第二張圖是用 Hive 2.2.0 版本需要添加的 jar 包。可以看出,不管是從 jar 包的個數、版本等,不同 Hive 版本添加的 jar 包是不一樣的。是以如果不仔細去讀文檔的話,就很容易導緻使用者添加的依賴錯誤。一旦添加錯誤,例如添加少了或者版本不對,那麼會報出來一些比較奇怪、難了解的錯誤。這也是使用者在使用 Hive connector 時暴露最多的問題之一。

是以我們希望能簡化依賴管理,給使用者提供更好的體驗。具體的做法是,在 Flink 1.11 版本中開始,會提供一些預先打好的 Hive 依賴包:

Flink 1.11 與 Hive 批流一體數倉實踐

使用者可以根據自己的 Hive 版本,選擇對應的依賴包就可以了。

如果使用者使用的 Hive 并不是開源版本的 Hive ,使用者還是可以使用 1.10 那種方式,去自己添加單個 jar 包。

2. Hive Dialect 的增強

在 Flink 1.10 就引入了 Hive Dialect ,但是很少有人使用,因為這個版本的 Hive Dialect 功能比較弱。僅僅的一個功能是:是否允許建立分區表的開關。就是如果設定了 Hive Dialect ,那就可以在 Flink SQL 中建立分區表。如果沒設定,則不允許建立。

另一個關鍵的是它不提供 Hive 文法的相容。如果設定了 Hive Dialect 并可以建立分區表,但是建立分區表的 DDL 并不是 Hive 的文法。

在 Flink 1.11 中着重對 Hive Dialect 的功能進行了增強。增強的目标是:希望使用者在使用 Flink SQL Client 的時候,能夠獲得與使用 Hive CLI 或 Beeline 近似的使用體驗。就是在使用 Flink SQL Client 中,可以去寫一些 Hive 特定的一些文法。或者說使用者在遷移至 Flink 的時候, Hive 的腳本可以完全不用修改。

為了實作上述目标,在 Flink 1.11 中做了如下改進:

·給 Dialect 做了參數化,目前參數支援 default 和 hive 兩種值。default 是Flink 自身的 Dialect ,hive 是 Hive 的 Dialect。

· SQL Client 和 API 均可以使用。

· 可以靈活的做動态切換,切換是語句級别的。例如 Session 建立後,第一個語句想用 Flink 的 Dialect 來寫,就設定成 default 。在執行了幾行語句後,想用 Hive 的 Dialect 來寫,就可以設定成 hive 。在切換時,就不需要重新開機 Session。

· 相容 Hive 常用 DDL 以及基礎的 DML。

· 提供與 Hive CLI 或 Beeline 近似的使用體驗。

3. 開啟 Hive Dialect

Flink 1.11 與 Hive 批流一體數倉實踐

上圖是在 SQL Client 中開啟 Hive Dialect 的方法。在 SQL Client 中可以設定初始的 Dialect。可以在 Yaml 檔案裡設定,也可以在 SQL Client 起來後,進行動态的切換。

還可以通過 Flink Table API 的方式開啟 Hive Dialect :

Flink 1.11 與 Hive 批流一體數倉實踐

可以看到通過 TableEnvironment 去擷取 Config 然後設定開啟。

4. Hive Dialect 支援的文法

Hive Dialect 的文法主要是在 DDL 方面進行了增強。因為在 1.10 中通過 Flink SQL寫 DDL 去操作 Hive 的中繼資料不是十分可用,是以要解決這個痛點,将主要精力集中在 DDL 方向了。

目前所支援的 DDL 如下:

Flink 1.11 與 Hive 批流一體數倉實踐
Flink 1.11 與 Hive 批流一體數倉實踐

5. 流式資料寫入Hive

在 Flink 1.11 中還做了流式資料場景,以及跟 Hive 相結合的功能,通過 Flink 與Hive 的結合,來幫助 Hive 數倉進行實時化的改造。

Flink 1.11 與 Hive 批流一體數倉實踐

流式資料寫入 Hive 是借助 Streaming File Sink 實作的,它是完全 SQL 化的,不需要使用者進行代碼開發。流式資料寫入 Hive 也支援分區和非分區表。Hive 數倉一般都是離線資料,使用者對資料一緻性要求比較高,是以支援 Exactly-Once 語義。流式資料寫 Hive 大概有 5-10 分鐘級别的延遲。如果希望延遲盡可能的低,那麼産生的一個結果就是會生成更多的小檔案。小檔案對 HDFS 來說是不友好的,小檔案多了以後,會影響 HDFS 的性能。這種情況下可以做一些小文的合并操作。

流式資料寫入 Hive 需要有幾個配置的地方:

Flink 1.11 與 Hive 批流一體數倉實踐

對于分區表來說,要設定 Partition Commit Delay 的參數。這個參數的意義就是控制每個分區包含多長時間的資料,例如可設定成天、小時等。

Partition Commit Trigger 表示 Partition Commit 什麼時候觸發,在 1.11 版本中支援 Process-time 和 Partition-time 觸發機制。

Partition Commit Policy 表示用什麼方式送出分區。對于 Hive 來說,是需要将分區送出到 metastore, 這樣分區才是可見的。metastore 政策隻支援 Hive 表。還有一個是 success-file 方式,success-file 是告訴下遊的作業分區的資料已經準備好了。使用者也可以自定義,自己去實作一個送出方式。另外 Policy 可以指定多個的,例如可以同時指定 metastore 和 success-file。

下面看下流式資料寫入Hive的實作原理:

Flink 1.11 與 Hive 批流一體數倉實踐

主要是兩個部分,一個是 StreamingFileWriter ,借助它實作資料的寫入,它會區分 Bucket,這裡的 Buck 類似 Hive 的分區概念,每個 Subtask 都會往不同的 Bucket去寫資料。每個 Subtask 寫的 Bucket 同一個時間可能會維持 3 種檔案,In-progress Files 表示正在寫的檔案,Pending Files 表示檔案已經寫完了但是還沒有送出,Finished Files 表示檔案已經寫完并且也已經送出了。

另一個是 StreamingFileCommitter,在 StreamingFileWriter 後執行。它是用來送出分區的,是以對于非分區表就不需要它了。當 StreamingFileWriter 的一個分區資料準備好後,StreamingFileWriter 會向 StreamingFileCommitter 發一個 Commit Message,Commit Message 告訴 StreamingFileCommitter 那些資料已經準備好了的。然後進行送出的觸發 Commit Trigger,以及送出方式 Commit Policy。

下面是一個具體的例子:

Flink 1.11 與 Hive 批流一體數倉實踐

例子中建立了一個叫 hive_table 的分區表,它有兩個分區 dt 和 hour。dt 代表的是日期的字元串,hour 代表小時的字元串。Commit trigger 設定的是 partition-time,Commit delay 設定的是1小時,Commit Policy 設定的是 metastore 和success-file。

6. 流式消費 Hive

在 Flink 1.10 中讀 Hive 資料的方式是批的方式去讀的,從 1.11 版本中,提供了流式的去讀 Hive 資料。

Flink 1.11 與 Hive 批流一體數倉實踐

通過不斷的監控 Hive 資料表有沒有新資料,有的話就進行增量資料的消費。

如果要針對某一張 Hive 表開啟流式消費,可以在 table property 中開啟,或者也可以使用在 1.11 中新加的 dynamic options 功能,可以查詢的時候動态的指定 Hive 表是否需要打開流式讀取。

流式消費 Hive 支援分區表和非分區表。對于非分區表會監控表目錄下新檔案添加,并增量讀取。對于分區表通過監控分區目錄和 Metastore 的方式确認是否有新分區添加,如果有新增分區,就會把新增分區資料讀取出來。這裡需要注意,讀新增分區資料是一次性的。也就是新增加分區後,會把這個分區資料一次性都讀出來,在這之後就不再監控這個分區的資料了。是以如果需要用 Flink 流式消費 Hive 的分區表,那應該保證分區在添加的時候它的資料是完整的。

Flink 1.11 與 Hive 批流一體數倉實踐

流式消費 Hive 資料也需要額外的指定一些參數。首先要指定消費順序,因為資料是增量讀取,是以需要指定要用什麼順序消費資料,目前支援兩種消費順序 create-time 和 partition-time。

使用者還可以指定消費起點,類似于消費 kafka 指定 offset 這樣的功能,希望從哪個時間點的資料開始消費。Flink 去消費資料的時候,就會檢查并隻會讀取這個時間點之後的資料。

最後還可以指定監控的間隔。因為目前監控新資料的添加都是要掃描檔案系統的,可能你希望監控的不要太頻繁,太頻繁會給檔案系統造成比較大的壓力。是以可以控制一個間隔。

最後看下流式消費的原理。先看流式消費非分區表:

Flink 1.11 與 Hive 批流一體數倉實踐

圖中 ContinuoousFileMonitoringFunction 會不斷監控非分區表目錄下面的檔案,會不斷的跟檔案系統進行互動。一旦發現有新的檔案添加了,就會對這些檔案生成Splits ,并将 Splits 傳到 ContinuoousFileReaderOperator,FileReaderOperator 拿到 Splits 後就會到檔案系統中實際的消費這些資料,然後把讀出來的資料再傳往下遊處理。

Flink 1.11 與 Hive 批流一體數倉實踐

對于流式消費分區表和非分區表差別不是很大,其中 HiveContinuousMonitoringFunction 也會去不斷的掃描檔案系統,但是它掃描的是新增分區的目錄。當它發現有新增的分區目錄後,會進一步到 metstore 中做核查,檢視是否這個分區已經送出到 metstore 中。如果已經送出,那就可以消費分區中的資料了。然後會把分區中的資料生成 Splits 傳給 ContinuousFileReaderOperator ,然後就可以對資料進行消費了。

7. 關聯 Hive 維表

關于 Hive 跟流式資料結合的另一個場景就是:關聯 Hive 維表。例如在消費流式資料時,與一張線下的 Hive 維表進行 join。

Flink 1.11 與 Hive 批流一體數倉實踐

關聯Hive維表采用了 Flink 的 Temporal Table 的文法,就是把 Hive 的維表作為Temporal Table,然後與流式的表進行 join。想了解更多關于 Temporal Table 的内容,可檢視 Flink 的官網。

關聯 Hive 維表的實作是每個 sub-task 将 Hive 表緩存在記憶體中,是緩存整張的Hive 表。如果 Hive 維表大小超過 sub-task 的可用記憶體,那麼作業會失敗。

Hive 維表在關聯的時候,Hive 維表可能會發生更新,是以會允許使用者設定 hive 表緩存的逾時時間。超過這個時間後,sub-task 會重新加載 Hive 維表。需要注意,這種場景不适用于 Hive 維表頻繁更新的情況,這樣會對 HDFS 檔案系統造成很大的壓力。是以适用于 Hive 維表緩慢更新的情況。緩存逾時時間一般設定的比較長,一般是小時級别的。

Flink 1.11 與 Hive 批流一體數倉實踐

這張圖表示的是關聯 Hive 維表的原理。Streaming Data 代表流式資料,LookupJoinRunner 表示 Join 算子,它會拿到流式資料的 join key,并把 join key 傳給FileSystemLookupFunction。

FileSystemLookupFunction 是 一個Table function,它會去跟底層的檔案系統互動并加載 Hive 表,然後在 Hive 表中查詢 join key,判斷哪些行資料是可以 join的。

下面是關聯 Hive 維表的例子:

Flink 1.11 與 Hive 批流一體數倉實踐

這是 Flink 官網的一個例子,流式表是 Orders,LatestTates 是 Hive 的維表。

三、Hive 批流一體數倉

經過上面的介紹可以看出,在 Flink 1.11 中,在 Hive 數倉和批流一體的功能是進行了着重的開發。因為 Flink 是一個流處理的引擎,希望幫使用者更好的将批和流結合,讓 Hive 數倉實作實時化的改造,讓使用者更友善的挖掘資料的價值。

Flink 1.11 與 Hive 批流一體數倉實踐

在 Flink 1.11 之前,Flink 對接 Hive 會做些批處理的計算,并且隻支援離線的場景。離線的場景一個問題是延遲比較大,批作業的排程一般都會通過一些排程的架構去排程。這樣其實延遲會有累加的作用。例如第一個 job 跑完,才能去跑第二個 job...這樣依次執行。是以端對端的延遲就是所有 job 的疊加。

Flink 1.11 與 Hive 批流一體數倉實踐

到了 1.11 之後,支援了 Hive 的流式處理的能力,就可以對 Hive 數倉進行一個實時化的改造。

例如 Online 的一些資料,用 Flink 做 ETL,去實時的寫入 Hive。當資料寫入 Hive之後,可以進一步接一個新的 Flink job,來做實時的查詢或者近似實時的查詢,可以很快的傳回結果。同時,其他的 Flink job 還可以利用寫入 Hive 數倉的資料作為維表,來跟其它線上的資料進行關聯整合,來得到分析的結果。

作者介紹:

李銳,阿裡花名"天離",阿裡巴巴技術專家,Apache Hive PMC 成員,加入阿裡巴巴之前曾就職于 Intel、IBM 等公司,主要參與 Hive、HDFS、Spark 等開源項目。