天天看點

FlinkSQL總結(1.12)FlinkSQL(1.12)

FlinkSQL(1.12)

一、基本文法

1.1、建表文法

create table 表名 (
字段名 字段類型,
...
) with (
連接配接器配置
)
           

1.2、時間語義

1.2.1、事件時間

使用:在設定完字段後最後一行進行指定。

格式:

watermark for 某時間字段名 AS 某時間字段名 - INTERVAL '某數字' SECOND

1.2.2、處理時間

使用:在設定完字段後最後一行進行指定。

格式:

随便起一個字段名 as proctime()

二、Source

2.1、Kafka

一般連接配接器配置如下即可,其他配置詳情見官網Apache Flink 1.12 Documentation: Apache Kafka SQL Connector

'connector' = 'kafka',
'topic' = 'topicName(自定義)',
'properties.bootstrap.servers' = 'ip:port,ip:port,ip:port(自定義)',
'properties.group.id' = 'groupId(自定義)',
'scan.startup.mode' = 'timestamp(可取其他值)',
'scan.startup.timestamp-millis' = '1662393600000(對應上述timestamp的模式)', -- 資料到達kafka的時間 2022-09-06 00:00:00
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
           

注意:

  • 普通的kafka不可以定義主鍵,會報錯,因為他沒有機制能保證語義上的主鍵唯一性。
  • 啟動位點scan.startup.mode,取值如下:
    • earliest-offset:從Kafka最早分區開始讀取。
    • latest-offset:從Kafka最新位點開始讀取。
    • group-offsets(預設值):根據Group讀取。
    • timestamp:從Kafka指定時間點讀取。配置該參數時,同時需要在WITH參數中指定scan.startup.timestamp-millis參數。(這個參數為毫秒機關的時間戳,這個時間是對應kafka中資料的時間,就是broker接受到這條消息的時間)
    • specific-offsets:從Kafka指定分區指定偏移量讀取。配置該參數時,同時需要在WITH參數中指定scan.startup.specific-offsets參數。
  • json解析問題:
    • json.fail-on-missing-field:如果為 true,則遇到缺失字段時,會讓作業失敗。如果為 false(預設值),則隻會把缺失字段設定為 null 并繼續處理。
    • json.ignore-parse-errors:如果為 true,則遇到解析異常時,會把這個字段設定為 null 并繼續處理。如果為 false(預設值),則會讓作業失敗。
    • 兩個參數不能同時為true,否則會抛異常

      Caused by: org.apache.flink.table.api.ValidationException: fail-on-missing-field and ignore-parse-errors shouldn't both be true.

      一般都是如上例子,一個true,一個false,表示如果資料解析異常則跳過這條資料,且如果解析沒問題,但是找不到某字段,則設定這個字段值為null。
  • key和value問題:
    • 如果除了value,我們還要解析key中的資料,則需要把key和value的format單獨設定,且需要額外設定一個配置’value.fields-include’ = ‘EXCEPT_KEY’,表示我們需要的字段,在value中有些沒有。預設是ALL,表示我們需要的字段,在value中都有。
    • 如果除了value,我們還要解析key中的資料,且key中的鍵值和value中的鍵值有重名的情況,此時還需要額外設定一個配置’key.fields-prefix’ = ‘key_’,‘key.fields’ = ‘field1;field2’。
  • 參數問題:
    • flinksql水位線問題,如果source為kafka,kafka的并行度大于1,但是flink的并行度為1,此時如果kafka中某個分區沒資料,這時候的flink的水位線一直不會觸發(如果用javaApi的方式實作,是不會有這個問題的!!!)這種情況需要通過參數調整水位線推進。table.exec.source.idle-timeout=10000,機關是ms,如果其他分區沒有等待多少ms後沒有資料來,則自動推進水位線。

完整例子如下:

