在 Flink 1.11 引入了 CDC 機制,CDC 的全稱是 Change Data Capture,用于捕捉資料庫表的增删改查操作,是目前非常成熟的同步資料庫變更方案。Flink CDC Connectors 是 Apache Flink 的一組源連接配接器,是可以從 MySQL、PostgreSQL 資料直接讀取全量資料和增量資料的 Source Connectors.
★
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志擷取變更的能力,将 changelog 轉換為 Flink SQL 認識的 RowData 資料。RowData 代表了一行的資料,在 RowData 上面會有一個中繼資料的資訊 RowKind,RowKind 裡面包括了插入(+I)、更新前(-U)、更新後(+U)、删除(-D),這樣和資料庫裡面的 binlog 概念十分類似。通過 Debezium 采集的資料,包含了舊資料(before)和新資料行(after)以及原資料資訊(source),op 的 u表示是 update 更新操作辨別符(op 字段的值c,u,d,r 分别對應 create,update,delete,reade),ts_ms 表示同步的時間戳。
”
下面就來深入 Flink 的源碼分析一下 CDC 的實作原理
首先 mysql-cdc 作為 Flink SQL 的一個 connector,那就肯定會對應一個 TableFactory 類,我們就從這個工廠類入手分析一下源碼的實作過程,先找到源碼裡面的 MySQLTableSourceFactory 這個類,然後來看一下它的 UML 類圖.
從上圖中可以看到 MySQLTableSourceFactory 隻實作了 DynamicTableSourceFactory 這個接口,并沒有實作 DynamicTableSinkFactory 的接口,是以 mysql-cdc 是隻支援作為 source 不支援作為 sink 的,如果想要寫入 mysql 的話,可以使用JDBC 的 connector.
然後直接來看 MySQLTableSourceFactory#createDynamicTableSource 方法實作,源碼如下所示:
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
Integer serverId = config.getOptional(SERVER_ID).orElse(null);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
StartupOptions startupOptions = getStartupOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new MySQLTableSource(
physicalSchema,
port,
hostname,
databaseName,
tableName,
username,
password,
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
serverId,
startupOptions);
}
這個方法的主要作用就構造 MySQLTableSource 對象,先會從 DDL 中擷取 hostname,username,password 等資料庫和表的資訊,然後去建構 MySQLTableSource 對象.
接着來看一下 MySQLTableSource#getScanRuntimeProvider 這個方法,它會傳回一個用于讀取資料的運作執行個體對象
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
MySQLSource.Builder<RowData> builder =
MySQLSource.<RowData>builder()
.hostname(hostname)
.port(port)
.databaseList(database)
.tableList(database + "." + tableName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer);
Optional.ofNullable(serverId).ifPresent(builder::serverId);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
這個方法裡面先擷取了 rowType 和 typeInfo 資訊,然後建構了一個 DebeziumDeserializationSchema 反序列對象,這個對象的作用是把讀取到的 SourceRecord 資料類型轉換成 Flink 認識的 RowData 類型.接着來看一下 deserialize 方法.
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
// 擷取 op 類型
Operation op = Envelope.operationFor(record);
// 擷取資料
Struct value = (Struct)record.value();
// 擷取 schema 資訊
Schema valueSchema = record.valueSchema();
GenericRowData delete;
// 根據 op 的不同類型走不同的操作
if (op != Operation.CREATE && op != Operation.READ) {
// delete
if (op == Operation.DELETE) {
delete = this.extractBeforeRow(value, valueSchema);
this.validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {
// update
delete = this.extractBeforeRow(value, valueSchema);
this.validator.validate(delete, RowKind.UPDATE_BEFORE);
delete.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(delete);
GenericRowData after = this.extractAfterRow(value, valueSchema);
this.validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(after);
}
} else {
// insert
delete = this.extractAfterRow(value, valueSchema);
this.validator.validate(delete, RowKind.INSERT);
delete.setRowKind(RowKind.INSERT);
out.collect(delete);
}
}
這裡主要會判斷進來的資料類型,然後根據不同的類型走不同的操作邏輯,如果是 update 操作的話,會輸出兩條資料.最終都是會轉換成 RowData 類型輸出.
接着往下面看是 builder.build() 該方法構造了 DebeziumSourceFunction 對象,也就是說 Flink 的底層是采用 Debezium 來讀取 mysql,PostgreSQL 資料庫的變更日志的.為什麼沒有用 canal 感興趣的同學自己可以對比一下這兩個架構
然後來看一下 DebeziumSourceFunction 的 UML 類圖
可以發現 DebeziumSourceF unction 不僅繼承了 RichSourceFunction 這個抽象類,而且還實作了 checkpoint 相關的接口,是以 mysql-cdc 是支援 Exactly Once 語義的.這幾個接口大家都非常熟悉了,這裡不再過多介紹.
直接來看一下核心方法 open 和 run 方法的邏輯如下
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
}
public void run(SourceContext<T> sourceContext) throws Exception {
this.properties.setProperty("name", "engine");
this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
if (this.restoredOffsetState != null) {
this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
}
this.properties.setProperty("key.converter.schemas.enable", "false");
this.properties.setProperty("value.converter.schemas.enable", "false");
this.properties.setProperty("include.schema.changes", "false");
this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L));
this.properties.setProperty("tombstones.on.delete", "false");
this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
if (this.engineInstanceName == null) {
this.engineInstanceName = UUID.randomUUID().toString();
FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
}
this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
if (!success && error != null) {
this.reportError(error);
}
}).build();
if (this.running) {
this.executor.execute(this.engine);
this.debeziumStarted = true;
MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
metricGroup.gauge("currentFetchEventTimeLag", () -> {
return this.debeziumConsumer.getFetchDelay();
});
metricGroup.gauge("currentEmitEventTimeLag", () -> {
return this.debeziumConsumer.getEmitDelay();
});
metricGroup.gauge("sourceIdleTime", () -> {
return this.debeziumConsumer.getIdleTime();
});
try {
while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
if (this.error != null) {
this.running = false;
this.shutdownEngine();
ExceptionUtils.rethrow(this.error);
}
}
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
}
}
open 方法裡面主要是建立了一個單線程化的線程池(它隻會用唯一的工作線程來執行任務).是以 mysql-cdc 源是單線程讀取的.