天天看點

Flink x Zeppelin ,Hive Streaming 實戰解析

作者:狄傑@蘑菇街

Flink 1.11 正式釋出已經三周了,其中最吸引我的特性就是 Hive Streaming。正巧 Zeppelin-0.9-preview2 也在前不久釋出了,是以就寫了一篇 Zeppelin 上的 Flink Hive Streaming 的實戰解析。本文主要從以下幾部分跟大家分享:

  • Hive Streaming 的意義
  • Checkpoint & Dependency
  • 寫入 Kafka
  • Hive Streaming Sink
  • Hive Streaming Source
  • Hive Temporal Table

很多同學可能會好奇,為什麼 Flink 1.11 中,Hive Streaming 的地位這麼高?它的出現,到底能給我們帶來什麼?

其實在大資料領域,一直存在兩種架構 Lambda 和 Kappa:

  • Lambda 架構——流批分離,靜态資料通過定時排程同步到 Hive 數倉,實時資料既會同步到 Hive,也會被實時計算引擎消費,這裡就引出了一點問題。
  • 資料口徑問題
  • 離線計算産出延時太大
  • 資料備援存儲
  • Kappa 架構——全部使用實時計算來産出資料,曆史資料通過回溯消息的消費位點計算,同樣也有很多的問題,畢竟沒有一勞永逸的架構。
  • 消息中間件無法保留全部曆史資料,同樣資料都是行式存儲,占用空間太大
  • 實時計算計算曆史資料力不從心
  • 無法進行 Ad-Hoc 的分析

為了解決這些問題,行業内推出了實時數倉,解決了大部分痛點,但是還是有些地方力不從心。比如涉及到曆史資料的計算怎麼辦?我想做 Ad-Hoc 的分析又怎麼玩?是以行業内現在都是實時數倉與離線數倉并行存在,而這又帶來了更多的問題:模型需要多份、資料産出不一緻、曆史資料的計算等等 。

而 Hive Streaming 的出現就可以解決這些問題!再也不用多套模型了;也不需要同一個名額因為涉及到曆史資料,寫一遍實時 SQL 再寫一遍離線 SQL;Ad-Hoc 也能做了,怎麼做?讀 Hive Streaming 産出的表就行!

接下來,讓我們從參數配置開始,接着流式的寫入 Hive,再到流式的讀取 Hive 表,最後再 Join 上 Hive 維表吧。這一整套流程都體驗後,想必大家對 Hive Streaming 一定會有更深入的了解,更能夠體會到它的作用。

因為隻有在完成 Checkpoint 之後,檔案才會從 In-progress 狀态變成 Finish 狀态,是以,我們需要合理的去配置 Checkpoint,在 Zeppelin 中配置 Checkpoint 很簡單。

%flink.conf

# checkpoint 配置

pipeline.time-characteristic EventTime
execution.checkpointing.interval 120000
execution.checkpointing.min-pause 60000
execution.checkpointing.timeout 60000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION

# 依賴jar包配置

flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0           

又因為我們需要從 Kafka 中讀取資料,是以将 Kafka 的依賴也加入進去了。

寫入Kafka

我們的資料來自于天池資料集,是以 CSV 的格式存在于本地磁盤,是以需要先将他們寫入 Kafka。

先建一下 CSV Source 和 Kafka Sink 的表:

%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS source_csv;
CREATE TABLE source_csv (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv',
 'format' = 'csv'
 
 )           
%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string,
ts AS localtimestamp,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'theme_click_log',
'properties.bootstrap.servers' = '10.70.98.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'

)           

因為注冊的表即可以讀又可以寫,于是我在建表時将 Watermark 加上了;又因為源資料中的時間戳已經很老了,是以我這裡采用目前時間減去5秒作為我的 Watermark。

大家可以看到,我在語句一開始指定了 SQL 方言為 Default,這是為啥呢?還有别的方言嗎?别急,聽我慢慢說。

其實在之前的版本,Flink 就已經可以和 Hive 打通,包括可以把表建在 Hive 上,但是很多文法和 Hive 不相容,包括建的表在 Hive 中也無法檢視,主要原因就是方言不相容。是以,在 Flink 1.11 中,為了減少學習成本(文法不相容),可以用 DDL 建 Hive 表并在 Hive 中查詢,Flink 支援了方言,預設的就是 Default 了,就和之前一樣,如果想建 Hive 表,并支援查詢,請使用 Hive 方言,具體可以參考下方連結。

Hive 方言: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html

再把資料從 CSV 中讀取後寫入 Kafka。

%flink.ssql(type=update)

insert into kafka_table select * from source_csv ;           

再瞄一眼 Kafka,看看資料有沒有被灌進去:

Flink x Zeppelin ,Hive Streaming 實戰解析

看來沒問題,那麼接下來讓我們寫入 Hive。

建一個Hive Sink Table,記得将方言切換到 Hive,否則會有問題。

