天天看點

Flink kafka source & sink 源碼解析

Flink kafka source & sink 源碼解析

原創 吳鵬 Flink 中文社群 4天前

摘要:本文基于 Flink 1.9.0 和 Kafka 2.3 版本,對 Flink Kafka source 和 sink 端的源碼進行解析,主要内容分為以下兩部分:

1.Flink-kafka-source 源碼解析

  • 流程概述
  • 非 checkpoint 模式 offset 的送出
  • checkpoint 模式下 offset 的送出
  • 指定 offset 消費

2.Flink-kafka-sink 源碼解析

  • 初始化
  • Task運作
  • 小結

一般在 Flink 中建立 kafka source 的代碼如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//KafkaEventSchema為自定義的資料字段解析類
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)           

而 Kafka 的 KafkaConsumer API 中消費某個 topic 使用的是 poll 方法如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.poll(Duration.ofMillis(100));           

下面将分析這兩個流程是如何銜接起來的。

初始化執行 env.addSource 的時候會建立 StreamSource 對象,即 final StreamSource sourceOperator = new StreamSource<>(function);這裡的function 就是傳入的 FlinkKafkaConsumer 對象,StreamSource 構造函數中将這個對象傳給父類 AbstractUdfStreamOperator 的 userFunction 變量,源碼如下:

■ StreamSource.java

public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}           

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}           

task 啟動後會調用到 SourceStreamTask 中的 performDefaultAction() 方法,這裡面會啟動一個線程 sourceThread.start();,部分源碼如下:

private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}           

在 LegacySourceFunctionThread 的 run 方法中,通過調用 headOperator.run 方法,最終調用了 StreamSource 中的 run 方法,部分源碼如下:

public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector,
                final OperatorChain<?, ?> operatorChain) throws Exception {

  //省略部分代碼
  this.ctx = StreamSourceContexts.getSourceContext(
    timeCharacteristic,
    getProcessingTimeService(),
    lockingObject,
    streamStatusMaintainer,
    collector,
    watermarkInterval,
    -1);

  try {
    userFunction.run(ctx);
    //省略部分代碼
  } finally {
    // make sure that the context is closed in any case
    ctx.close();
    if (latencyEmitter != null) {
      latencyEmitter.close();
    }
  }
}           

這裡最重要的就是 userFunction.run(ctx);,這個 userFunction 就是在上面初始化的時候傳入的 FlinkKafkaConsumer 對象,也就是說這裡實際調用了 FlinkKafkaConsumer 中的 run 方法,而具體的方法實作在其父類 FlinkKafkaConsumerBase中,至此,進入了真正的 kafka 消費階段。

Kafka消費階段

在 FlinkKafkaConsumerBase#run 中建立了一個 KafkaFetcher 對象,并最終調用了 kafkaFetcher.runFetchLoop(),這個方法的代碼片段如下:

/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
  try {
    final Handover handover = this.handover;

    // kick off the actual Kafka consumer
    consumerThread.start();
    
    //省略部分代碼
}           

可以看到實際啟動了一個 KafkaConsumerThread 線程。進入到 KafkaConsumerThread#run 中,下面隻是列出了這個方法的部分源碼,完整代碼請參考 KafkaConsumerThread.java。

@Override
public void run() {
  // early exit check
  if (!running) {
    return;
  }
  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  } catch (Throwable t) {
    handover.reportError(t);
    return;
  }
  try {

    // main fetch loop
    while (running) {
      try {
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          } catch (WakeupException we) {
            continue;
          }
        }
      }
      // end main fetch loop
    }
  } catch (Throwable t) {
    handover.reportError(t);
  } finally {
    handover.close();
    try {
      consumer.close();
    } catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}           

至此,終于走到了真正從 kafka 拿資料的代碼,即 records = consumer.poll(pollTimeout);。因為 KafkaConsumer 不是線程安全的,是以每個線程都需要生成獨立的 KafkaConsumer 對象,即 this.consumer = getConsumer(kafkaProperties);。

KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
  return new KafkaConsumer<>(kafkaProperties);
}           
小結:本節隻是介紹了 Flink 消費 kafka 資料的關鍵流程,下面會更詳細的介紹在AT_LEAST_ONCE和EXACTLY_ONCE 不同場景下 FlinkKafkaConsumer 管理 offset 的流程。

消費 kafka topic 最為重要的部分就是對 offset 的管理,對于 kafka 送出 offset 的機制,可以參考 kafka 官方網。

而在 flink kafka source 中 offset 的送出模式有3種:

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}           

初始化 offsetCommitMode

在 FlinkKafkaConsumerBase#open 方法中初始化 offsetCommitMode:

// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
        ((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());           
  • 方法 getIsAutoCommitEnabled() 的實作如下:
protected boolean getIsAutoCommitEnabled() {
   return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
      PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}           
  • 也就是說隻有 enable.auto.commit=true 并且 auto.commit.interval.ms>0 這個方法才會傳回 true
  • 變量 enableCommitOnCheckpoints 預設是 true,可以調用 setCommitOffsetsOnCheckpoints 改變這個值
  • 當代碼中調用了 env.enableCheckpointing 方法,isCheckpointingEnabled 才會傳回 true

通過下面的代碼傳回真正的送出模式:

/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}           

暫時不考慮 checkpoint 的場景,是以隻考慮 (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;。

也就是如果用戶端設定了 enable.auto.commit=true 那麼就是 KAFKA_PERIODIC,否則就是 KAFKA_DISABLED。

offset 的送出

■ 自動送出

這種方式完全依靠 kafka 自身的特性進行送出,如下方式指定參數即可:

Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)           

■ 非自動送出

通過上面的分析,如果 enable.auto.commit=false,那麼 offsetCommitMode 就是 DISABLED 。

kafka 官方文檔中,提到當 enable.auto.commit=false 時候需要手動送出 offset,也就是需要調用 consumer.commitSync(); 方法送出。

但是在 flink 中,非 checkpoint 模式下,不會調用 consumer.commitSync();, 一旦關閉自動送出,意味着 kafka 不知道目前的 consumer group 每次消費到了哪。

可以從兩方面證明這個問題:

  • 源碼

    KafkaConsumerThread#run 方法中是有 consumer.commitSync();,但是隻有當 commitOffsetsAndCallback != null 的時候才會調用。隻有開啟了checkpoint 功能才會不為 null,這個變量會在後續的文章中詳細分析。

  • 測試

    可以通過消費 __consumer_offsets 觀察是否有 offset 的送出

重新開機程式,還是會重複消費之前消費過的資料

小結:本節介紹了在非 checkpoint 模式下,Flink kafka source 送出 offset 的方式,下文會重點介紹 checkpoint 模式下送出 offset 的流程。

上面介紹了在沒有開啟 checkpoint 的時候,offset 的送出方式,下面将重點介紹開啟 checkpoint 後,Flink kafka consumer 送出 offset 的方式。

通過上文可以知道,當調用了 env.enableCheckpointing 方法後 offsetCommitMode 的值就是 ON_CHECKPOINTS,而且會通過下面方法強制關閉 kafka 自動送出功能,這個值很重要,後續很多地方都是根據這個值去判斷如何操作的。

/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   }
}           

儲存 offset

在做 checkpoint 的時候會調用 FlinkKafkaConsumerBase#snapshotState 方法,其中 pendingOffsetsToCommit 會儲存要送出的 offset。

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
   // the map cannot be asynchronously updated, because only one checkpoint call can happen
   // on this function at a time: either snapshotState() or notifyCheckpointComplete()
   pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}           

同時,下面的變量會作為 checkpoint 的一部分儲存下來,以便恢複時使用。

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

在 snapshotState 方法中會同時儲存 offset:

for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}           

送出 offset

在 checkpoint 完成以後,task 會調用 notifyCheckpointComplete 方法,裡面判斷 offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS 的時候,調用fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback); 方法,最終會将要送出的 offset 通過 KafkaFetcher#doCommitInternalOffsetsToKafka 方法中的 consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback); 儲存到 KafkaConsumerThread.java 中的 nextOffsetsToCommit 成員變量裡面。

這樣就會保證當有需要送出的 offset 的時候,下面代碼會執行 consumer.commitAsync,進而完成了手動送出 offset 到 kafka。

final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
  log.debug("Sending async offset commit request to Kafka broker");

  // also record that a commit is already in progress
  // the order here matters! first set the flag, then send the commit command.
  commitInProgress = true;
  consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}           
小結:本節介紹了在 checkpoint 模式下,Flink kafka source 送出 offset 的方式,後續會介紹 consumer 讀取 offset 的流程。

消費模式

在 Flink 的 kafka source 中有以下5種模式指定 offset 消費:

public enum StartupMode {

   /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
   GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

   /** Start from the earliest offset possible. */
   EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

   /** Start from the latest offset. */
   LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

   /**
    * Start from user-supplied timestamp for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   TIMESTAMP(Long.MIN_VALUE),

   /**
    * Start from user-supplied specific offsets for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   SPECIFIC_OFFSETS(Long.MIN_VALUE);
}           

預設為 GROUP_OFFSETS,表示根據上一次 group id 送出的 offset 位置開始消費。每個枚舉的值其實是一個 long 型的負數,根據不同的模式,在每個 partition 初始化的時候會預設将 offset 設定為這個負數。其他的方式和 kafka 本身的語義類似,就不在贅述。

指定 offset

此處隻讨論預設的 GROUP_OFFSETS 方式,下文所有分析都是基于這種模式。但是還是需要區分是否開啟了 checkpoint。在開始分析之前需要對幾個重要的變量進行說明:

  • subscribedPartitionsToStartOffsets
    • 所屬類:FlinkKafkaConsumerBase.java
    • 定義:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToSt           

說明:儲存訂閱 topic 的所有 partition 以及初始消費的 offset。

  • subscribedPartitionStates
    • 所屬類:AbstractFetcher.java
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPar           

說明:儲存了所有訂閱的 partition 的 offset 等詳細資訊,例如:

/** The offset within the Kafka partition that we already processed. */
private volatile long offset;
/** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;           

每次消費完資料之後都會更新這些值,這個變量非常的重要,在做 checkpoint 的時候,儲存的 offset 等資訊都是來自于這個變量。這個變量的初始化如下:

// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
  seedPartitionsWithInitialOffsets,
  timestampWatermarkMode,
  watermarksPeriodic,
  watermarksPunctuated,
  userCodeClassLoader);           

消費之後更新相應的 offset 主要在 KafkaFetcher#runFetchLoop

方法 while 循環中調用 emitRecord(value, partition, record.

offset(), record);。

  • restoredState
/**
     * The offsets to restore to, if the consumer restores state from a checkpoint.
     *
     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
     *
     * <p>Using a sorted map as the ordering is important when using restored state
     * to seed the partition discoverer.
     */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;           

說明:如果指定了恢複的 checkpoint 路徑,啟動時候将會讀取這個變量裡面的内容擷取起始 offset,而不再是使用 StartupMode 中的枚舉值作為初始的 offset。

  • unionOffsetStates
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;           

說明:儲存了 checkpoint 要持久化存儲的内容,例如每個 partition 已經消費的 offset 等資訊

■ 非 checkpoint 模式

在沒有開啟 checkpoint 的時候,消費 kafka 中的資料,其實就是完全依靠 kafka 自身的機制進行消費。

■ checkpoint 模式

開啟 checkpoint 模式以後,會将 offset 等資訊持久化存儲以便恢複時使用。但是作業重新開機以後如果由于某種原因讀不到 checkpoint 的結果,例如 checkpoint 檔案丢失或者沒有指定恢複路徑等。

  • 第一種情況,如果讀取不到 checkpoint 的内容

subscribedPartitionsToStartOffsets 會初始化所有 partition 的起始 offset為 -915623761773L 這個值就表示了目前為 GROUP_OFFSETS 模式。

default:
   for (KafkaTopicPartition seedPartition : allPartitions) {
      subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
   }           

第一次消費之前,指定讀取 offset 位置的關鍵方法是 KafkaConsumerThread#reassignPartitions 代碼片段如下:

for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
  if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
    consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
    consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
    // the KafkaConsumer by default will automatically seek the consumer position
    // to the committed group offset, so we do not need to do it.
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else {
    consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
  }
}           

因為是 GROUP_OFFSET 模式 ,是以會調用 newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); 需要說明的是,在 state 裡面需要存儲的是成功消費的最後一條資料的 offset,但是通過 position 這個方法傳回的是下一次應該消費的起始 offset,是以需要減1。這裡更新這個的目的是為了 checkpoint 的時候可以正确的拿到 offset。

這種情況由于讀取不到上次 checkpoint 的結果,是以依舊是依靠 kafka 自身的機制,即根據__consumer_offsets 記錄的内容消費。

  • 第二種情況,checkpoint 可以讀取到

