![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLxEGZyQmNxkjNjJWZyQWM2gDO5ATN3ImYxATO0UmYhN2NiFTO1QmMh9CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
Flink log connector是阿裡雲日志服務提供的,用于對接flink的工具,包括兩部分,消費者(Consumer)和生産者(Producer)。
消費者用于從日志服務中讀取資料,支援exactly once語義,支援shard負載均衡.
生産者用于将資料寫入日志服務,使用connector時,需要在項目中添加maven依賴:
在Connector中, 類FlinkLogConsumer提供了訂閱日志服務中某一個LogStore的能力,實作了exactly once語義,在使用時,使用者無需關心LogStore中shard數
量的變化,consumer會自動感覺。
flink中每一個子任務負責消費LogStore中部分shard,如果LogStore中shard發生split或者merge,子任務消費的shard也會随之改變。
上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。
注意,flink stream的子任務數量和日志服務LogStore中的shard數量是獨立的,如果shard數量多于子任務數量,每個子任務不重複的消費多個shard,如果少于,
那麼部分子任務就會空閑,等到新的shard産生。
Flink log consumer支援設定shard的消費起始位置,通過設定屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:
Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的資料開始消費。
Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的資料開始消費。
UnixTimestamp: 一個整型數值的字元串,用1970-01-01到現在的秒數表示, 含義是消費shard中這個時間點之後的資料。
三種取值舉例如下:
Flink log consumer支援設定消費進度監控,所謂消費進度就是擷取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考
注意上面代碼是可選的,如果設定了,consumer會首先建立consumerGroup,如果已經存在,則什麼都不做,consumer中的snapshot會自動同步到日志服務的consumerGroup中,使用者可以在日志服務的控制台檢視consumer的消費進度。
當打開Flink的checkpointing功能時,Flink log consumer會周期性的将每個shard的消費進度儲存起來,當作業失敗時,flink會恢複log consumer,并
從儲存的最新的checkpoint開始消費。
寫checkpoint的周期定義了當發生失敗時,最多多少的資料會被回溯,也就是重新消費,使用代碼如下:
Flink log consumer 會用到的阿裡雲日志服務接口如下:
GetCursorOrData
ListShards
CreateConsumerGroup
ConsumerGroupUpdateCheckPoint
子使用者使用Flink log consumer需要授權如下幾個RAM Policy:
接口
資源
log:GetCursorOrData
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:ListShards
log:CreateConsumerGroup
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
FlinkLogProducer 用于将資料寫到阿裡雲日志服務中。
注意producer隻支援Flink at-least-once語義,這就意味着在發生作業失敗的情況下,寫入日志服務中的資料有可能會重複,但是絕對不會丢失。
用法示例如下,我們将模拟産生的字元串寫入日志服務:
Producer初始化主要需要做兩件事情:
初始化配置參數Properties, 這一步和Consumer類似, Producer有一些定制的參數,一般情況下使用預設值即可,特殊場景可以考慮定制:
重載LogSerializationSchema,定義将資料序列化成RawLogGroup的方法。
如果使用者需要使用日志服務的shardHashKey功能,指定資料寫到某一個shard中,可以使用LogPartitioner産生資料的hashKey,用法例子如下:
注意LogPartitioner是可選的,不設定情況下, 資料會随機寫入某一個shard。
Producer依賴日志服務的API寫資料,如下:
log:PostLogStoreLogs
當RAM子使用者使用Producer時,需要對上述兩個API進行授權: