FlinkCDC Series 02: How to Implement Update in the Seatunnel Flink JDBC Sink

First, take a look at the config format documented for the Flink JDBC sink on the official Apache Seatunnel website:

JdbcSink {
    source_table_name = fake
    driver = com.mysql.jdbc.Driver
    url = "jdbc:mysql://localhost/test"
    username = root
    query = "insert into test(name,age) values(?,?)"
    batch_size = 2
}

The first thing to complain about is that this official doc tells you almost nothing.

Seatunnel-2.1.1-flink-jdbc-sink

This raises a question: can I get update behavior like the spark-jdbc-sink has? A sink that can only handle inserts but not updates is simply not good enough!

Just adding a saveMode parameter won't work, because no such parameter exists in the code. To find out which parameters are actually supported, you have to read the source directly.

Source code of the Flink JDBC sink, version 2.1.1:

A look at Config.java shows that there is no saveMode parameter at all.

Next, read JdbcSink.java under the sink directory (the core code).

The prepare method below reveals another parameter the docs never mention: password (another gripe). It also reads optional batch size, batch interval, and max-retries settings.

@Override
    public void prepare(FlinkEnvironment env) {
        driverName = config.getString(DRIVER);
        dbUrl = config.getString(URL);
        username = config.getString(USERNAME);
        query = config.getString(QUERY);
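        // password is optional and missing from the official docs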
        if (config.hasPath(PASSWORD)) {
            password = config.getString(PASSWORD);
        }
        if (config.hasPath(SINK_BATCH_SIZE)) {
            batchSize = config.getInt(SINK_BATCH_SIZE);
        }
        if (config.hasPath(SINK_BATCH_INTERVAL)) {
            batchIntervalMs = config.getLong(SINK_BATCH_INTERVAL);
        }
        if (config.hasPath(SINK_BATCH_MAX_RETRIES)) {
            maxRetries = config.getInt(SINK_BATCH_MAX_RETRIES);
        }
    }

Now for the key part: the core implementations that write stream data and batch data over JDBC.

@Override
    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
        Table table = env.getStreamTableEnvironment().fromDataStream(dataStream);
        TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();

        int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
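        // The user-supplied query plus a per-record statement builder: the
        // builder fills the query's ? placeholders with the Row's fields, in order.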
        SinkFunction<Row> sink = org.apache.flink.connector.jdbc.JdbcSink.sink(
            query,
            (st, row) -> JdbcUtils.setRecordToStatement(st, types, row),
            JdbcExecutionOptions.builder()
                .withBatchSize(batchSize)
                .withBatchIntervalMs(batchIntervalMs)
                .withMaxRetries(maxRetries)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(dbUrl)
                .withDriverName(driverName)
                .withUsername(username)
                .withPassword(password)
                .build());

        if (config.hasPath(PARALLELISM)) {
            dataStream.addSink(sink).setParallelism(config.getInt(PARALLELISM));
        } else {
            dataStream.addSink(sink);
        }
    }

    @Override
    public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
        Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
        TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
        int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();

        // Batch path: the user-supplied query goes straight into setQuery.
        JdbcOutputFormat format = JdbcOutputFormat.buildJdbcOutputFormat()
                .setDrivername(driverName)
                .setDBUrl(dbUrl)
                .setUsername(username)
                .setPassword(password)
                .setQuery(query)
                .setBatchSize(batchSize)
                .setSqlTypes(types)
                .finish();
        dataSet.output(format);
    }

Related API docs: setRecordToStatement, Sink.

To sum up: on the streaming path, you supply a query string and a statement builder; Flink validates the number of ? placeholders and fills in the Row's fields in order. On the batch path, the query goes straight into setQuery on the JdbcOutputFormat.
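To make that ordering concrete, here is a minimal sketch of what such a statement builder does, written against Flink's public JdbcSink API rather than Seatunnel's wrapper; the (name, age) Row layout matches the doc example and is an assumption for illustration:

import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.types.Row;

public class StatementBuilderSketch {
    public static void main(String[] args) {
        // Matches "insert into test(name,age) values(?,?)":
        // JDBC parameter indices are 1-based and must follow the ? order.
        JdbcStatementBuilder<Row> builder = (st, row) -> {
            st.setString(1, (String) row.getField(0)); // first ?  <- name
            st.setInt(2, (Integer) row.getField(1));   // second ? <- age
        };
        // Seatunnel's (st, row) -> JdbcUtils.setRecordToStatement(st, types, row)
        // does the same thing generically, driven by the inferred SQL types.
    }
}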

So how do we implement update? Answers online basically all suggest using the Table API (thanks for nothing; if I were willing to implement it myself I wouldn't be using Seatunnel!).

[Screenshot: the Flink JDBC connector documentation]

Flink's JDBC connector docs say that if a primary key is defined, inserts are performed with upsert syntax. But after digging around for a long time, I still couldn't figure out how to wire that into the JdbcSink sink code.
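For reference, this is roughly what that upsert-by-primary-key mode looks like when you use Flink's JDBC table connector directly (a hedged sketch outside Seatunnel; the table name and connection details are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkJdbcUpsertSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Declaring a PRIMARY KEY switches the JDBC connector into upsert mode.
        tableEnv.executeSql(
                "CREATE TABLE hello_sink ("
                + "  name STRING,"
                + "  age INT,"
                + "  PRIMARY KEY (name) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'jdbc',"
                + "  'url' = 'jdbc:mysql://localhost:3306/data_for_test',"
                + "  'table-name' = 'hello',"
                + "  'username' = 'root',"
                + "  'password' = '...'"
                + ")");
    }
}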

Still, since the query goes straight into the statement builder, could we just write an upsert statement with the same number of ? placeholders?

Yes, we can. MySQL's insert ... on duplicate key update performs an upsert as long as the target table has a PRIMARY KEY or UNIQUE index, and VALUES(age) refers to the value that would have been inserted, so ifnull(VALUES(age), age) keeps the existing age whenever the incoming one is NULL.
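Here is a standalone plain-JDBC sketch of the same statement, independent of Seatunnel and Flink; the hello table DDL and the credentials are assumptions for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class UpsertDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/data_for_test", "root", "secret")) {
            try (Statement st = conn.createStatement()) {
                // The upsert only fires on a key collision, so name must be
                // a PRIMARY KEY or carry a UNIQUE index (an assumed schema).
                st.execute("create table if not exists hello("
                        + "name varchar(64) primary key, age int)");
            }
            String sql = "insert into hello(name,age) values(?,?) "
                    + "on duplicate key update age=ifnull(VALUES(age), age)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, "alice"); ps.setInt(2, 20); ps.executeUpdate(); // inserts
                ps.setString(1, "alice"); ps.setInt(2, 21); ps.executeUpdate(); // updates age to 21
            }
        }
    }
}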

The final config looks like this:

source {
  # This is an example input plugin **only for testing and demonstrating the input plugin feature**
    FakeSourceStream {
      result_table_name = "fake"
      field_name = "name,age"
    }

  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
sink {
  JdbcSink {
    source_table_name = fake
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.SomeRandomIp:3306/data_for_test"
    username = "root"
    password = "Dont Try to Guess My Password"
    query = "insert into hello(name,age) values(?,?) on duplicate key update age=ifnull(VALUES (age), age)"
    batch_size = 2
  }
}

Hooked up to the default FakeSourceStream, it works as shown below:

[Screenshot: the result]