CREATE TABLE pageviews (
key_user_id BIGINT,
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP(3), --yyyy-MM-dd HH:mm:ss
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '0' SECOND
 ) WITH (
'connector' = 'kafka',
'topic' = 'VIP-DT',
'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
'properties.group.id' = 'TestOpenSourceFlinkGroup',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1662393600000',
   
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.json.fail-on-missing-field' = 'false',
'key.fields-prefix' = 'key_',
'key.fields' = 'key_user_id',
   
'value.format' = 'json',
'value.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
);
           

2.2、Upsert-Kafka

一般連接配接器配置如下即可,相對于普通kafkaSource,他不能設定流開始的位點,以及他必須設定主鍵,主鍵就是對應的key值。(大部分的配置基本都和2.1kafka的配置相同)

'connector' = 'upsert-kafka',
'topic' = 'topicName(自定義)',
'properties.bootstrap.servers' = 'ip:port,ip:port,ip:port(自定義)',
'properties.group.id' = 'groupId(自定義)',
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.json.fail-on-missing-field' = 'false',
'key.fields-prefix' = 'key_',

'value.format' = 'json',
'value.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
           

**注意1:**如果source為upsert-kafka,那麼意味着從source開始,這條流就是回撤流,我們可以對這條流進行簡單group by,但是不能進行開視窗。group by之後的資料需要用可支援upsert的sink進行接受。比如upsert-kafka,但是注意的是,upsert-kafka接受回撤流時,如果是删除的消息,他的value值為空(注意:沒有開視窗的group by可以對回撤流進行,也可以對普通append流進行,但是最終傳回的都是回撤流)

回撤流有以下規則:

  • +I對應新增的資料
  • -U對應回撤某條資料
  • +U對應更新後的資料
  • -D對應删除某條資料

當回撤流寫入到upsert-kafka中有以下規則:

  • -U的資料不會進入sink,
  • +I,+U,-D的資料會進入sink,但是-D的資料sink中會沒有value值

産生回撤流場景:

  • Aggregate Without Window(不帶 Window 的聚合場景)
  • Rank
  • Over Window
  • Left/Right/Full Outer Join

**注意2:**在flink1.11是不支援upsert-kafka的,如果source需要是回撤流,那麼kakfa中的資料格式需要是這幾個canal-json,debezium-json,maxwel-json,此時我們定義kafka source時,我們的format格式可以知道對應的canal-json,debezium-json,maxwel-json其中一種。相反如果kafka中的json對應的是以上三種格式之一,我們可以通過對應的format格式去接受。如果要進行資料的去重操作,則需要可以定義主鍵,且flink的參數中加上table.exec.source.cdc-events-duplicate=true,這時架構會生成一個額外的有狀态算子,使用該 primary key 來對變更事件去重并生成一個規範化的 changelog 流。

三、流處理場景

3.1、單流

3.1.1、簡單聚合

使用:

  • 和正常sql一樣對某些字段進行分組,然後求聚合值,隻不過會利用狀态存儲流過來的資料。注意:如果要進行簡單聚合,下遊必須支援upsert,否則會報錯

    doesn't support consuming update changes which is produced by node GroupAggregate

格式:

  • group by 字段名

3.1.2、視窗

使用:

  • group by後,與正常sql的group by一樣使用,隻不過不是對某字段group by,而是對一個函數進行group by。

格式:

  • group by tumble(時間字段,間隔時間)

    。當然也可以和其他正常字段一起使用,

    group by 某字段名, tumble(時間字段,間隔時間)

    。除了tumble,還有hop和session函數,分别是滾動、滑動、會話視窗。hop中有三個參數,前兩個和tumble一樣,第三個是一個時間參數,表示滑動間隔。

3.1.3、TopN

使用:

  • 求實時熱度等場景時使用,可以在回撤流和普通流上使用,傳回一個回撤流

格式:

  • 和傳統的開窗函數一樣,row_number() over(partition by 字段 order by 字段 desc)

