文章目錄
- 八、Flink項目實戰
-
- 1、需求背景
- 2、資料流程設計
- 3、應用實作
- 4、實作效果分析
Flink流式計算實戰專題五
==樓蘭
八、Flink項目實戰
這一個章節,我們來找一個常見的流式計算場景,将Flink真正用起來。
1、需求背景
現在網絡直播平台非常火爆,在鬥魚這樣的網絡直播間,經常可以看到這樣的總榜排名,展現了主播的活躍度。我們就以這個貢獻日榜為例,來設計一個Flink的計算程式。
大家可以思考下這樣的業務場景應該要如何實作?
對于貢獻日榜 這樣的功能,可以了解為是一個典型的流式計算的場景,強調的是資料的實時處理。因為在這個場景下,必須要及時的累計使用者的送禮物資料,才能形成你追我趕的實時效果,提升使用者的參與體驗。這個場景下的實時性,雖然不要求每一條資料都及時響應,但是整體的資料延遲還是要盡量縮短的。例如有一個狂熱粉絲想要争搶日榜榜首的位置,雖然不要求送了禮物之後就立即重新整理到榜首,但是最長也需要在幾分鐘内能夠看到榜單的重新整理效果。這在大資料場景下,也是非常有挑戰的。
這種場景下,使用Flink進行流批統一的計算,感覺就非常合适。
2、資料流程設計
在确定了使用Flink進行計算後,首先就需要設計出資料的上下遊流程,進行簡單的方案可行性評估。
對于資料上遊,我們這個貢獻榜統計的業務場景,資料來源自然就是粉絲們的打賞行為。一方面整個平台的打賞行為的資料量是非常大的,另一方面這些打賞行為涉及到賬戶操作,是以他的作用,更大的是展現在貢獻榜功能以外的其他業務過程中。基于這兩方面考慮,自然就會想到使用kafka來進行削峰以及解耦。而Flink在DataStream/DataSet API和 Table API&SQL 兩個部分都對kafka提供了連接配接器實作,是以用kafka作為資料接入是可行的。
而對于資料下遊,其實可以想象,最終計算出來的資料,最為重要的是要強調查詢的靈活性以及時效性,這樣才能支援頁面的快速查詢。如果考慮查詢的時效性,HBase和ElasticSearch都是比較理想的大資料存儲引擎。但是如果考慮到查詢的靈活性,就會想到ElasticSearch會相比HBase更适合。因為我們統計出來的這些粉絲貢獻度的結果,不光可以作為每個直播間貢獻榜的排名,也應該可以作為以後平台主播年度排名等其他業務場景的資料來源。如果想要兼顧這些查詢場景,使用HBase就會對Rowkey産生大量的侵入,而Elasticsearch可以根據任意字段快速查詢,就比較有優勢。 另外,從官方文檔中可以查到,對于HBase,Flink隻提供了Table API&SQL 子產品的connector支援,而DataStream/DataSet API中沒有提供支援,而ElasticSearch則支援更為全面。當然,這跟HBase的具體場景是有關聯的,但是也可以從另一個角度認為,使用ElasticSearch的可行性更高。
這樣,就初步确定了 kafka-> Flink -> ElasticSearch 這樣的大緻資料流程。這也是在實際開發中非常典型的一個組合方式。後續就可以着手搭建kafka叢集以及ElasticSearch+Kibana的叢集了。在這次示例中,我們還是會以hadoop01,hadoop02,hadoop03這三台機器來搭建叢集。搭建的過程就略過了。
然後确定資料的基礎結構
這一步主要是确定入口資料和出口資料的結構。隻要這兩個資料結構确定了,那麼應用程式子產品和大資料計算子產品就可以分開進行開發了。是雙方主要的解耦方式。
在資料入口處,可以定義這樣的簡化的資料結構:
public static class GiftRecord{
private String hostId; //主播ID
private String fansId; //粉絲ID
private long giftCount; //禮物數量
private String giftTime; //送禮物時間。時間格式 yyyy-MM-DD HH:mm:SS
.....
}
在kafka中,确定使用gift作為Topic,MQ的消息格式為 #{hostId},#{fansId},#{giftCount},#{giftTime} 這樣的字元串。
在資料出口處,可以定義ES中這樣簡化的索引結構:
-- 貢獻日榜索引
PUT daygiftanalyze
{
"mappings":{
"properties": {
"windowEnd":{
"type": "long"
},
"hostId": {
"type": "keyword"
},
"fansId": {
"type": "keyword"
},
"giftCount":{
"type": "long"
}
}
}
}
這樣,一個簡單的設計方案就形成了。應用程式隻需要在粉絲發送禮物時往kafka中同步一條消息記錄,然後從ES中查詢主播的貢獻日榜和貢獻周榜資料即可。而我們也可以模拟資料格式進行開發了。
3、應用實作
貢獻日榜:
基礎資料結構:
public static class GiftRecord{
private String hostId; //主播ID
private String fansId; //粉絲ID
private long giftCount; //禮物數量
private String giftTime; //送禮物時間。時間格式 yyyy-MM-DD HH:mm:SS
.....
}
在kafka中,确定使用gift作為Topic,MQ的消息格式為 #{hostId},#{fansId},#{giftCount},#{giftTime} 這樣的字元串。
ES索引:
PUT daygiftanalyze
{
"mappings":{
"properties": {
"windowEnd":{
"type": "long"
},
"hostId": {
"type": "keyword"
},
"fansId": {
"type": "keyword"
},
"giftCount":{
"type": "long"
}
}
}
}
然後運作Flink程式,com.roy.flink.project.flink.DayGiftAna,從kafka中讀取資料。測試資料見giftrecord.txt。計算程式會及時将十分鐘内的粉絲禮物統計都存入到ES當中。
ES查詢語句:
GET daygiftanalyze/_search
{
"query": {
"bool": {
"must": [
{"range": {
"windowEnd": {
"gte": 1631635200000,
"lte": 1631721600000
}
}},
{"match": {
"hostId": "1001"
}}
]
}
},
"aggs": {
"groupByFans": {
"terms": {
"field": "fansId",
"size": 3,
"order": {
"giftCount": "desc"
}
},
"aggs": {
"giftCount": {
"sum": {
"field": "giftCount"
}
}
}
}
}
}
ES中的查詢結果:
直播應用就可以根據這個查詢結果組織用戶端查詢代碼,最終實作日榜排名的功能。
4、實作效果分析
具體的計算方案參見示例代碼,這裡就不多做分析了。這裡隻分析一下在實作過程中需要注意的幾個重要的問題:
1、時間語義分析:
對于網絡直播這樣的場景,從下午六點到第二天早上六點才是一天的高峰期,是以,在進行統計時,将每一天的統計時間定義為從早上六點到第二天早上六點,這樣就能盡量保持高峰期的完整性。很多跟娛樂相關的場景,比如網絡遊戲,也大都是以這樣的範圍來定義一天,而不是傳統意義上的從0點到24點。
2、并行度優化:
可以直接使用Flink的開窗機制,待一周的資料收集完整了之後,一次性向ES中輸出統計結果,這種場景下要注意累計器的持久化,以及計算程式出錯後的重新開機恢複機制。
3、後續的改進方式:
狀态後端、
而對于貢獻日榜的計算,就不能等一天的資料收集齊了再計算了。這時是有兩種解決方案,一種是完全的流處理方式。也就是每來一條資料就往ES中更新結果。另一中方式是采用小批量的流處理方式。以五分鐘為機關,将資料拆分成一個一個小視窗來進行處理。顯然後一種方式對資料處理的壓力會比較小一點。雖然資料量會更多,但是ES的存儲以及快速查詢能力可以比較好的彌補資料量的問題。也是以,在設計ES資料機構時,将貢獻日榜的文檔結構設計成了一個一個的小範圍。