作者:孫金城
摘要:本文為 Flink 生産環境應用中的疑問剖析,Flink 無法實時寫入 MySQL 是初學者常見問題之一,由社群同學羅鵬程提出,Apache Flink PMC 孫金城(金竹)老師分享該問題的解決方案及分析思路。主要分為以下四部分:Tips:更多生産環境問題交流及回報請訂閱 Flink 中文郵件清單~
- 問題描述
- 解決思路
- 原因剖析
- 舉一反三
Flink 1.10 使用 flink-jdbc 連接配接器的方式與 MySQL 互動,讀資料和寫資料都能完成,但是在寫資料時,發現 Flink 程式執行完畢之後,才能在 MySQL 中查詢到插入的資料。即,雖然是流計算,但卻不能實時的輸出計算結果?
相關代碼片段:
JDBCAppendTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/flink")
.setUsername("root")
.setPassword("123456")
.setParameterTypes(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
.setQuery("insert into batch_size values(?,?)")
.build()
如何解決?
Flink 1.10 這個問題是知道一秒鐘,不知磨洋工的 Case,在初學時候非常容易遇上,那麼真的是 Flink 不能實時寫入 MySQL 嗎?當然不是,上面代碼基礎之上簡單的加上一行,就解決問題了:
...
.setBatchSize(1) //将寫入MySQL的buffer大小為1。
..
那麼問題雖然解決了,根本原因是個啥呢?也許你看到這裡會說,這問題很明顯,就是 Flink 設計 JDBC Sink 的時候出于性能因素考慮,對寫入 buffer 做了預設值設定。
沒錯,這一點你說的很對,在 Flink 1.10 中 JDBC OutputFormat 的基類 AbstractJDBCOutputFormat 裡面和這相關的變量 DEFAULT_FLUSH_MAX_SIZE 預設值是 5000,是以在你學習測試時候由于測試資料少(少于 5000),資料一直在 buffer 中,直到資料源資料結束,作業也結束了,才将計算結果刷入 MySQL,是以沒有實時的(每條)寫入 MySQL。如下:
但這裡還有個因素需要注意,那就是時間因素,上面 DEFAULT_FLUSH_INTERVAL_MILLS 預設值是 0,這個相當于沒有時間限制,一直等到 buffer 滿了或者作業結束才能觸發寫出動作。
也就是有些初學者,發現問題,即使故意 debug 時候打上斷點,不讓作業結束,但是等到花兒都謝了,資料也沒有寫入到 MySQL。
在 Flink 1.10 中 AbstractJDBCOutputFormat 有兩個實作類:
分别對應了如下兩類 Sink:
是以在 Flink 1.10 中不論是 AppendTableSink 和 UpsertTableSink 都會有同樣的問題。不過 UpsertTableSink 時使用者可以設定時間,而 AppendTableSink 是連時間設定的入口都木有。
那麼,是 Flink 的鍋?
就這個問題而言,我個人認為不是使用者的問題,是 Flink 1.10 代碼設計有進一步改進的空間。在 Flink 1.11 中社群的确重構了,對 JDBCOutputFormat 打了 @Deprecated。感興趣可以查閱 FLINK-17537 了解變化過程。但是在這個改進中,并沒有對 DEFAULT_FLUSH_MAX_SIZE 預設值和 DEFAULT_FLUSH_INTERVAL_MILLS 預設值做變化,社群也在積極的讨論改進方案,想參與社群貢獻或者了解最終讨論結果的可以查閱 FLINK-16497。
當然在你學習過程中使用任何 Sink 的時候,隻要沒有實時寫入,都可以找找是否有寫出 buffer 和寫出時間的限制設定。在這一點上,羅鵬程也提到了 Elasticsearch 也有類似問題,需要調用 setBulkFlushMaxActions 進行設定。
大家在學習、使用 Flink 的過程中遇到的問題都可以通過 Flink 中文郵件清單進行回報,Flink 核心開發者及社群一線使用者線上答疑交流!
2 分鐘快速訂閱 Flink 中文郵件清單
Apache Flink 中文郵件清單訂閱流程:
- 發送任意郵件到 [email protected]
- 收到官方确認郵件
- 回複該郵件 confirm 即可訂閱
訂閱成功後将收到 Flink 官方的中文郵件清單的消息,您可以向 [email protected] 發郵件提問也可以幫助别人解答問題,動動手測試一下!
以上是對該問題解決方案及思路的分享,希望能對你有所幫助,也期待大家遇到的典型問題能及時回報至社群郵件清單。