%flink.ssql
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table;
CREATE TABLE hive_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (

 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 min',
 'sink.partition-commit.policy.kind'='metastore,success-file'

);           

參數給大家稍微解釋一下:

  • partition.time-extractor.timestamp-pattern:分區時間抽取器,與 DDL 中的分區字段保持一緻;
  • sink.partition-commit.trigger:分區觸發器類型,可選 process-time 或partition-time。process-time:不需要上面的參數,也不需要水印,當目前時間大于分區建立時間 +sink.partition-commit.delay 中定義的時間,送出分區;partition-time:需要 Source 表中定義 watermark,當 watermark > 提取到的分區時間 +sink.partition-commit.delay 中定義的時間,送出分區;
  • sink.partition-commit.delay:相當于延時時間;
  • sink.partition-commit.policy.kind:怎麼送出,一般送出成功之後,需要通知 metastore,這樣 Hive 才能讀到你最新分區的資料;如果需要合并小檔案,也可以自定義 Class,通過實作 PartitionCommitPolicy 接口。

接下來讓我們把資料插入剛剛建立的 Hive Table:

%flink.ssql

insert into hive_table select  user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table           

讓程式再跑一會兒~我們先去倒一杯 95 年的 Java☕️ 。

然後再看看我們的 HDFS,看看路徑下的東西。

Flink x Zeppelin ,Hive Streaming 實戰解析

大家也可以用 Hive 自行查詢看看,我呢就先賣個關子,一會兒用 Hive Streaming 來讀資料。

因為 Hive 表上面已經建立過了,是以這邊讀資料的時候直接拿來用就行了,不同的地方是需要使用 Table Hints 去覆寫參數。

Hive Streaming Source 最大的不足是,無法讀取已經讀取過的分區下新增的檔案。簡單來說就是,讀過的分區,就不會再讀了。看似很坑,不過仔細想想,這樣才符合流的特性。

照舊給大家說一下參數的意思:

  • stream-source.enable:顯而易見,表示是否開啟流模式。
  • stream-source.monitor-interval:監控新檔案/分區産生的間隔。
  • stream-source.consume-order:可以選 create-time 或者 partition-time;create-time 指的不是分區建立時間,而是在 HDFS 中檔案/檔案夾的建立時間;partition-time 指的是分區的時間;對于非分區表,隻能用 create-time。官網這邊的介紹寫的有點模糊,會讓人誤以為可以查到已經讀過的分區下新增的檔案,其實經過我的測試和翻看源碼發現并不能。
  • stream-source.consume-start-offset:表示從哪個分區開始讀。

光說不幹假把式,讓我們撈一把資料看看~

Flink x Zeppelin ,Hive Streaming 實戰解析

SET 那一行得帶着,不然無法使用 Table Hints。

看完了 Streaming Source 和 Streaming Sink,讓我們最後再試一下 Hive 作為維表吧。

其實用 Hive 維表很簡單,隻要是在 Hive 中存在的表,都可以當做維表使用,參數完全可以用 Table Hints 來覆寫。

  • lookup.join.cache.ttl:表示緩存時間;這裡值得注意的是,因為 Hive 維表會把維表所有資料緩存在 TM 的記憶體中,如果維表量很大,那麼很容易就 OOM;如果 ttl 時間太短,那麼會頻繁的加載資料,性能會有很大影響。
Flink x Zeppelin ,Hive Streaming 實戰解析

因為是 LEFT JOIN,是以維表中不存在的資料會以 NULL 補全。

再看一眼 DAG 圖:

Flink x Zeppelin ,Hive Streaming 實戰解析

大家看一下畫框的地方,能看到這邊是使用的維表關聯 LookupJoin。

如果大家 SQL 語句寫錯了,丢了 for system_time as of a.p,那麼 DAG 圖就會變成這樣:

Flink x Zeppelin ,Hive Streaming 實戰解析

這種就不是維表 JOIN 其實更像是流和批在 JOIN。

寫在最後

Hive Streaming 的完善意味着打通了流批一體的最後一道壁壘,既可以做到曆史資料的 OLAP 分析,又可以實時吐出結果,這無疑是 ETL 開發者的福音,想必接下來的日子,會有更多的企業完成他們實時數倉的建設。

參考文檔:

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/

[2]

https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md

Note 下載下傳:

https://github.com/lonelyGhostisdog/flinksql/blob/master/src/main/resources/Flink%20on%20Zeppelin/Hive%20Streaming%20Test.zpln

最後,給大家介紹一下 Flink on Zeppelin 的釘釘群,大家有問題可以在裡面讨論,Apache Zeppelin PMC 簡鋒大佬也在裡面,有問題可以直接在釘群中提問交流~

作者介紹:

狄傑,蘑菇街資深資料專家,負責蘑菇街實時計算平台 。目前 Focus 在 Flink on Zeppelin,Apache Zeppelin Contributor。