聲明:本系列部落格是根據SGG的視訊整理而成,非常适合大家入門學習。
《2021年最新版大資料面試題全面開啟更新》
Flink 1.12 版本
1. Hive 建表
//1、建立 Hive 資料庫
create database zhisheng;
//2、檢視建立的資料庫
show databases;
//3、使用建立的資料庫
use zhisheng;
//4、在該庫下建立 Hive 表
CREATE TABLE IF NOT EXISTS flink (
appid int,
message String
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
//5、往該表插入一條資料
insert into flink values(11111, '233sadadadwqqdq');
2.Flink 讀取 Hive 已經存在的表資料
//1、建立 Hive CATALOG,Flink 通過 catalog 不僅可以将自己的表寫入 Hive 的 metastore,也能讀寫 Hive 的表
CREATE CATALOG flinkHiveCatalog WITH (
'type' = 'hive',
'default-database' = 'zhisheng',
'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf'
);
//2、使用該 Catalog
USE CATALOG flinkHiveCatalog;
//3、因為剛才已經寫入了一條資料到 Hive 表(flink)
select * from flink;
3.Flink 往 Hive 中已經存在的表寫資料
//1、建立 Source 表
CREATE TABLE yarn_log_datagen_test_hive_sink (
appid INT,
message STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.appid.kind'='random',
'fields.appid.min'='1',
'fields.appid.max'='1000',
'fields.message.length'='100'
);
//2、将資料寫入到 Hive 表
insert into flink select * from yarn_log_datagen_test_hive_sink;
//再次查詢 Hive 表裡面的資料
select * from flink;
直接在 Hive 利用指令查詢:
4 .完整 Example
CREATE CATALOG flinkHiveCatalog WITH (
'type' = 'hive',
'default-database' = 'zhisheng',
'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf'
);
USE CATALOG flinkHiveCatalog;
SET table.sql-dialect=hive; -- 建立 Hive 表要指定 sql-dialect 為 Hive,否則建立的時候識别不了下面的 DDL 語句
CREATE TABLE yarn_logs (
appid INT,
message STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.parallelism'='2' -- 該參數内部才支援設定并行度
);
SET table.sql-dialect=default; -- 建立 Flink 表又要換回預設的 sql-dialect,Flink 支援在同一個 SQL 裡面設定多個 sql-dialect
CREATE TABLE yarn_log_datagen_test (
appid INT,
message STRING,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.appid.kind' = 'random',
'fields.appid.min' = '1',
'fields.appid.max' = '1000',
'fields.message.length' = '100'
);
-- streaming sql, insert into hive table
INSERT INTO yarn_logs
SELECT appid, message, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM yarn_log_datagen_test;
-- batch sql, select with partition pruning
SELECT * FROM yarn_logs WHERE dt='2020-12-16' and hr='12';
show create table yarn_logs;