注意:

  • 有一個bug,在使用時必須在外側套一層select,且必須有where條件,條件必須是rn<某數,或rn=某數,或rn<=某數[FLINK-26051]

3.1.4、視圖

使用:

  • 可以用,在寫一些複雜sql時,可以使用視圖來建立一些中間表,來使代碼看起來更易于了解。一個視圖内也可以查另一個視圖。

格式:

  • create view as select語句

3.2、雙流join

3.2.1、正常join(inner,left,right,full)

使用:select * from a inner/left/right/full join b on a.id = b.id;

傳回:Flink會通過狀态儲存兩條流的資料,最終會産生一條回撤流。

問題:狀态會越來越大,需要定期清除狀态。

為什麼是回撤流:

以 left Join 為例,且假設左流的資料比右流的資料先到,左流的資料會去掃描右流資料的狀态,如果找不到可以 Join 的資料,左流并不知道右流中是确實不存在這條資料還是說右流中的相應資料遲到了。為了滿足 left join 的語義的話,左邊流資料還是會産生一條 join 資料發送到下遊,類似于 MySQL Left Join,左流的字段以正常的表字段值填充,右流的相應字段以 Null 填充,然後輸出到下遊。

後期如果右流的相應資料到達,會去掃描左流的狀态再次進行 join,此時,為了保證語義的正确性,需要把前面已經輸出到下遊的這條特殊的資料進行回撤,同時會把最新 join 上的資料輸出到下遊。注意,對于相同的 Key,如果産生了一次回撤,是不會再産生第二次回撤的,因為如果後期再有該 Key 的資料到達,是可以 join 上另一條流上相應的資料的。

3.2.2、interval join

使用:在普通join的基礎上增加一些條件,①on後邊的關聯條件需要多一個時間關聯②on後邊的時間條件必須和事件時間的字段或者處理時間的時間字段相同。滿足這兩個條件才是interval join,否則就是普通join。我們可以在flink的webUI上看join的類型。

傳回:傳回一個普通追加流。

問題:需要自己把握設定一個視窗時間。

注意:interval隻支援innerjoin,不支援left,right,full join。

舉例:

前提:
	source1:es為事件時間或者處理時間
	source2:es為事件時間或者處理時間
語句1:
	select * from a,b where a.id=b.id and b.es between a.es and a.es + interval '5' second;
語句2:
	select * from a inner/left/right/full join b on a.id=b.id and/where b.es between a.es and 	a.es + interval '5' second;
join類型:
	都是interval join,而且interval join都是inner join,出來的流都是追加流。

           

驗證:去webui上看是否是interval join。在webui上也可以看到jointype,但是如果是interval join,他的jointype一定是inner join(如果你在insert語句中寫的是其他left/right/full,在webui上看見的也是left/right/full,但是實際上還是inner join,最終資料不會出現,左右兩邊有一邊為null的情況,輸出的還是一個append流。)

3.2.3、時态表join

定義:

  • 時态表就是一張随時間變化的表。

種類:

  • 一種是我們可以通路他的曆史版本,這種是版本表,比如回撤流;
  • 一種是我們隻能通路到目前最新的版本,這種是普通表,比如一些資料庫維表。

如何擷取版本表:(個人了解版本表就是一個帶有事件時間的回撤流)

  • 一種是建立kafka源表,且format格式為cdc格式(canal/maxwell/debezium),定義主鍵,定義事件時間。
  • 一種是建立upsert kafak源表,定義主鍵,定義事件時間。
  • 一種是通過視圖獲得,核心是轉化append流為retract流。首先append流一定得有事件事件,其次在建立視圖時候,通過row_number或者group by等操作傳回一條retract流。(如果上遊是kafka,且資料類型不是cdc類型,且我們需要指定資料的起始位點,這時候我們就要通過視圖來擷取到版本表)

