天天看點

為什麼 Flink 無法實時寫入 MySQL?

作者:孫金城

摘要:本文為 Flink 生産環境應用中的疑問剖析,Flink 無法實時寫入 MySQL 是初學者常見問題之一,由社群同學羅鵬程提出,Apache Flink PMC 孫金城(金竹)老師分享該問題的解決方案及分析思路。主要分為以下四部分:
  1. 問題描述
  2. 解決思路
  3. 原因剖析
  4. 舉一反三
Tips:更多生産環境問題交流及回報請訂閱 Flink 中文郵件清單~

Flink 1.10 使用 flink-jdbc 連接配接器的方式與 MySQL 互動,讀資料和寫資料都能完成,但是在寫資料時,發現 Flink 程式執行完畢之後,才能在 MySQL 中查詢到插入的資料。即,雖然是流計算,但卻不能實時的輸出計算結果?

為什麼 Flink 無法實時寫入 MySQL?

相關代碼片段:

JDBCAppendTableSink.builder()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost/flink")
    .setUsername("root")
    .setPassword("123456")
    .setParameterTypes(
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO)
    .setQuery("insert into batch_size values(?,?)")
   .build()           

如何解決?

Flink 1.10 這個問題是知道一秒鐘,不知磨洋工的 Case,在初學時候非常容易遇上,那麼真的是 Flink 不能實時寫入 MySQL 嗎?當然不是,上面代碼基礎之上簡單的加上一行,就解決問題了:

...
.setBatchSize(1) //将寫入MySQL的buffer大小為1。
..           

那麼問題雖然解決了,根本原因是個啥呢?也許你看到這裡會說,這問題很明顯,就是 Flink 設計 JDBC Sink 的時候出于性能因素考慮,對寫入 buffer 做了預設值設定。

沒錯,這一點你說的很對,在 Flink 1.10 中 JDBC OutputFormat 的基類 AbstractJDBCOutputFormat 裡面和這相關的變量 DEFAULT_FLUSH_MAX_SIZE 預設值是 5000,是以在你學習測試時候由于測試資料少(少于 5000),資料一直在 buffer 中,直到資料源資料結束,作業也結束了,才将計算結果刷入 MySQL,是以沒有實時的(每條)寫入 MySQL。如下:

為什麼 Flink 無法實時寫入 MySQL?

但這裡還有個因素需要注意,那就是時間因素,上面 DEFAULT_FLUSH_INTERVAL_MILLS 預設值是 0,這個相當于沒有時間限制,一直等到 buffer 滿了或者作業結束才能觸發寫出動作。

也就是有些初學者,發現問題,即使故意 debug 時候打上斷點,不讓作業結束,但是等到花兒都謝了,資料也沒有寫入到 MySQL。

在 Flink 1.10 中 AbstractJDBCOutputFormat 有兩個實作類:

為什麼 Flink 無法實時寫入 MySQL?

分别對應了如下兩類 Sink:

為什麼 Flink 無法實時寫入 MySQL?

是以在 Flink 1.10 中不論是 AppendTableSink 和 UpsertTableSink 都會有同樣的問題。不過 UpsertTableSink 時使用者可以設定時間,而 AppendTableSink 是連時間設定的入口都木有。

那麼,是 Flink 的鍋?

就這個問題而言,我個人認為不是使用者的問題,是 Flink 1.10 代碼設計有進一步改進的空間。在 Flink 1.11 中社群的确重構了,對 JDBCOutputFormat 打了 @Deprecated。感興趣可以查閱 FLINK-17537 了解變化過程。但是在這個改進中,并沒有對 DEFAULT_FLUSH_MAX_SIZE 預設值和 DEFAULT_FLUSH_INTERVAL_MILLS 預設值做變化,社群也在積極的讨論改進方案,想參與社群貢獻或者了解最終讨論結果的可以查閱 FLINK-16497。

當然在你學習過程中使用任何 Sink 的時候,隻要沒有實時寫入,都可以找找是否有寫出 buffer 和寫出時間的限制設定。在這一點上,羅鵬程也提到了 Elasticsearch 也有類似問題,需要調用 setBulkFlushMaxActions 進行設定。

為什麼 Flink 無法實時寫入 MySQL?

大家在學習、使用 Flink 的過程中遇到的問題都可以通過 Flink 中文郵件清單進行回報,Flink 核心開發者及社群一線使用者線上答疑交流!

2 分鐘快速訂閱 Flink 中文郵件清單

Apache Flink 中文郵件清單訂閱流程:

  1. 發送任意郵件到 [email protected]
  2. 收到官方确認郵件
  3. 回複該郵件 confirm 即可訂閱

訂閱成功後将收到 Flink 官方的中文郵件清單的消息,您可以向 [email protected] 發郵件提問也可以幫助别人解答問題,動動手測試一下!

以上是對該問題解決方案及思路的分享,希望能對你有所幫助,也期待大家遇到的典型問題能及時回報至社群郵件清單。