這種情況下, subscribedPartitionsToStartOffsets 初始的 offset 就是具體從checkpoint 中恢複的内容,這樣 KafkaConsumerThread#reassignPartitions 實際走的分支就是:

consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);           

這裡加1的原理同上,state 儲存的是最後一次成功消費資料的 offset,是以加1才是現在需要開始消費的 offset。

小結:本節介紹了程式啟動時,如何确定從哪個 offset 開始消費,下文會繼續分析 flink kafka sink 的相關源碼。

通常添加一個 kafka sink 的代碼如下:

input.addSink(
   new FlinkKafkaProducer<>(
      "bar",
      new KafkaSerializationSchemaImpl(),
         properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");           

初始化執行 env.addSink 的時候會建立 StreamSink 對象,即 StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));這裡的 sinkFunction 就是傳入的 FlinkKafkaProducer 對象,StreamSink 構造函數中将這個對象傳給父類 AbstractUdfStreamOperator 的 userFunction 變量,源碼如下:

■ StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
  super(sinkFunction);
  chainingStrategy = ChainingStrategy.ALWAYS;
}           

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}           

Task 運作

StreamSink 會調用下面的方法發送資料:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}
``

也就是實際調用的是 FlinkKafkaProducer#invoke 方法。在 FlinkKafkaProducer 的構造函數中需要指 FlinkKafkaProducer.Semantic,即:
           

public enum Semantic {

EXACTLY_ONCE,

AT_LEAST_ONCE,

NONE

}

下面就基于3種語義分别說一下總體的向 kafka 發送資料的流程。

**■ Semantic.NONE**

這種方式不會做任何額外的操作,完全依靠 kafka producer 自身的特性,也就是FlinkKafkaProducer#invoke 裡面發送資料之後,Flink 不會再考慮 kafka 是否已經正确的收到資料。
           

transaction.producer.send(record, callback);

**■ Semantic.AT_LEAST_ONCE**

這種語義下,除了會走上面說到的發送資料的流程外,如果開啟了 checkpoint 功能,在 FlinkKafkaProducer#snapshotState 中會首先執行父類的 snapshotState方法,裡面最終會執行 FlinkKafkaProducer#preCommit。
           

@Override

protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {

switch (semantic) {

case EXACTLY_ONCE:
  case AT_LEAST_ONCE:
     flush(transaction);
     break;
  case NONE:
     break;
  default:
     throw new UnsupportedOperationException("Not implemented semantic");           

checkErroneous();

AT_LEAST_ONCE 會執行了 flush 方法,裡面執行了:
           

transaction.producer.flush();

就是将 send 的資料立即發送給 kafka 服務端,詳細含義可以參考 KafkaProducer api:http://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

> flush()  
> Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.  


**■ Semantic.EXACTLY_ONCE**

EXACTLY_ONCE 語義也會執行 send 和 flush 方法,但是同時會開啟 kafka producer 的事務機制。FlinkKafkaProducer 中 beginTransaction 的源碼如下,可以看到隻有是 EXACTLY_ONCE 模式才會真正開始一個事務。
           

protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {

case EXACTLY_ONCE:
     FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
     producer.beginTransaction();
     return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
  case AT_LEAST_ONCE:
  case NONE:
     // Do not create new producer on each beginTransaction() if it is not necessary
     final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
     if (currentTransaction != null && currentTransaction.producer != null) {
        return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
     }
     return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
  default:
     throw new UnsupportedOperationException("Not implemented semantic");           
和 AT_LEAST_ONCE 另一個不同的地方在于 checkpoint 的時候,會将事務相關資訊儲存到變量 nextTransactionalIdHintState 中,這個變量存儲的資訊會作為 checkpoint 中的一部分進行持久化。
           

if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {

checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");

long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that

// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this

// scaling up.

if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {

nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;           

nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(

getRuntimeContext().getNumberOfParallelSubtasks(),
  nextFreeTransactionalId));           
> **小結:**本節介紹了 Flink Kafka Producer 的基本實作原理,後續會詳細介紹 Flink 在結合 kafka 的時候如何做到端到端的 Exactly Once 語義的。  


**作者介紹:**

吳鵬,亞信科技資深工程師,Apache Flink Contributor。先後就職于中興,IBM,華為。目前在亞信科技負責實時流處理引擎産品的研發。