天天看點

日志服務Flink Connector《支援Exactly Once》

日志服務Flink Connector《支援Exactly Once》

Flink log connector是阿裡雲日志服務提供的,用于對接flink的工具,包括兩部分,消費者(Consumer)和生産者(Producer)。

消費者用于從日志服務中讀取資料,支援exactly once語義,支援shard負載均衡.

生産者用于将資料寫入日志服務,使用connector時,需要在項目中添加maven依賴:

在Connector中, 類FlinkLogConsumer提供了訂閱日志服務中某一個LogStore的能力,實作了exactly once語義,在使用時,使用者無需關心LogStore中shard數

量的變化,consumer會自動感覺。

flink中每一個子任務負責消費LogStore中部分shard,如果LogStore中shard發生split或者merge,子任務消費的shard也會随之改變。

上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任務數量和日志服務LogStore中的shard數量是獨立的,如果shard數量多于子任務數量,每個子任務不重複的消費多個shard,如果少于,

那麼部分子任務就會空閑,等到新的shard産生。

Flink log consumer支援設定shard的消費起始位置,通過設定屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:

Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的資料開始消費。

Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的資料開始消費。

UnixTimestamp: 一個整型數值的字元串,用1970-01-01到現在的秒數表示, 含義是消費shard中這個時間點之後的資料。

三種取值舉例如下:

Flink log consumer支援設定消費進度監控,所謂消費進度就是擷取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考

注意上面代碼是可選的,如果設定了,consumer會首先建立consumerGroup,如果已經存在,則什麼都不做,consumer中的snapshot會自動同步到日志服務的consumerGroup中,使用者可以在日志服務的控制台檢視consumer的消費進度。

當打開Flink的checkpointing功能時,Flink log consumer會周期性的将每個shard的消費進度儲存起來,當作業失敗時,flink會恢複log consumer,并

從儲存的最新的checkpoint開始消費。

寫checkpoint的周期定義了當發生失敗時,最多多少的資料會被回溯,也就是重新消費,使用代碼如下:

Flink log consumer 會用到的阿裡雲日志服務接口如下:

GetCursorOrData

ListShards

CreateConsumerGroup

ConsumerGroupUpdateCheckPoint

子使用者使用Flink log consumer需要授權如下幾個RAM Policy:

接口

資源

log:GetCursorOrData

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

log:ListShards

log:CreateConsumerGroup

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*

log:ConsumerGroupUpdateCheckPoint

acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

FlinkLogProducer 用于将資料寫到阿裡雲日志服務中。

注意producer隻支援Flink at-least-once語義,這就意味着在發生作業失敗的情況下,寫入日志服務中的資料有可能會重複,但是絕對不會丢失。

用法示例如下,我們将模拟産生的字元串寫入日志服務:

Producer初始化主要需要做兩件事情:

初始化配置參數Properties, 這一步和Consumer類似, Producer有一些定制的參數,一般情況下使用預設值即可,特殊場景可以考慮定制:

重載LogSerializationSchema,定義将資料序列化成RawLogGroup的方法。

如果使用者需要使用日志服務的shardHashKey功能,指定資料寫到某一個shard中,可以使用LogPartitioner産生資料的hashKey,用法例子如下:

注意LogPartitioner是可選的,不設定情況下, 資料會随機寫入某一個shard。

Producer依賴日志服務的API寫資料,如下:

log:PostLogStoreLogs

當RAM子使用者使用Producer時,需要對上述兩個API進行授權:

繼續閱讀