天天看點

Flink 與 Hive 的磨合期

簡介: 在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可了解),Flink 源代碼本身對 Hive 1.1.0 版本相容性不好,存在不少問題。為了相容目前版本,筆者基于 CDH 5.16.2 環境,對 Flink 代碼進行了修改,重新打包并部署。

有不少讀者回報,參考上篇文章《Hive 終于等來了 Flink》部署 Flink 并內建 Hive 時,出現一些 bug 以及相容性等問題。雖已等來,卻未可用。是以筆者增加了這一篇文章,作為姊妹篇。

回顧

在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可了解),Flink 源代碼本身對 Hive 1.1.0 版本相容性不好,存在不少問題。為了相容目前版本,筆者基于 CDH 5.16.2 環境,對 Flink 代碼進行了修改,重新打包并部署。

其實經過很多開源項目的實戰,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決相容性的問題。對于筆者的環境來說,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然後再補充上篇文章缺少的實戰内容。

剪不斷理還亂的問題

根據讀者的回報,筆者将所有的問題總結為三類:

  1. Flink 如何連接配接 Hive 除了 API 外,有沒有類似 spark-sql 指令
  2. 識别不到 Hadoop 環境或配置檔案找不到
  3. 依賴包、類或方法找不到

1. Flink 如何連接配接 Hive

有的讀者不太清楚,如何配置 Flink 連接配接 Hive 的 Catalog,這裡補充一個完整的 conf/sql-client-hive.yaml 示例:

catalogs: - name: staginghive type: hive hive-conf-dir: /etc/hive/conf hive-version: 1.2.1 execution: planner: blink type: batch time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type: fallback deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048

sql-client-hive.yaml 配置檔案裡面包含:

  1. Hive 配置檔案 catalogs 中配置了 Hive 的配置檔案路徑。
  2. Yarn 配置資訊 deployment 中配置了 Yarn 的配置資訊。
  3. 執行引擎資訊 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比較穩定,适合傳統的批處理作業,而且可以容錯,另外中間資料落盤,建議開啟壓縮功能。除了 batch,Flink 也支援 streaming 模式。

■ Flink SQL CLI 工具

類似 spark-sql 指令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者後面講解。

sql-client.sh 使用方式如下:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. 識别不到 Hadoop 環境或配置檔案找不到

筆者在上篇文章中提到過,在部署 Flink 的環境上部署 CDH gateway,包括 Hadoop、Hive 用戶端,另外還需要配置一些環境變量,如下:

export HADOOP_CONF_DIR=/etc/hadoop/conf export YARN_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive export HIVE_CONF_DIR=/etc/hive/conf

3. 依賴包、類或方法找不到

先檢視一下 Flink 家目錄下的 lib 目錄:

$ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.1.0-cdh5.16.2.jar ├── hive-metastore-1.1.0-cdh5.16.2.jar ├── libfb303-0.9.3.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar

如果上面前兩個問題都解決後,執行如下指令:

報錯,報錯,還是報錯:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

其實在運作 sql-client.sh 腳本前,需要指定 Hadoop 環境的依賴包的路徑,建議不要報錯一個添加一個,除非有的讀者喜歡。這裡筆者提示一個友善的方式,即設定 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)環境變量:

export HADOOP_CLASSPATH=`hadoop classpath`

再次執行:

很抱歉,繼續報錯:

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

這裡就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現版本不相容性的問題了,解決方法是:

  1. 下載下傳 apache-hive-1.2.1 版本
  2. 替換 Flink lib 目錄下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然後添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次檢視 lib 目錄:

$ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.2.1.jar ├── hive-metastore-1.2.1.jar ├── libfb303-0.9.2.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar

最後再執行:

這時,讀者就可以看到手握栗子的可愛小松鼠了。

Flink 與 Hive 的磨合期

Flink SQL CLI 實踐

在 Flink 1.10 版本(目前為 RC1 階段) 中,Flink 社群對 SQL CLI 做了大量的改動,比如支援 View、支援更多的資料類型和 DDL 語句、支援分區讀寫、支援 INSERT OVERWRITE 等,實作了更多的 TableEnvironment API 的功能,更加友善使用者使用。

