天天看點

Flink流式計算從入門到實戰五Flink流式計算實戰專題五八、Flink項目實戰

文章目錄

  • 八、Flink項目實戰
    • 1、需求背景
    • 2、資料流程設計
    • 3、應用實作
    • 4、實作效果分析

Flink流式計算實戰專題五

==樓蘭

八、Flink項目實戰

​ 這一個章節,我們來找一個常見的流式計算場景,将Flink真正用起來。

1、需求背景

​ 現在網絡直播平台非常火爆,在鬥魚這樣的網絡直播間,經常可以看到這樣的總榜排名,展現了主播的活躍度。我們就以這個貢獻日榜為例,來設計一個Flink的計算程式。

Flink流式計算從入門到實戰五Flink流式計算實戰專題五八、Flink項目實戰

​ 大家可以思考下這樣的業務場景應該要如何實作?

​ 對于貢獻日榜 這樣的功能,可以了解為是一個典型的流式計算的場景,強調的是資料的實時處理。因為在這個場景下,必須要及時的累計使用者的送禮物資料,才能形成你追我趕的實時效果,提升使用者的參與體驗。這個場景下的實時性,雖然不要求每一條資料都及時響應,但是整體的資料延遲還是要盡量縮短的。例如有一個狂熱粉絲想要争搶日榜榜首的位置,雖然不要求送了禮物之後就立即重新整理到榜首,但是最長也需要在幾分鐘内能夠看到榜單的重新整理效果。這在大資料場景下,也是非常有挑戰的。

​ 這種場景下,使用Flink進行流批統一的計算,感覺就非常合适。

2、資料流程設計

​ 在确定了使用Flink進行計算後,首先就需要設計出資料的上下遊流程,進行簡單的方案可行性評估。

​ 對于資料上遊,我們這個貢獻榜統計的業務場景,資料來源自然就是粉絲們的打賞行為。一方面整個平台的打賞行為的資料量是非常大的,另一方面這些打賞行為涉及到賬戶操作,是以他的作用,更大的是展現在貢獻榜功能以外的其他業務過程中。基于這兩方面考慮,自然就會想到使用kafka來進行削峰以及解耦。而Flink在DataStream/DataSet API和 Table API&SQL 兩個部分都對kafka提供了連接配接器實作,是以用kafka作為資料接入是可行的。

​ 而對于資料下遊,其實可以想象,最終計算出來的資料,最為重要的是要強調查詢的靈活性以及時效性,這樣才能支援頁面的快速查詢。如果考慮查詢的時效性,HBase和ElasticSearch都是比較理想的大資料存儲引擎。但是如果考慮到查詢的靈活性,就會想到ElasticSearch會相比HBase更适合。因為我們統計出來的這些粉絲貢獻度的結果,不光可以作為每個直播間貢獻榜的排名,也應該可以作為以後平台主播年度排名等其他業務場景的資料來源。如果想要兼顧這些查詢場景,使用HBase就會對Rowkey産生大量的侵入,而Elasticsearch可以根據任意字段快速查詢,就比較有優勢。 另外,從官方文檔中可以查到,對于HBase,Flink隻提供了Table API&SQL 子產品的connector支援,而DataStream/DataSet API中沒有提供支援,而ElasticSearch則支援更為全面。當然,這跟HBase的具體場景是有關聯的,但是也可以從另一個角度認為,使用ElasticSearch的可行性更高。

​ 這樣,就初步确定了 kafka-> Flink -> ElasticSearch 這樣的大緻資料流程。這也是在實際開發中非常典型的一個組合方式。後續就可以着手搭建kafka叢集以及ElasticSearch+Kibana的叢集了。在這次示例中,我們還是會以hadoop01,hadoop02,hadoop03這三台機器來搭建叢集。搭建的過程就略過了。

​ 然後确定資料的基礎結構

​ 這一步主要是确定入口資料和出口資料的結構。隻要這兩個資料結構确定了,那麼應用程式子產品和大資料計算子產品就可以分開進行開發了。是雙方主要的解耦方式。

​ 在資料入口處,可以定義這樣的簡化的資料結構:

public static class GiftRecord{
    private String hostId; //主播ID
    private String fansId; //粉絲ID
    private long giftCount; //禮物數量
    private String giftTime; //送禮物時間。時間格式 yyyy-MM-DD HH:mm:SS
    .....
}
           

