天天看點

Flink mysql-cdc connector 源碼解析

在 Flink 1.11 引入了 CDC 機制,CDC 的全稱是 Change Data Capture,用于捕捉資料庫表的增删改查操作,是目前非常成熟的同步資料庫變更方案。Flink CDC Connectors 是 Apache Flink 的一組源連接配接器,是可以從 MySQL、PostgreSQL 資料直接讀取全量資料和增量資料的 Source Connectors.

Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志擷取變更的能力,将 changelog 轉換為 Flink SQL 認識的 RowData 資料。RowData 代表了一行的資料,在 RowData 上面會有一個中繼資料的資訊 RowKind,RowKind 裡面包括了插入(+I)、更新前(-U)、更新後(+U)、删除(-D),這樣和資料庫裡面的 binlog 概念十分類似。通過 Debezium 采集的資料,包含了舊資料(before)和新資料行(after)以及原資料資訊(source),op 的 u表示是 update 更新操作辨別符(op 字段的值c,u,d,r 分别對應 create,update,delete,reade),ts_ms 表示同步的時間戳。

下面就來深入 Flink 的源碼分析一下 CDC 的實作原理

首先 mysql-cdc 作為 Flink SQL 的一個 connector,那就肯定會對應一個 TableFactory 類,我們就從這個工廠類入手分析一下源碼的實作過程,先找到源碼裡面的 MySQLTableSourceFactory 這個類,然後來看一下它的 UML 類圖.

Flink mysql-cdc connector 源碼解析

從上圖中可以看到 MySQLTableSourceFactory 隻實作了 DynamicTableSourceFactory 這個接口,并沒有實作 DynamicTableSinkFactory 的接口,是以 mysql-cdc 是隻支援作為 source 不支援作為 sink 的,如果想要寫入 mysql 的話,可以使用JDBC 的 connector.

然後直接來看 MySQLTableSourceFactory#createDynamicTableSource 方法實作,源碼如下所示:

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
    helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);

    final ReadableConfig config = helper.getOptions();
    String hostname = config.get(HOSTNAME);
    String username = config.get(USERNAME);
    String password = config.get(PASSWORD);
    String databaseName = config.get(DATABASE_NAME);
    String tableName = config.get(TABLE_NAME);
    int port = config.get(PORT);
    Integer serverId = config.getOptional(SERVER_ID).orElse(null);
    ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
    StartupOptions startupOptions = getStartupOptions(config);
    TableSchema physicalSchema =
            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

    return new MySQLTableSource(
            physicalSchema,
            port,
            hostname,
            databaseName,
            tableName,
            username,
            password,
            serverTimeZone,
            getDebeziumProperties(context.getCatalogTable().getOptions()),
            serverId,
            startupOptions);
}
      

這個方法的主要作用就構造 MySQLTableSource 對象,先會從 DDL 中擷取 hostname,username,password 等資料庫和表的資訊,然後去建構 MySQLTableSource 對象.

接着來看一下 MySQLTableSource#getScanRuntimeProvider 這個方法,它會傳回一個用于讀取資料的運作執行個體對象

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
    TypeInformation<RowData> typeInfo =
            scanContext.createTypeInformation(physicalSchema.toRowDataType());
    DebeziumDeserializationSchema<RowData> deserializer =
            new RowDataDebeziumDeserializeSchema(
                    rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
    MySQLSource.Builder<RowData> builder =
            MySQLSource.<RowData>builder()
                    .hostname(hostname)
                    .port(port)
                    .databaseList(database)
                    .tableList(database + "." + tableName)
                    .username(username)
                    .password(password)
                    .serverTimeZone(serverTimeZone.toString())
                    .debeziumProperties(dbzProperties)
                    .startupOptions(startupOptions)
                    .deserializer(deserializer);
    Optional.ofNullable(serverId).ifPresent(builder::serverId);
    DebeziumSourceFunction<RowData> sourceFunction = builder.build();

    return SourceFunctionProvider.of(sourceFunction, false);
}
      