接下來,筆者詳細講解 Flink SQL CLI。

0. Help

執行下面指令,登入 Flink SQL 用戶端:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL>

執行 HELP,檢視 Flink SQL 支援的指令,如下為大部分常用的:

  • CREATE TABLE
  • DROP TABLE
  • CREATE VIEW
  • DESCRIBE
  • DROP VIEW
  • EXPLAIN
  • INSERT INTO
  • INSERT OVERWRITE
  • SELECT
  • SHOW FUNCTIONS
  • USE CATALOG
  • SHOW TABLES
  • SHOW DATABASES
  • SOURCE
  • USE
  • SHOW CATALOGS

1. Hive 操作

■ 1.1 建立表和導入資料

為了友善讀者進行實驗,筆者使用 ssb-dbgen 生成測試資料,讀者也可以使用測試環境已有的資料來進行實驗。

具體如何在 Hive 中一鍵式建立表并插入資料,可以參考筆者早期的項目 https://github.com/MLikeWater/ssb-kylin。

■ 1.2 Hive 表

檢視上個步驟中建立的 Hive 表:

0: jdbc:hive2://http://xx.xxx.xxx.xxx:10000> show tables; +--------------+--+ | tab_name | +--------------+--+ | customer | | dates | | lineorder | | p_lineorder | | part | | supplier | +--------------+--+

讀者可以對 Hive 進行各種查詢,對比後面 Flink SQL 查詢的結果。

2. Flink 操作

■ 2.1 通過 HiveCatalog 通路 Hive 資料庫

登入 Flink SQL CLI,并查詢 catalogs:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> show catalogs; default_catalog staginghive Flink SQL> use catalog staginghive;

通過 show catalogs 擷取配置的所有 catalog。由于筆者在 sql-client-hive.yaml 檔案中設定了預設的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。

■ 2.2 查詢 Hive 中繼資料

通過 Flink SQL 查詢 Hive 資料庫和表:

# 查詢資料庫 Flink SQL> show databases; ... ssb tmp ... Flink SQL> use ssb; # 查詢表 Flink SQL> show tables; customer dates lineorder p_lineorder part supplier # 查詢表結構 Flink SQL> DESCRIBE customer; root |-- c_custkey: INT |-- c_name: STRING |-- c_address: STRING |-- c_city: STRING |-- c_nation: STRING |-- c_region: STRING |-- c_phone: STRING |-- c_mktsegment: STRING

這裡需要注意,Hive 的中繼資料在 Flink catalog 中都以小寫字母使用。

■ 2.3 查詢

接下來,在 Flink SQL CLI 中查詢一些 SQL 語句,完整 SQL 參考 https://github.com/MLikeWater/ssb-kylin 的 README。

目前 Flink SQL 解析 Hive 視圖中繼資料時,會遇到一些 Bug,比如執行 Q1.1 SQL:

Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder' not found; did you mean 'LINEORDER'?

Flink SQL 找不到視圖中的實體表。

p_lineorder 表是 Hive 中的一張視圖,建立表的語句如下:

CREATE VIEW P_LINEORDER AS SELECT LO_ORDERKEY, LO_LINENUMBER, LO_CUSTKEY, LO_PARTKEY, LO_SUPPKEY, LO_ORDERDATE, LO_ORDERPRIOTITY, LO_SHIPPRIOTITY, LO_QUANTITY, LO_EXTENDEDPRICE, LO_ORDTOTALPRICE, LO_DISCOUNT, LO_REVENUE, LO_SUPPLYCOST, LO_TAX, LO_COMMITDATE, LO_SHIPMODE, LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE FROM ssb.LINEORDER;

但是對于 Hive 中視圖的定義,Flink SQL 并沒有很好地進行中繼資料。為了後面 SQL 的順利執行,這裡筆者在 Hive 中删除并重建該視圖:

0: jdbc:hive2://http://xx.xxx.xxx.xxx:10000> create view p_lineorder as select lo_orderkey, lo_linenumber, lo_custkey, lo_partkey, lo_suppkey, lo_orderdate, lo_orderpriotity, lo_shippriotity, lo_quantity, lo_extendedprice, lo_ordtotalprice, lo_discount, lo_revenue, lo_supplycost, lo_tax, lo_commitdate, lo_shipmode, lo_extendedprice*lo_discount as v_revenue from ssb.lineorder;

