問題
Flink實時統計GMV,如果訂單金額下午變了該怎麼處理
具體描述
- 實時統計每天的GMV,但是訂單金額是會修改的。
- 訂單存儲在mysql,通過binlog解析工具實時同步到kafka.然後從kafka實時統計當日訂單總額。
- 假設訂單009 上午10點生成,金額為1000. 生成一條json資料到kafka ,GMV實時統計為1000。
- 然後下午15點,009訂單金額被修改為500。資料生成json也會進入kafka. 這時如果不減去上午已經統計的金額。那麼總金額就是錯的。
根據 update /delete 要寫這個減去的邏輯。
按日去重是不行了,因為是增量處理, 上午的資料已經被處理了不能再擷取了。
解決思路
-
首先版本是1.11+, 可以直接用binlog
format,這樣資料的修改其實會自動對應到update_before和update_after的資料,這樣Flink
内部的算子都可以處理好這種資料,包括聚合算子。比如你是select sum(xxx) from T group by
yyy這種,那這個sum名額會自動做好這件事。
-
如果不用binlog模式,隻是取最新的資料來做聚合計算,也可以用去重算子[1] 将append資料流轉成retract資料流,這樣下遊再用同樣的
聚合邏輯,效果也是一樣的。
去重文法:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
- ROW_NUMBER(): 每一行配置設定一個唯一的,序列數字,從1開始
- PARTITION BY col1[, col2...]: 指定分區列 i.e. 去重key.
- ORDER BY time_attr [asc|desc]: 指定排序字段, 必須是一個時間屬性. Currently Flink 支援 processing time 和 event time 屬性. Ordering by ASC 意為保留第一行, ordering by DESC 意為 保留最後一行.
- WHERE rownum = 1: The rownum = 1 是必須的,對于Flink識别這個是去重的查詢語句
隻要source端産生了changelog資料,後面的算子是可以自動處理update消息的,簡單了解,你可以認為:
- append / update_after 消息會累加到聚合名額上
- delete / update_before 消息會從聚合名額上進行retract
Reference
- https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/deduplication/
- https://developer.aliyun.com/article/782653