這個方法裡面先擷取了 rowType 和 typeInfo 資訊,然後建構了一個 DebeziumDeserializationSchema 反序列對象,這個對象的作用是把讀取到的 SourceRecord 資料類型轉換成 Flink 認識的 RowData 類型.接着來看一下 deserialize 方法.

public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    // 擷取 op 類型
    Operation op = Envelope.operationFor(record);
    // 擷取資料
    Struct value = (Struct)record.value();
    // 擷取 schema 資訊
    Schema valueSchema = record.valueSchema();
    GenericRowData delete;
    // 根據 op 的不同類型走不同的操作
    if (op != Operation.CREATE && op != Operation.READ) {
        // delete
        if (op == Operation.DELETE) {
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            out.collect(delete);
        } else {
            // update
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.UPDATE_BEFORE);
            delete.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(delete);
            GenericRowData after = this.extractAfterRow(value, valueSchema);
            this.validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(after);
        }
    } else {
        // insert
        delete = this.extractAfterRow(value, valueSchema);
        this.validator.validate(delete, RowKind.INSERT);
        delete.setRowKind(RowKind.INSERT);
        out.collect(delete);
    }
}
      

這裡主要會判斷進來的資料類型,然後根據不同的類型走不同的操作邏輯,如果是 update 操作的話,會輸出兩條資料.最終都是會轉換成 RowData 類型輸出.

接着往下面看是 builder.build() 該方法構造了 DebeziumSourceFunction 對象,也就是說 Flink 的底層是采用 Debezium 來讀取 mysql,PostgreSQL 資料庫的變更日志的.為什麼沒有用 canal 感興趣的同學自己可以對比一下這兩個架構

然後來看一下 DebeziumSourceFunction 的 UML 類圖

Flink mysql-cdc connector 源碼解析

可以發現 DebeziumSourceF unction 不僅繼承了 RichSourceFunction 這個抽象類,而且還實作了 checkpoint 相關的接口,是以 mysql-cdc 是支援 Exactly Once 語義的.這幾個接口大家都非常熟悉了,這裡不再過多介紹.

直接來看一下核心方法 open 和 run 方法的邏輯如下

public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build();
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }


public void run(SourceContext<T> sourceContext) throws Exception {
    this.properties.setProperty("name", "engine");
    this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
    if (this.restoredOffsetState != null) {
        this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
    }

    this.properties.setProperty("key.converter.schemas.enable", "false");
    this.properties.setProperty("value.converter.schemas.enable", "false");
    this.properties.setProperty("include.schema.changes", "false");
    this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L));
    this.properties.setProperty("tombstones.on.delete", "false");
    this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
    if (this.engineInstanceName == null) {
        this.engineInstanceName = UUID.randomUUID().toString();
        FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
    }

    this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
    String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
    this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
    this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
        if (!success && error != null) {
            this.reportError(error);
        }

    }).build();
    if (this.running) {
        this.executor.execute(this.engine);
        this.debeziumStarted = true;
        MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
        metricGroup.gauge("currentFetchEventTimeLag", () -> {
            return this.debeziumConsumer.getFetchDelay();
        });
        metricGroup.gauge("currentEmitEventTimeLag", () -> {
            return this.debeziumConsumer.getEmitDelay();
        });
        metricGroup.gauge("sourceIdleTime", () -> {
            return this.debeziumConsumer.getIdleTime();
        });

        try {
            while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                if (this.error != null) {
                    this.running = false;
                    this.shutdownEngine();
                    ExceptionUtils.rethrow(this.error);
                }
            }
        } catch (InterruptedException var5) {
            Thread.currentThread().interrupt();
        }

    }
}
      

open 方法裡面主要是建立了一個單線程化的線程池(它隻會用唯一的工作線程來執行任務).是以 mysql-cdc 源是單線程讀取的.