天天看點

基于Flink的日志采集

目前基于ELK架構的日志系統,通過filebeat收集上來的日志都會發送到同一個kafka topic中,然後再由Logstash消費處理寫入Elasticsearch中,這種方式導緻該topic包含所有業務日志,那麼各個業務去做實時統計分析就會造成重複消費,使得流量成本的浪費;對于離線分析的日志來源是通過在應用服務端定時上傳的方式,對于日志量比較大的業務,一方面上傳時會對應用伺服器造成比較大的壓力,另一方面這種上傳方式對于後續小時或者分鐘級别分析造成一定延時。

本文将會介紹基于Flink的日志采集平台來解決這些問題。

基于Flink的日志采集

•拆分:最上層Kafka A表示由filebeat收集上來的應用日志,然後通過Flink程式對Kafka topic進行拆分,根據不同的業務拆分到到下遊Kafka B不同的topic中,那麼對于下遊的實時處理任務隻需要消費對應的業務日志即可,避免了重複消費;•轉儲:對于發送到Kafka B不同的業務日志,通過Flink程式轉儲寫入到HDFS上,生成小時分區檔案,供後續的離線日志分析

避免重複消費:為了避免對大topic的重複消費,對于同一個topic隻會消費一次,也就是隻會啟動一個Flink任務,按照一定的規則對資料進行拆分,常見的規則就是應用名稱、類型、日志檔案名稱等,在filebeat收集的時候這些資訊都會被帶上,作為拆分的依據;可配置化:為了滿足業務方能夠快速擷取自己的業務日志,就必須提供可配置規則的可視化界面,提供填寫拆分應用辨別、目标Kafka topic等,将這些規則資訊儲存在資料庫中,然後拆分的Flink任務定時加載規則資訊;日志格式:在實踐中規定日志格式是非常有必要的,為了保證拆分任務能夠準确的拆分出對應的業務日志,就必須按照指定的日志格式進行打點

通用實作:對于不同的業務日志,其日志的具體内容肯定各不相同,對于我們來說不可能每一個業務都去寫一套轉儲的程式,希望一套程式能夠處理所有的業務日志,是以對于我們來說不管任何日志對于我們來說其所代表的含義就是一個data字段對應的資料,那麼就隻需要把這個data字段寫入到對應的hdfs目錄檔案即可;資料分區:預設分區字段根據日志中一個固定的時間字段進行分區,在實踐中對于老的日志并沒有按照規範日志進行打點或者分區的時間字段不是通用的一個字段,需要按照日志中一個特殊的字段解析進行分區,如果将這個解析直接放在程式裡面根據業務判斷,最終的結果會造成代碼很難維護,解決方式就是将DataStream處理轉換為Table/SQL 的處理,将資料流注冊成表,然後通過udf去解析出來需要的分區字段,同樣這個udf無法通用,那麼就必須支援不同的udf,但是對于處理卻是通用的,例如: select data,udf(data) from tbl , 是一個固定的模闆,隻需要對于不同的轉儲程式加載不同的udf即可,通過Calcite 做sql文法解析,解析出使用的udf, 然後将其注冊即可;可配置化:同樣需要提供界面讓業務隻需要通過配置一些規則即可完成日志的收集,配置消費的topic、寫入資料位置、自定義分區語句支援(上面提到的自定義udf)等,在背景自動完成日志的收集開啟;其他幾點:日志壓縮與小檔案合并可參考:StreamingFileSink壓縮與合并小檔案; 在實作過程中可能會存在叢集遷移的場景,即将資料寫入到另外的一個叢集中,對于bulk的檔案寫入方式,其檔案的滾動會在每次checkpoint使檔案滾動,使用的滾動政策實作是OnCheckpointRollingPolicy,是以可以直接将hdfs檔案copy到另外一個叢集中,重新消費kafka的offset與生成的檔案是同步的,但是存在另外一個問題,在hdfs上檔案名稱的生成規則是part-subtask-index,此時切換叢集任務沒有從checkpoint恢複index重新從0開始遞增,存在覆寫以前檔案的風險,是以對檔案生成規則進行自定義,例如加上叢集辨別等。

本篇主要介紹了基于Flink的采集架構以及一些關鍵的實作點,歡迎交流。