官網案例:

  • -- source1
    CREATE TABLE orders  (
      order_id STRING,
      product_id STRING,
      order_time TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time  -- defines the necessary event time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST-ODS_BUFFER_SHUNT',
      'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
      'properties.group.id' = 'TestOpenSourceFlinkGroup',
      'scan.startup.mode' = 'timestamp',
      'scan.startup.timestamp-millis' = '1663917600000',
      'value.format' = 'json',
      'value.json.ignore-parse-errors' = 'true',
      'value.json.fail-on-missing-field' = 'false'
    );
    -- sourcr2
    CREATE TABLE product_changelog   (
      product_id STRING,
      product_name STRING,
      product_price DECIMAL(10, 4),
      update_time TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, -- 注意:自動從毫秒數轉為時間戳
      PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
      WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark      
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST-ODS_BUFFER_SHUNT2',
      'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
      'properties.group.id' = 'TestOpenSourceFlinkGroup',
      'scan.startup.mode' = 'timestamp',
      'scan.startup.timestamp-millis' = '1663917600000', --kafka的時間 2022-09-23 15:20:00
      'value.format' = 'debezium-json'
    );
    
    -- sink 
    create table printSink(
      order_id STRING,
      order_time TIMESTAMP(3),
      product_name STRING,
      product_time TIMESTAMP(3),
      price DECIMAL(10, 4)
    )with(
      'connector' = 'print'
    );
    
    -- 基于事件時間的時态表 Join
    insert into printSink 
    SELECT
      O.order_id,
      O.order_time,
      P.product_name,
      P.update_time AS product_time,
      P.product_price AS price
    FROM orders AS O
    LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
    ON O.product_id = P.product_id;
    
    
    -- source1對應資料
    {"order_id":"o_001","product_id":"111","order_time":"2022-09-23 00:01:00"}
    {"order_id":"o_002","product_id":"222","order_time":"2022-09-23 00:02:00"}
    {"order_id":"o_003","product_id":"111","order_time":"2022-09-23 12:00:00"}
    {"order_id":"o_004","product_id":"222","order_time":"2022-09-23 12:00:00"}
    {"order_id":"o_005","product_id":"111","order_time":"2022-09-23 18:00:00"}
    
    -- source2對應資料
    {"before":null,"after":{"product_id":"111","product_name":"scooter","product_price":11.11},"source":{},"op":"c","ts_ms":1663862460000,"transaction":null}
    
    {"before":null,"after":{"product_id":"222","product_name":"basketball","product_price":23.11},"source":{},"op":"c","ts_ms":1663862520000,"transaction":null}
    
    {"before":{"product_id":"111","product_name":"scooter","product_price":11.11},"after":{"product_id":"111","product_name":"scooter","product_price":12.99},"source":{},"op":"u","ts_ms":1663905600000,"transaction":null}
    
    {"before":{"product_id":"222","product_name":"basketball","product_price":23.11},"after":{"product_id":"222","product_name":"basketball","product_price":19.99},"source":{},"op":"u","ts_ms":1663905600000,"transaction":null}
    
    {"before":{"product_id":"111","product_name":"scooter","product_price":12.99},"after":null,"source":{},"op":"d","ts_ms":1663927200000,"transaction":null}
               

3.3、流表join

使用:當流資料需要關聯一些維表時,需要去對應資料庫異步對應的次元資訊,此時需要使用流表join。流表join,也是時态表join的一種,因為資料庫維表就相當于一個版本表,隻有一個最新的快照版本。

注意:流表需要用處理時間,進行join時,使用這個處理時間。

案例:

CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time  string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  proctime as Proctime() --維表join需要用處理時間
--   WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST-ODS_BUFFER_SHUNT',
  'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
  'properties.group.id' = 'TestOpenSourceFlinkGroup',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1664344800000', --kafka的時間 2022-09-28 14:00:00
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false'
);

