Flink kafka source & sink 源碼解析
原創 吳鵬 Flink 中文社群 4天前
摘要:本文基于 Flink 1.9.0 和 Kafka 2.3 版本,對 Flink Kafka source 和 sink 端的源碼進行解析,主要内容分為以下兩部分:
1.Flink-kafka-source 源碼解析
- 流程概述
- 非 checkpoint 模式 offset 的送出
- checkpoint 模式下 offset 的送出
- 指定 offset 消費
2.Flink-kafka-sink 源碼解析
- 初始化
- Task運作
- 小結
一般在 Flink 中建立 kafka source 的代碼如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//KafkaEventSchema為自定義的資料字段解析類
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
而 Kafka 的 KafkaConsumer API 中消費某個 topic 使用的是 poll 方法如下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.poll(Duration.ofMillis(100));
下面将分析這兩個流程是如何銜接起來的。
初始化執行 env.addSource 的時候會建立 StreamSource 對象,即 final StreamSource sourceOperator = new StreamSource<>(function);這裡的function 就是傳入的 FlinkKafkaConsumer 對象,StreamSource 構造函數中将這個對象傳給父類 AbstractUdfStreamOperator 的 userFunction 變量,源碼如下:
■ StreamSource.java
public StreamSource(SRC sourceFunction) {
super(sourceFunction);
this.chainingStrategy = ChainingStrategy.HEAD;
}
■ AbstractUdfStreamOperator.java
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
task 啟動後會調用到 SourceStreamTask 中的 performDefaultAction() 方法,這裡面會啟動一個線程 sourceThread.start();,部分源碼如下:
private final LegacySourceFunctionThread sourceThread;
@Override
protected void performDefaultAction(ActionContext context) throws Exception {
sourceThread.start();
}
在 LegacySourceFunctionThread 的 run 方法中,通過調用 headOperator.run 方法,最終調用了 StreamSource 中的 run 方法,部分源碼如下:
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain) throws Exception {
//省略部分代碼
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
//省略部分代碼
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
這裡最重要的就是 userFunction.run(ctx);,這個 userFunction 就是在上面初始化的時候傳入的 FlinkKafkaConsumer 對象,也就是說這裡實際調用了 FlinkKafkaConsumer 中的 run 方法,而具體的方法實作在其父類 FlinkKafkaConsumerBase中,至此,進入了真正的 kafka 消費階段。
Kafka消費階段
在 FlinkKafkaConsumerBase#run 中建立了一個 KafkaFetcher 對象,并最終調用了 kafkaFetcher.runFetchLoop(),這個方法的代碼片段如下:
/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;
@Override
public void runFetchLoop() throws Exception {
try {
final Handover handover = this.handover;
// kick off the actual Kafka consumer
consumerThread.start();
//省略部分代碼
}
可以看到實際啟動了一個 KafkaConsumerThread 線程。進入到 KafkaConsumerThread#run 中,下面隻是列出了這個方法的部分源碼,完整代碼請參考 KafkaConsumerThread.java。
@Override
public void run() {
// early exit check
if (!running) {
return;
}
// This method initializes the KafkaConsumer and guarantees it is torn down properly.
// This is important, because the consumer has multi-threading issues,
// including concurrent 'close()' calls.
try {
this.consumer = getConsumer(kafkaProperties);
} catch (Throwable t) {
handover.reportError(t);
return;
}
try {
// main fetch loop
while (running) {
try {
if (records == null) {
try {
records = consumer.poll(pollTimeout);
} catch (WakeupException we) {
continue;
}
}
}
// end main fetch loop
}
} catch (Throwable t) {
handover.reportError(t);
} finally {
handover.close();
try {
consumer.close();
} catch (Throwable t) {
log.warn("Error while closing Kafka consumer", t);
}
}
}
至此,終于走到了真正從 kafka 拿資料的代碼,即 records = consumer.poll(pollTimeout);。因為 KafkaConsumer 不是線程安全的,是以每個線程都需要生成獨立的 KafkaConsumer 對象,即 this.consumer = getConsumer(kafkaProperties);。
KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
return new KafkaConsumer<>(kafkaProperties);
}
小結:本節隻是介紹了 Flink 消費 kafka 資料的關鍵流程,下面會更詳細的介紹在AT_LEAST_ONCE和EXACTLY_ONCE 不同場景下 FlinkKafkaConsumer 管理 offset 的流程。
消費 kafka topic 最為重要的部分就是對 offset 的管理,對于 kafka 送出 offset 的機制,可以參考 kafka 官方網。
而在 flink kafka source 中 offset 的送出模式有3種:
public enum OffsetCommitMode {
/** Completely disable offset committing. */
DISABLED,
/** Commit offsets back to Kafka only when checkpoints are completed. */
ON_CHECKPOINTS,
/** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
KAFKA_PERIODIC;
}
初始化 offsetCommitMode
在 FlinkKafkaConsumerBase#open 方法中初始化 offsetCommitMode:
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
- 方法 getIsAutoCommitEnabled() 的實作如下:
protected boolean getIsAutoCommitEnabled() {
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
- 也就是說隻有 enable.auto.commit=true 并且 auto.commit.interval.ms>0 這個方法才會傳回 true
- 變量 enableCommitOnCheckpoints 預設是 true,可以調用 setCommitOffsetsOnCheckpoints 改變這個值
- 當代碼中調用了 env.enableCheckpointing 方法,isCheckpointingEnabled 才會傳回 true
通過下面的代碼傳回真正的送出模式:
/**
* Determine the offset commit mode using several configuration values.
*
* @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
* @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
* @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
*
* @return the offset commit mode to use, based on the configuration values.
*/
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
暫時不考慮 checkpoint 的場景,是以隻考慮 (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;。
也就是如果用戶端設定了 enable.auto.commit=true 那麼就是 KAFKA_PERIODIC,否則就是 KAFKA_DISABLED。
offset 的送出
■ 自動送出
這種方式完全依靠 kafka 自身的特性進行送出,如下方式指定參數即可:
Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
■ 非自動送出
通過上面的分析,如果 enable.auto.commit=false,那麼 offsetCommitMode 就是 DISABLED 。
kafka 官方文檔中,提到當 enable.auto.commit=false 時候需要手動送出 offset,也就是需要調用 consumer.commitSync(); 方法送出。
但是在 flink 中,非 checkpoint 模式下,不會調用 consumer.commitSync();, 一旦關閉自動送出,意味着 kafka 不知道目前的 consumer group 每次消費到了哪。
可以從兩方面證明這個問題:
-
源碼
KafkaConsumerThread#run 方法中是有 consumer.commitSync();,但是隻有當 commitOffsetsAndCallback != null 的時候才會調用。隻有開啟了checkpoint 功能才會不為 null,這個變量會在後續的文章中詳細分析。
-
測試
可以通過消費 __consumer_offsets 觀察是否有 offset 的送出
重新開機程式,還是會重複消費之前消費過的資料
小結:本節介紹了在非 checkpoint 模式下,Flink kafka source 送出 offset 的方式,下文會重點介紹 checkpoint 模式下送出 offset 的流程。
上面介紹了在沒有開啟 checkpoint 的時候,offset 的送出方式,下面将重點介紹開啟 checkpoint 後,Flink kafka consumer 送出 offset 的方式。
通過上文可以知道,當調用了 env.enableCheckpointing 方法後 offsetCommitMode 的值就是 ON_CHECKPOINTS,而且會通過下面方法強制關閉 kafka 自動送出功能,這個值很重要,後續很多地方都是根據這個值去判斷如何操作的。
/**
* Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
* This overwrites whatever setting the user configured in the properties.
* @param properties - Kafka configuration properties to be adjusted
* @param offsetCommitMode offset commit mode
*/
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
儲存 offset
在做 checkpoint 的時候會調用 FlinkKafkaConsumerBase#snapshotState 方法,其中 pendingOffsetsToCommit 會儲存要送出的 offset。
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
同時,下面的變量會作為 checkpoint 的一部分儲存下來,以便恢複時使用。
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
在 snapshotState 方法中會同時儲存 offset:
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
送出 offset
在 checkpoint 完成以後,task 會調用 notifyCheckpointComplete 方法,裡面判斷 offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS 的時候,調用fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback); 方法,最終會将要送出的 offset 通過 KafkaFetcher#doCommitInternalOffsetsToKafka 方法中的 consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback); 儲存到 KafkaConsumerThread.java 中的 nextOffsetsToCommit 成員變量裡面。
這樣就會保證當有需要送出的 offset 的時候,下面代碼會執行 consumer.commitAsync,進而完成了手動送出 offset 到 kafka。
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {
log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
commitInProgress = true;
consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
小結:本節介紹了在 checkpoint 模式下,Flink kafka source 送出 offset 的方式,後續會介紹 consumer 讀取 offset 的流程。
消費模式
在 Flink 的 kafka source 中有以下5種模式指定 offset 消費:
public enum StartupMode {
/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
/** Start from the earliest offset possible. */
EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
/** Start from the latest offset. */
LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
/**
* Start from user-supplied timestamp for each partition.
* Since this mode will have specific offsets to start with, we do not need a sentinel value;
* using Long.MIN_VALUE as a placeholder.
*/
TIMESTAMP(Long.MIN_VALUE),
/**
* Start from user-supplied specific offsets for each partition.
* Since this mode will have specific offsets to start with, we do not need a sentinel value;
* using Long.MIN_VALUE as a placeholder.
*/
SPECIFIC_OFFSETS(Long.MIN_VALUE);
}
預設為 GROUP_OFFSETS,表示根據上一次 group id 送出的 offset 位置開始消費。每個枚舉的值其實是一個 long 型的負數,根據不同的模式,在每個 partition 初始化的時候會預設将 offset 設定為這個負數。其他的方式和 kafka 本身的語義類似,就不在贅述。
指定 offset
此處隻讨論預設的 GROUP_OFFSETS 方式,下文所有分析都是基于這種模式。但是還是需要區分是否開啟了 checkpoint。在開始分析之前需要對幾個重要的變量進行說明:
- subscribedPartitionsToStartOffsets
- 所屬類:FlinkKafkaConsumerBase.java
- 定義:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToSt
說明:儲存訂閱 topic 的所有 partition 以及初始消費的 offset。
- subscribedPartitionStates
- 所屬類:AbstractFetcher.java
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPar
說明:儲存了所有訂閱的 partition 的 offset 等詳細資訊,例如:
/** The offset within the Kafka partition that we already processed. */
private volatile long offset;
/** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;
每次消費完資料之後都會更新這些值,這個變量非常的重要,在做 checkpoint 的時候,儲存的 offset 等資訊都是來自于這個變量。這個變量的初始化如下:
// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
userCodeClassLoader);
消費之後更新相應的 offset 主要在 KafkaFetcher#runFetchLoop
方法 while 循環中調用 emitRecord(value, partition, record.
offset(), record);。
- restoredState
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
* <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
*
* <p>Using a sorted map as the ordering is important when using restored state
* to seed the partition discoverer.
*/
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
說明:如果指定了恢複的 checkpoint 路徑,啟動時候将會讀取這個變量裡面的内容擷取起始 offset,而不再是使用 StartupMode 中的枚舉值作為初始的 offset。
- unionOffsetStates
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
說明:儲存了 checkpoint 要持久化存儲的内容,例如每個 partition 已經消費的 offset 等資訊
■ 非 checkpoint 模式
在沒有開啟 checkpoint 的時候,消費 kafka 中的資料,其實就是完全依靠 kafka 自身的機制進行消費。
■ checkpoint 模式
開啟 checkpoint 模式以後,會将 offset 等資訊持久化存儲以便恢複時使用。但是作業重新開機以後如果由于某種原因讀不到 checkpoint 的結果,例如 checkpoint 檔案丢失或者沒有指定恢複路徑等。
- 第一種情況,如果讀取不到 checkpoint 的内容
subscribedPartitionsToStartOffsets 會初始化所有 partition 的起始 offset為 -915623761773L 這個值就表示了目前為 GROUP_OFFSETS 模式。
default:
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
第一次消費之前,指定讀取 offset 位置的關鍵方法是 KafkaConsumerThread#reassignPartitions 代碼片段如下:
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
// the KafkaConsumer by default will automatically seek the consumer position
// to the committed group offset, so we do not need to do it.
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else {
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
}
}
因為是 GROUP_OFFSET 模式 ,是以會調用 newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); 需要說明的是,在 state 裡面需要存儲的是成功消費的最後一條資料的 offset,但是通過 position 這個方法傳回的是下一次應該消費的起始 offset,是以需要減1。這裡更新這個的目的是為了 checkpoint 的時候可以正确的拿到 offset。
這種情況由于讀取不到上次 checkpoint 的結果,是以依舊是依靠 kafka 自身的機制,即根據__consumer_offsets 記錄的内容消費。
- 第二種情況,checkpoint 可以讀取到
這種情況下, subscribedPartitionsToStartOffsets 初始的 offset 就是具體從checkpoint 中恢複的内容,這樣 KafkaConsumerThread#reassignPartitions 實際走的分支就是:
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
這裡加1的原理同上,state 儲存的是最後一次成功消費資料的 offset,是以加1才是現在需要開始消費的 offset。
小結:本節介紹了程式啟動時,如何确定從哪個 offset 開始消費,下文會繼續分析 flink kafka sink 的相關源碼。
通常添加一個 kafka sink 的代碼如下:
input.addSink(
new FlinkKafkaProducer<>(
"bar",
new KafkaSerializationSchemaImpl(),
properties,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");
初始化執行 env.addSink 的時候會建立 StreamSink 對象,即 StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));這裡的 sinkFunction 就是傳入的 FlinkKafkaProducer 對象,StreamSink 構造函數中将這個對象傳給父類 AbstractUdfStreamOperator 的 userFunction 變量,源碼如下:
■ StreamSink.java
public StreamSink(SinkFunction<IN> sinkFunction) {
super(sinkFunction);
chainingStrategy = ChainingStrategy.ALWAYS;
}
■ AbstractUdfStreamOperator.java
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
Task 運作
StreamSink 會調用下面的方法發送資料:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
sinkContext.element = element;
userFunction.invoke(element.getValue(), sinkContext);
}
``
也就是實際調用的是 FlinkKafkaProducer#invoke 方法。在 FlinkKafkaProducer 的構造函數中需要指 FlinkKafkaProducer.Semantic,即:
public enum Semantic {
EXACTLY_ONCE,
AT_LEAST_ONCE,
NONE
}
下面就基于3種語義分别說一下總體的向 kafka 發送資料的流程。
**■ Semantic.NONE**
這種方式不會做任何額外的操作,完全依靠 kafka producer 自身的特性,也就是FlinkKafkaProducer#invoke 裡面發送資料之後,Flink 不會再考慮 kafka 是否已經正确的收到資料。
transaction.producer.send(record, callback);
**■ Semantic.AT_LEAST_ONCE**
這種語義下,除了會走上面說到的發送資料的流程外,如果開啟了 checkpoint 功能,在 FlinkKafkaProducer#snapshotState 中會首先執行父類的 snapshotState方法,裡面最終會執行 FlinkKafkaProducer#preCommit。
@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
flush(transaction);
break;
case NONE:
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
checkErroneous();
AT_LEAST_ONCE 會執行了 flush 方法,裡面執行了:
transaction.producer.flush();
就是将 send 的資料立即發送給 kafka 服務端,詳細含義可以參考 KafkaProducer api:http://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> flush()
> Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
**■ Semantic.EXACTLY_ONCE**
EXACTLY_ONCE 語義也會執行 send 和 flush 方法,但是同時會開啟 kafka producer 的事務機制。FlinkKafkaProducer 中 beginTransaction 的源碼如下,可以看到隻有是 EXACTLY_ONCE 模式才會真正開始一個事務。
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
case EXACTLY_ONCE:
FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null && currentTransaction.producer != null) {
return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
}
return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
和 AT_LEAST_ONCE 另一個不同的地方在于 checkpoint 的時候,會将事務相關資訊儲存到變量 nextTransactionalIdHintState 中,這個變量存儲的資訊會作為 checkpoint 中的一部分進行持久化。
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
// scaling up.
if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
getRuntimeContext().getNumberOfParallelSubtasks(),
nextFreeTransactionalId));
> **小結:**本節介紹了 Flink Kafka Producer 的基本實作原理,後續會詳細介紹 Flink 在結合 kafka 的時候如何做到端到端的 Exactly Once 語義的。
**作者介紹:**
吳鵬,亞信科技資深工程師,Apache Flink Contributor。先後就職于中興,IBM,華為。目前在亞信科技負責實時流處理引擎産品的研發。