​ 在kafka中,确定使用gift作為Topic,MQ的消息格式為 #{hostId},#{fansId},#{giftCount},#{giftTime} 這樣的字元串。

​ 在資料出口處,可以定義ES中這樣簡化的索引結構:

-- 貢獻日榜索引
PUT daygiftanalyze
{
  "mappings":{
    "properties": {
      "windowEnd":{
        "type": "long"
      },
      "hostId": {
        "type": "keyword"
      },
      "fansId": {
        "type": "keyword"
      },
      "giftCount":{
        "type": "long"
      }
    }
  }
}
           

​ 這樣,一個簡單的設計方案就形成了。應用程式隻需要在粉絲發送禮物時往kafka中同步一條消息記錄,然後從ES中查詢主播的貢獻日榜和貢獻周榜資料即可。而我們也可以模拟資料格式進行開發了。

3、應用實作

貢獻日榜:

基礎資料結構:

public static class GiftRecord{
    private String hostId; //主播ID
    private String fansId; //粉絲ID
    private long giftCount; //禮物數量
    private String giftTime; //送禮物時間。時間格式 yyyy-MM-DD HH:mm:SS
    .....
}
           

在kafka中,确定使用gift作為Topic,MQ的消息格式為 #{hostId},#{fansId},#{giftCount},#{giftTime} 這樣的字元串。

ES索引:

PUT daygiftanalyze
{
  "mappings":{
    "properties": {
      "windowEnd":{
        "type": "long"
      },
      "hostId": {
        "type": "keyword"
      },
      "fansId": {
        "type": "keyword"
      },
      "giftCount":{
        "type": "long"
      }
    }
  }
}
           

​ 然後運作Flink程式,com.roy.flink.project.flink.DayGiftAna,從kafka中讀取資料。測試資料見giftrecord.txt。計算程式會及時将十分鐘内的粉絲禮物統計都存入到ES當中。

ES查詢語句:

GET daygiftanalyze/_search
{
  "query": {
    "bool": {
      "must": [
        {"range": {
          "windowEnd": {
            "gte": 1631635200000,
            "lte": 1631721600000
          }
        }},
        {"match": {
          "hostId": "1001"
        }}
      ]
    }
  },
  "aggs": {
    "groupByFans": {
      "terms": {
        "field": "fansId",
        "size": 3,
        "order": {
          "giftCount": "desc"
        }
      },
      "aggs": {
        "giftCount": {
          "sum": {
            "field": "giftCount"
          }
        }
      }
    }
  }
}
           

ES中的查詢結果:

Flink流式計算從入門到實戰五Flink流式計算實戰專題五八、Flink項目實戰
直播應用就可以根據這個查詢結果組織用戶端查詢代碼,最終實作日榜排名的功能。

4、實作效果分析

​ 具體的計算方案參見示例代碼,這裡就不多做分析了。這裡隻分析一下在實作過程中需要注意的幾個重要的問題:

1、時間語義分析:

​ 對于網絡直播這樣的場景,從下午六點到第二天早上六點才是一天的高峰期,是以,在進行統計時,将每一天的統計時間定義為從早上六點到第二天早上六點,這樣就能盡量保持高峰期的完整性。很多跟娛樂相關的場景,比如網絡遊戲,也大都是以這樣的範圍來定義一天,而不是傳統意義上的從0點到24點。

2、并行度優化:

​ 可以直接使用Flink的開窗機制,待一周的資料收集完整了之後,一次性向ES中輸出統計結果,這種場景下要注意累計器的持久化,以及計算程式出錯後的重新開機恢複機制。

3、後續的改進方式:

​ 狀态後端、

而對于貢獻日榜的計算,就不能等一天的資料收集齊了再計算了。這時是有兩種解決方案,一種是完全的流處理方式。也就是每來一條資料就往ES中更新結果。另一中方式是采用小批量的流處理方式。以五分鐘為機關,将資料拆分成一個一個小視窗來進行處理。顯然後一種方式對資料處理的壓力會比較小一點。雖然資料量會更多,但是ES的存儲以及快速查詢能力可以比較好的彌補資料量的問題。也是以,在設計ES資料機構時,将貢獻日榜的文檔結構設計成了一個一個的小範圍。