天天看点

Flink实战(八) - Streaming Connectors 编程(下)参考

3.6 Kafka生产者

Flink的Kafka Producer被称为FlinkKafkaProducer011(或010 对于Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,对于Kafka>=1.0.0的版本来说)。

它允许将记录流写入一个或多个Kafka主题。

自应用

Pro

Flink实战(八) - Streaming Connectors 编程(下)参考

确保启动端口

Flink实战(八) - Streaming Connectors 编程(下)参考

Pro端生产消息

Flink实战(八) - Streaming Connectors 编程(下)参考

消费端接收

Flink实战(八) - Streaming Connectors 编程(下)参考

Example

  • Java
  • Flink实战(八) - Streaming Connectors 编程(下)参考
  • Scala
Flink实战(八) - Streaming Connectors 编程(下)参考

上面的示例演示了创建Flink Kafka Producer以将流写入单个Kafka目标主题的基本用法。对于更高级的用法,还有其他构造函数变体允许提供以下内容:

提供自定义属性

生产者允许为内部的KafkaProducer提供自定义属性配置。

自定义分区程序

将记录分配给特定分区,可以为FlinkKafkaPartitioner构造函数提供实现。将为流中的每个记录调用此分区程序,以确定应将记录发送到的目标主题的确切分区。

高级序列化模式

与消费者类似,生产者还允许使用调用的高级序列化模式KeyedSerializationSchema,该模式允许单独序列化键和值。它还允许覆盖目标主题,以便一个生产者实例可以将数据发送到多个主题。

3.8 Kafka消费者开始位置配置

Flink Kafka Consumer允许配置如何确定Kafka分区的起始位置。

Flink实战(八) - Streaming Connectors 编程(下)参考
Flink实战(八) - Streaming Connectors 编程(下)参考

Flink Kafka Consumer的所有版本都具有上述明确的起始位置配置方法。

setStartFromGroupOffsets(默认行为)

从group.idKafka代理(或Zookeeper for Kafka 0.8)中的消费者组(在消费者属性中设置)提交的偏移量开始读取分区。如果找不到分区的偏移量,auto.offset.reset将使用属性中的设置。

setStartFromEarliest()/ setStartFromLatest()

从最早/最新记录开始。在这些模式下,Kafka中的承诺偏移将被忽略,不会用作起始位置。

setStartFromTimestamp(long)

从指定的时间戳开始。对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。

还可以指定消费者应从每个分区开始的确切偏移量:

Flink实战(八) - Streaming Connectors 编程(下)参考
Flink实战(八) - Streaming Connectors 编程(下)参考

上面的示例将使用者配置为从主题的分区0,1和2的指定偏移量开始myTopic。偏移值应该是消费者应为每个分区读取的下一条记录。请注意,如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为(即)。

请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。

3.9 Kafka生产者和容错

Kafka 0.8

在0.9之前,Kafka没有提供任何机制来保证至少一次或恰好一次的语义。

Kafka 0.9和0.10

启用Flink的检查点时,FlinkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次传输保证。

除了开启Flink的检查点,还应该配置setter方法:

setLogFailuresOnly(boolean)

默认为false。启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们。这大体上就是计数已成功的记录,即使它从未写入目标Kafka主题。这必须设为false对于确保 至少一次

setFlushOnCheckpoint(boolean)

默认为true。启用此函数后,Flink的检查点将在检查点成功之前等待检查点时的任何动态记录被Kafka确认。这可确保检查点之前的所有记录都已写入Kafka。必须开启,对于确保 至少一次

总之,默认情况下,Kafka生成器对版本0.9和0.10具有至少一次保证,即

setLogFailureOnly设置为false和setFlushOnCheckpoint设置为true。

默认情况下,重试次数设置为“0”。这意味着当setLogFailuresOnly设置为时false,生产者会立即失败,包括Leader更改。

默认情况下,该值设置为“0”,以避免重试导致目标主题中出现重复消息。对于经常更改代理的大多数生产环境,建议将重试次数设置为更高的值。

Kafka目前没有生产者事务,因此Flink在Kafka主题里无法保证恰好一次交付

Kafka >= 0.11