--建立位址維表
create table area_info (
    area_id string, 
    area_province_name string,
    area_city_name string,
    area_county_name string, 
    area_street_name string, 
    region_name string 
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.1.12.99:3306/srm_mock_dt?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai',
  'table-name' = 'area_info_flinksql_test',
  'username' = 'root',
  'password' = '6nN@@UQ5f%9u'
);

--根據位址維表生成詳細的包含位址的訂單資訊寬表
create table order_detail(
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string,
    area_province_name string,
    area_city_name string,
    area_county_name string,
    area_street_name string,
    region_name string
) with (
  'connector' = 'print'
);

insert into order_detail
    select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
           area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
           area.area_street_name, area.region_name  from orders 
           left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
           

四、Sink

4.1、Kafka

連接配接器配置可參考2.1,kafkaSource也可以看作是具體kafka中的資料,往flink内部流入的一個sink

4.2、Upset-Kafka

連接配接器配置可參考2.2

4.3、Mysql

使用:下遊是mysql時,我們可以實作資料的upsert/delete

案例1:

-- {"id":"1","name":"張三","age":18,"sex":"男","amount":20.56}
-- 上遊資料是append流,可以實作資料的update,需要定義主鍵,此主鍵可以和真實資料庫的主鍵不一樣。
CREATE TABLE kafka_source (
    id bigint, 
    name string,
    age int,
    sex string, 
    amount decimal(20,10)
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST-ODS_BUFFER_SHUNT',
  'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
  'properties.group.id' = 'TestOpenSourceFlinkGroup',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1664348400000', --kafka的時間 2022-09-28 15:00:00
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false'
);


create table mysql_sink (
    id bigint, 
    name string,
    age int,
    sex string, 
    amount decimal(20,10),
    PRIMARY KEY (name) NOT ENFORCED --真實資料庫主鍵為id,這裡可以不為id,如果可以確定某字段唯一,									  --也可以用此字段
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.1.12.99:3306/srm_mock_dt?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai',
  'table-name' = 'user_flinksql_test',
  'username' = 'root',
  'password' = '6nN@@UQ5f%9u'
);


insert into mysql_sink
select * from kafka_source;
           

案例2:

-- 上遊資料是retreat流,可以實作資料的update/delete,需要定義主鍵,此主鍵可以和真實資料庫的主鍵不一樣。
CREATE TABLE kafka_source (
    id bigint,
    name string,
    age int,
    sex string,
    amount decimal(20,10)
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST-ODS_BUFFER_SHUNT',
  'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
  'properties.group.id' = 'TestOpenSourceFlinkGroup',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1664348400000', --kafka的時間 2022-09-28 15:00:00
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false'
);

create table mysql_sink (
    id bigint,
    name string,
    age int,
    sex string,
    amount decimal(20,10),
    PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.1.12.99:3306/srm_mock_dt?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai',
  'table-name' = 'user_flinksql_test',
  'username' = 'root',
  'password' = '6nN@@UQ5f%9u'
);

insert into mysql_sink
select * from kafka_source;

-- 資料:
-- {"before":null,"after":{"id":"1","name":"張三","age":18,"sex":"男","amount":20.56},"source":{},"op":"c","ts_ms":1663862460000,"transaction":null}

-- {"before":{"id":"1","name":"張三","age":18,"sex":"男","amount":20.56},"after":{"id":"1","name":"張三","age":19,"sex":"男","amount":20.56},"source":{},"op":"u","ts_ms":1663862460000,"transaction":null}

-- {"before":{"id":"1","name":"張三","age":19,"sex":"男","amount":20.56},"after":null,"source":{},"op":"d","ts_ms":1663862460000,"transaction":null}
           

四、問題

4.1、水位線不推進

場景:source為kafka,kafka分區數大于1,flink的并行度為1,kafka某個分區沒資料。

解決:設定參數table.exec.source.idle-timeout=10000,機關是ms,如果其他分區等待xx毫秒沒資料,則推進水位線。(如果這個場景是javaAPI的方式對接kafka,則是不會出現的。)