在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.
★
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。
”
下面就来深入 Flink 的源码分析一下 CDC 的实现原理
首先 mysql-cdc 作为 Flink SQL 的一个 connector,那就肯定会对应一个 TableFactory 类,我们就从这个工厂类入手分析一下源码的实现过程,先找到源码里面的 MySQLTableSourceFactory 这个类,然后来看一下它的 UML 类图.
从上图中可以看到 MySQLTableSourceFactory 只实现了 DynamicTableSourceFactory 这个接口,并没有实现 DynamicTableSinkFactory 的接口,所以 mysql-cdc 是只支持作为 source 不支持作为 sink 的,如果想要写入 mysql 的话,可以使用JDBC 的 connector.
然后直接来看 MySQLTableSourceFactory#createDynamicTableSource 方法实现,源码如下所示:
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
Integer serverId = config.getOptional(SERVER_ID).orElse(null);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
StartupOptions startupOptions = getStartupOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new MySQLTableSource(
physicalSchema,
port,
hostname,
databaseName,
tableName,
username,
password,
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
serverId,
startupOptions);
}
这个方法的主要作用就构造 MySQLTableSource 对象,先会从 DDL 中获取 hostname,username,password 等数据库和表的信息,然后去构建 MySQLTableSource 对象.
接着来看一下 MySQLTableSource#getScanRuntimeProvider 这个方法,它会返回一个用于读取数据的运行实例对象
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
MySQLSource.Builder<RowData> builder =
MySQLSource.<RowData>builder()
.hostname(hostname)
.port(port)
.databaseList(database)
.tableList(database + "." + tableName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer);
Optional.ofNullable(serverId).ifPresent(builder::serverId);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
这个方法里面先获取了 rowType 和 typeInfo 信息,然后构建了一个 DebeziumDeserializationSchema 反序列对象,这个对象的作用是把读取到的 SourceRecord 数据类型转换成 Flink 认识的 RowData 类型.接着来看一下 deserialize 方法.
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
// 获取 op 类型
Operation op = Envelope.operationFor(record);
// 获取数据
Struct value = (Struct)record.value();
// 获取 schema 信息
Schema valueSchema = record.valueSchema();
GenericRowData delete;
// 根据 op 的不同类型走不同的操作
if (op != Operation.CREATE && op != Operation.READ) {
// delete
if (op == Operation.DELETE) {
delete = this.extractBeforeRow(value, valueSchema);
this.validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {
// update
delete = this.extractBeforeRow(value, valueSchema);
this.validator.validate(delete, RowKind.UPDATE_BEFORE);
delete.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(delete);
GenericRowData after = this.extractAfterRow(value, valueSchema);
this.validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(after);
}
} else {
// insert
delete = this.extractAfterRow(value, valueSchema);
this.validator.validate(delete, RowKind.INSERT);
delete.setRowKind(RowKind.INSERT);
out.collect(delete);
}
}
这里主要会判断进来的数据类型,然后根据不同的类型走不同的操作逻辑,如果是 update 操作的话,会输出两条数据.最终都是会转换成 RowData 类型输出.
接着往下面看是 builder.build() 该方法构造了 DebeziumSourceFunction 对象,也就是说 Flink 的底层是采用 Debezium 来读取 mysql,PostgreSQL 数据库的变更日志的.为什么没有用 canal 感兴趣的同学自己可以对比一下这两个框架
然后来看一下 DebeziumSourceFunction 的 UML 类图
可以发现 DebeziumSourceF unction 不仅继承了 RichSourceFunction 这个抽象类,而且还实现了 checkpoint 相关的接口,所以 mysql-cdc 是支持 Exactly Once 语义的.这几个接口大家都非常熟悉了,这里不再过多介绍.
直接来看一下核心方法 open 和 run 方法的逻辑如下
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
}
public void run(SourceContext<T> sourceContext) throws Exception {
this.properties.setProperty("name", "engine");
this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
if (this.restoredOffsetState != null) {
this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
}
this.properties.setProperty("key.converter.schemas.enable", "false");
this.properties.setProperty("value.converter.schemas.enable", "false");
this.properties.setProperty("include.schema.changes", "false");
this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L));
this.properties.setProperty("tombstones.on.delete", "false");
this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
if (this.engineInstanceName == null) {
this.engineInstanceName = UUID.randomUUID().toString();
FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
}
this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
if (!success && error != null) {
this.reportError(error);
}
}).build();
if (this.running) {
this.executor.execute(this.engine);
this.debeziumStarted = true;
MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
metricGroup.gauge("currentFetchEventTimeLag", () -> {
return this.debeziumConsumer.getFetchDelay();
});
metricGroup.gauge("currentEmitEventTimeLag", () -> {
return this.debeziumConsumer.getEmitDelay();
});
metricGroup.gauge("sourceIdleTime", () -> {
return this.debeziumConsumer.getIdleTime();
});
try {
while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
if (this.error != null) {
this.running = false;
this.shutdownEngine();
ExceptionUtils.rethrow(this.error);
}
}
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
}
}
open 方法里面主要是创建了一个单线程化的线程池(它只会用唯一的工作线程来执行任务).所以 mysql-cdc 源是单线程读取的.