启用Flink的检查点后,FlinkKafkaProducer011

对于Kafka >= 1.0.0版本是FlinkKafkaProduce

可以提供准确的一次交付保证。

除了启用Flink的检查点,还可以通过将适当的语义参数传递给FlinkKafkaProducer011,选择三种不同的算子操作模式

Semantic.NONE

Flink实战(八) - Streaming Connectors 编程(下)参考
  • Flink啥都不保证。生成的记录可能会丢失,也可能会重复。
  • Semantic.AT_LEAST_ONCE(默认设置)
  • Flink实战(八) - Streaming Connectors 编程(下)参考
  • 类似于setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。这可以保证不会丢失任何记录(尽管它们可以重复)。
  • Semantic.EXACTLY_ONCE
Flink实战(八) - Streaming Connectors 编程(下)参考

使用Kafka事务提供恰好一次的语义。每当您使用事务写入Kafka时,不要忘记为任何从Kafka消费记录的应用程序设置所需的isolation.level(read_committed 或read_uncommitted- 后者为默认值)。

注意事项

Semantic.EXACTLY_ONCE 模式依赖于在从所述检查点恢复之后提交在获取检查点之前启动的事务的能力。如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。

Kafka broker默认 transaction.max.timeout.ms 设置为15分钟。此属性不允许为生产者设置大于其值的事务超时。

FlinkKafkaProducer011默认情况下,将transaction.timeout.msproducer config中的属性设置为1小时,因此transaction.max.timeout.ms在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 该属性。

在read_committed模式中KafkaConsumer,任何未完成的事务(既不中止也不完成)将阻止来自给定Kafka主题的所有读取超过任何未完成的事务。换言之,遵循以下事件顺序:

用户事务1开启并写记录

用户事务2开启并写了一些其他记录

用户提交事务2

即使事务2已经提交了记录,在事务1提交或中止之前,消费者也不会看到它们。这有两个含义:

首先,在Flink应用程序的正常工作期间,用户可以预期Kafka主题中生成的记录的可见性会延迟,等于已完成检查点之间的平均时间。

其次,在Flink应用程序失败的情况下,读者将阻止此应用程序编写的主题,直到应用程序重新启动或配置的事务超时时间过去为止。此注释仅适用于有多个代理/应用程序写入同一Kafka主题的情况。

Semantic.EXACTLY_ONCE 模式为每个FlinkKafkaProducer011实例使用固定大小的KafkaProducers池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。

Semantic.EXACTLY_ONCE 采取所有可能的措施,不要留下任何阻碍消费者阅读Kafka主题的延迟事务,这是必要的。但是,如果Flink应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中没有关于先前池大小的信息。因此,在第一个检查点完成之前按比例缩小Flink应用程序是不安全的 FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR。

3.10 Kafka消费者及其容错

启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他 算子操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。

因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。

检查点常用参数

enableCheckpointing

启用流式传输作业的检查点。 将定期快照流式数据流的分布式状态。 如果发生故障,流数据流将从最新完成的检查点重新启动。

该作业在给定的时间间隔内定期绘制检查点。 状态将存储在配置的状态后端。

此刻未正确支持检查点迭代流数据流。 如果“force”参数设置为true,则系统仍将执行作业。

Flink实战(八) - Streaming Connectors 编程(下)参考

setCheckpointingMode

Flink实战(八) - Streaming Connectors 编程(下)参考

setCheckpointTimeout

Flink实战(八) - Streaming Connectors 编程(下)参考

setMaxConcurrentCheckpoints

Flink实战(八) - Streaming Connectors 编程(下)参考

要使用容错的Kafka使用者,需要在运行环境中启用拓扑的检查点:

  • Flink实战(八) - Streaming Connectors 编程(下)参考
Flink实战(八) - Streaming Connectors 编程(下)参考

另请注意,如果有足够的处理插槽可用于重新启动拓扑,则Flink只能重新启动拓扑。因此,如果拓扑由于丢失了TaskManager而失败,那么之后仍然必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。

如果未启用检查点,Kafka使用者将定期向Zookeeper提交偏移量。

参考

https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html http://kafka.apache.org/documentation/