天天看点

日志服务Flink Connector《支持Exactly Once》

日志服务Flink Connector《支持Exactly Once》

Flink log connector是阿里云日志服务提供的,用于对接flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持shard负载均衡.

生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:

在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力,实现了exactly once语义,在使用时,用户无需关心LogStore中shard数

量的变化,consumer会自动感知。

flink中每一个子任务负责消费LogStore中部分shard,如果LogStore中shard发生split或者merge,子任务消费的shard也会随之改变。

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任务数量和日志服务LogStore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于,

那么部分子任务就会空闲,等到新的shard产生。

Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,具体取值如下:

Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。

Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。

UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。

三种取值举例如下:

Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考

注意上面代码是可选的,如果设置了,consumer会首先创建consumerGroup,如果已经存在,则什么都不做,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并

从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,也就是重新消费,使用代码如下:

Flink log consumer 会用到的阿里云日志服务接口如下:

GetCursorOrData

ListShards

CreateConsumerGroup

ConsumerGroupUpdateCheckPoint

子用户使用Flink log consumer需要授权如下几个RAM Policy:

接口

资源

log:GetCursorOrData

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

log:ListShards

log:CreateConsumerGroup

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*

log:ConsumerGroupUpdateCheckPoint

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

FlinkLogProducer 用于将数据写到阿里云日志服务中。

注意producer只支持Flink at-least-once语义,这就意味着在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

用法示例如下,我们将模拟产生的字符串写入日志服务:

Producer初始化主要需要做两件事情:

初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可,特殊场景可以考虑定制:

重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey,用法例子如下:

注意LogPartitioner是可选的,不设置情况下, 数据会随机写入某一个shard。

Producer依赖日志服务的API写数据,如下:

log:PostLogStoreLogs

当RAM子用户使用Producer时,需要对上述两个API进行授权:

继续阅读