然後繼續在 Flink SQL CLI 中查詢 Q1.1 SQL:

Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; revenue 894280292647

繼續查詢 Q2.1 SQL:

Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join part on lo_partkey = p_partkey > left join supplier on lo_suppkey = s_suppkey > where p_category = 'MFGR#12' and s_region = 'AMERICA' > group by d_year, p_brand > order by d_year, p_brand; lo_revenue d_year p_brand 819634128 1998 MFGR#1206 877651232 1998 MFGR#1207 754489428 1998 MFGR#1208 816369488 1998 MFGR#1209 668482306 1998 MFGR#1210 660366608 1998 MFGR#1211 862902570 1998 MFGR#1212 ...

最後再查詢一個 Q4.3 SQL:

Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join customer on lo_custkey = c_custkey > left join supplier on lo_suppkey = s_suppkey > left join part on lo_partkey = p_partkey > where c_region = 'AMERICA'and s_nation = 'UNITED STATES' > and (d_year = 1997 or d_year = 1998) > and p_category = 'MFGR#14' > group by d_year, s_city, p_brand > order by d_year, s_city, p_brand; d_year s_city p_brand profit 1998 UNITED ST9 MFGR#1440 6665681

如果讀者感興趣的話,可以查詢剩餘的 SQL,當然也可以和 Spark SQL 進行比較。另外 Flink SQL 也支援 EXPLAIN,查詢 SQL 的執行計劃。

■ 2.4 建立視圖

同樣,可以在 Flink SQL CLI 中建立和删除視圖,如下:

Flink SQL> create view p_lineorder2 as > select lo_orderkey, > lo_linenumber, > lo_custkey, > lo_partkey, > lo_suppkey, > lo_orderdate, > lo_orderpriotity, > lo_shippriotity, > lo_quantity, > lo_extendedprice, > lo_ordtotalprice, > lo_discount, > lo_revenue, > lo_supplycost, > lo_tax, > lo_commitdate, > lo_shipmode, > lo_extendedprice * lo_discount as v_revenue > from ssb.lineorder; [INFO] View has been created.

這裡筆者需要特别強調的是,目前 Flink 無法删除 Hive 中的視圖:

Flink SQL> drop view p_lineorder; [ERROR] Could not execute SQL statement. Reason: The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.

■ 2.5 分區操作

Hive 資料庫中建立一張分區表:

CREATE TABLE IF NOT EXISTS flink_partition_test ( id int, name string ) PARTITIONED BY (day string, type string) stored as textfile;

接着,通過 Flink SQL 插入和查詢資料:

# 插入靜态分區的資料 Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001'; # 查詢 Flink SQL> select * from flink_partition_test; id name day type 100001 Flink001 2020-02-01 Flink # 插入動态分區 Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL'; # 查詢 Flink SQL> select * from flink_partition_test; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink # 動态和靜态分區結合使用類似,不再示範 # 覆寫插入資料 Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4'; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink

字段 day 在 Flink 屬于關鍵字,要特殊處理。

■ 2.6 其他功能

  • 2.6.1 函數
  • 2.6.2 設定參數
  • 目前隻支援 TextFile 存儲格式,還無法指定其他存儲格式

    隻支援 Hive 資料庫中 TextFile 存儲格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實作了 RCFile、ORC、Parquet、Sequence 等存儲格式,但是無法自動識别 Hive 表的存儲格式。如果要使用其他存儲格式,需要修改源碼,重新編譯。不過社群已經對這些存儲格式進行了測試,相信不久以後就可以在 Flink SQL 中使用。

  • OpenCSVSerde 支援不完善

    如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正确識别字段類型,會把 Hive 表的字段全部映射為 String 類型。

  • 暫時不支援 Bucket 表
  • 暫時不支援 ACID 表
  • Flink SQL 優化方面功能較少
  • 權限控制方面

    這方面和 Spark SQL 類似,目前基于 HDFS ACL 控制,暫時還沒有實作 Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發基于 Ranger 設定 Spark SQL 和 Hive 共享通路權限的政策,實作行/列級控制以及審計資訊。