天天看點

Kafka Eagle 源碼解讀

1.概述

  在《Kafka 消息監控 - Kafka Eagle》一文中,簡單的介紹了 Kafka Eagle這款監控工具的作用,截圖預覽,以及使用詳情。今天筆者通過其源碼來解讀實作細節。目前該項目已托管于 Github 之上,作者編寫了使用手冊,告知使用者如何安裝,部署,啟動該系統。但對于實作的細節并未在參考手冊中詳細指出。這裡,筆者通過本篇博文,來詳細解讀其實作細節。相關資料文獻位址如下所示:

  • Kafka Eagle 源碼位址
  • Kafka Eagle 參考手冊
  • Kafka Eagle 安裝包

2.内容

  截止到版本 Kafka Eagle v1.1.1 支援監控0.8.2.x(存放消費資訊于Zookeeper)以及 0.10.x(存放消費資訊于Kafka的topic中)。對于前者,從Zookeeper中擷取消息資訊,難度不大,編寫Zookeeper用戶端實作代碼即可,該版本在Zookeeper下的存儲結構樹如下圖所示:

Kafka Eagle 源碼解讀

對于實作細節,可使用ZkUtils工具類來擷取相關資料,以擷取消費資訊為例,代碼如下所示:

/** Obtaining kafka consumer information from zookeeper. */
    public Map<String, List<String>> getConsumers(String clusterAlias) {
        ZkClient zkc = zkPool.getZkClient(clusterAlias);
        Map<String, List<String>> consumers = new HashMap<String, List<String>>();
        try {
            Seq<String> subConsumerPaths = ZkUtils.getChildren(zkc, CONSUMERS_PATH);
            List<String> groups = JavaConversions.seqAsJavaList(subConsumerPaths);
            for (String group : groups) {
                String path = CONSUMERS_PATH + "/" + group + "/owners";
                if (ZkUtils.pathExists(zkc, path)) {
                    Seq<String> owners = ZkUtils.getChildren(zkc, path);
                    List<String> ownersSerialize = JavaConversions.seqAsJavaList(owners);
                    consumers.put(group, ownersSerialize);
                } else {
                    LOG.error("Consumer Path[" + path + "] is not exist.");
                }
            }
        } catch (Exception ex) {
            LOG.error(ex.getMessage());
        } finally {
            if (zkc != null) {
                zkPool.release(clusterAlias, zkc);
                zkc = null;
            }
        }
        return consumers;
    }      

其他監控資訊可以按照Zookeeper中結構樹路徑擷取。如下圖所示:

Kafka Eagle 源碼解讀

然而,對于新版本,官方預設将消費資訊遷移到Kafka的topic中,這樣原來的接口隻能擷取topic,broker等資訊,對于消費的資訊,我們需要從kafka中一個叫__consumer_offsets的topic中擷取。為了相容0.8.2.x版本的Kafka,這裡在Kafka Eagle中另外啟動一個RpcServer來貢獻__consumer_offsets中的消費資訊。消費__consumer_offsets這個topic時,需要指定該内部topic不暴露給consumer,将 exclude.internal.topics 設定為 false 即可。這樣我們通過一個 kafka.eagle.offset.storage 開關來控制系統擷取監控中繼資料的走向。擷取流程如下圖所示:

Kafka Eagle 源碼解讀

3.消費 Owner

  當消費的資訊存放于Zookeeper中,我們可以直接從consumer子產品下直接擷取對應的Owner,但是在Kafka的Topic中,我們需要編碼來間接的擷取。這裡,我們需要知道 Kafka 的Owner的組成規則,其規則由 Group+ConusmerHostAddress+Timespan+UUID+PartitionId組成,實作細節可參考源碼,界面展示如下圖所示:

Kafka Eagle 源碼解讀

4.Kafka SQL

  關于Kafka SQL,旨在使用SQL來快速可視化Topic的相關資訊,目前 Kafka SQL 實作的功能包含有展示某一個Topic的Partition,Offset,以及其對應的消息記錄,若不加limit條件限制,預設展示該Topic下最新的5000條記錄,詳細實作細節,可參看源碼,預覽截圖如下所示:

Kafka Eagle 源碼解讀

查詢結果,如下圖所示:

Kafka Eagle 源碼解讀

5.多叢集

  Kafka Eagle 目前是支援多叢集監控,所謂多叢集,是指多個Zookeeper叢集下的Kafka叢集,通過切換Session來管理不同的Zookeeper叢集下的Kafka叢集,細節參看源碼。管理界面如下圖所示:

Kafka Eagle 源碼解讀

6.總結

  Kafka Eagle總體實作思路基本如上所述。針對,Kafka 0.10.x版本,Kafka Eagle監控部分子產品不展示的問題,這裡在啟動 Kafka Eagle之前,預設啟動一個系統consumer來消費kafka.eagle該group下的__system.topic__,保證__consumer_offsets是有資料可供擷取的。

7.結束語

  這篇部落格就和大家分享到這裡,該項目會一直維護,喜歡的同學可以在 Github 上 Star 一下,如果大家在研究學習的過程當中有什麼問題,可以加群進行讨論或發送郵件給我,我會盡我所能為您解答,與君共勉!

聯系方式:

郵箱:[email protected]

Twitter:https://twitter.com/smartloli

QQ群(Hadoop - 交流社群1):424769183

QQ群(Kafka并不難學): 825943084

溫馨提示:請大家加群的時候寫上加群理由(姓名+公司/學校),友善管理者稽核,謝謝!

熱愛生活,享受程式設計,與君共勉!

公衆号:

Kafka Eagle 源碼解讀

作者:哥不是小蘿莉 [關于我][犒賞]

出處:http://www.cnblogs.com/smartloli/

轉載請注明出處,謝謝合作!