天天看點

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

 一、KafkaOffsetMonitor簡述

KafkaOffsetMonitor是Kafka的一款用戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以浏覽目前的消費者組,并且每個Topic的所有Partition的消費情況都可以一目了然。

二、KafkaOffsetMonitor下載下傳

KafkaOffsetMonitor托管在Github上,可以通過Github下載下傳。

下載下傳位址:https://github.com/quantifind/KafkaOffsetMonitor/releases

或者下載下傳百度網盤:連結:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

三、KafkaOffsetMonitor啟動

将下載下傳下來的KafkaOffsetMonitor jar包上傳到linux上,可以建立一個目錄KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯指令來運作這個jar包:

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
[[email protected] KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088  --refresh 5.seconds --retain 1.days
按回車後,可以看到控制台輸出:

serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started [email protected]:8088      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

如果沒有指定端口,則預設會開啟一個随機端口。

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
參數說明:

zk :zookeeper主機位址,如果有多個,用逗号隔開
port :應用程式端口
refresh :應用程式在資料庫中重新整理和存儲點的頻率
retain :在db中保留多長時間
dbName :儲存的資料庫檔案名,預設為offsetapp      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

為了更友善的啟動KafkaOffsetMonitor,可以寫一個啟動腳本來直接運作,我這裡建立一個名為:kafka-monitor-start.sh的腳本,然後編輯這個腳本:

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
[[email protected] KafkaMonitor]# vim kafka-monitor-start.sh 
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m  -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;
      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

然後退出儲存即可,接下來修改一下kafka-monitor-start.sh的權限

啟動KafkaOffsetMonitor:

[[email protected] KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh &
[1] 6551
[[email protected] KafkaMonitor]# lsof -i:8088
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    6552 root   16u  IPv6  26047      0t0  TCP *:radan-http (LISTEN)      

四、KafkaOffsetMonitor Web UI

在遊覽器中輸入:http://ip:port即可以檢視KafkaOffsetMonitor Web UI,如下圖:

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

在下圖中有一個Visualizations頁籤,點選其中的Cluster Overview可以檢視目前Kafka叢集的Broker情況

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

五、簡單的Producer

1、建立一個Topic

  首先為本次試驗建立一個Topic,指令如下:

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
bin/kafka-topics.sh \
    --create \
    --zookeeper 10.0.0.50:12181 \
    --replication-factor 3 \
    --partition 3 \
    --topic kafkamonitor-simpleproducer      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

2、建立SimpleProducer代碼

  在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;

/**
 * Created by ckm on 2016/8/30.
 */
public class SimpleProducer {
    public static void main(String[] args) {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
        int i = 0;
        String message = "";
        while (true) {
            message = "test-simple-producer : " + i ++;
            kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);
        }
    }
}      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

  程式運作效果: 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

3、ConsoleConsumer消費該topic

  用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer      

  消費截圖如下: 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

4、KafkaOffsetMonitor頁面

(1)在Topic List頁籤中,我們可以看到剛才建立的

kafkamonitor-simpleproducer

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

(2)點開後,能看到有一個console-consumer正在消費該topic 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

(3)繼續進入該Consumer,可以檢視該Consumer目前的消費狀況 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

  這張圖檔的左上角顯示了目前Topic的生産速率,右上角顯示了目前Consumer的消費速率。 

  圖檔中還有三種顔色的線條,藍色的表示目前Topic中的Message數目,灰色的表示目前Consumer消費的offset位置,紅色的表示藍色灰色的內插補點,即目前Consumer滞後于Producer的message數目。 

(4)看一眼各partition中的message消費情況 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

  從上圖可以看到,目前有3個Partition,每個Partition中的message數目分布很不均勻。這裡可以與接下來的自定義Producer的情況進行一個對比。

六、自定義Partitioner的Producer

1、建立一個Topic

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
bin/kafka-topics.sh \
    --create \
    --zookeeper 10.0.0.50:12181 \
    --replication-factor 3 \
    --partition 3 \
    --topic kafkamonitor-partitionedproducer      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

2、Partitioner代碼

  邏輯很簡單,循環依次往各Partition中發送message。

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
import kafka.producer.Partitioner;

/**
 * Created by ckm on 2018/1/8.
 */
public class TestPartitioner implements Partitioner {
    public TestPartitioner() {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        int intKey = (int) key;
        return intKey % numPartitions;
    }
}      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

3、Producer代碼

  将自定義的Partitioner設定到Producer,其他調用過程和二中類似。

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;

/**
 * Created by ckm on 2016/8/30.
 */
public class PartitionedProducer {
    public static void main(String[] args) {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
        kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");
        int i = 0;
        String message = "";
        while (true) {
            message = "test-partitioner-producer : " + i;
            System.out.println(message);
            kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);
            i ++;
        }
    }
}      
Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

  代碼運作效果如下圖: 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

4、ConsoleConsumer消費Message

bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer      

  消費效果如下圖: 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

5、KafkaOffsetMonitor頁面

  其他頁面與上面的類似,這裡隻觀察一下每個partition中的message數目與第二節中的對比。可以看到這裡每個Partition中message分别是很均勻的。 

  

Kafka監控工具KafkaOffsetMonitor配置及使用五、簡單的Producer六、自定義Partitioner的Producer

注意事項: 

  注意這裡有一個坑,預設情況下Producer往一個不存在的Topic發送message時會自動建立這個Topic。由于在這個封裝中,有同時傳遞message和topic的情況,如果調用方法時傳入的參數反了,将會在Kafka叢集中自動建立Topic。在正常情況下,應該是先把Topic根據需要建立好,然後Producer往該Topic發送Message,最好把Kafka這個預設自動建立Topic的功能關掉。 

  那麼,假設真的不小心建立了多餘的Topic,在删除時,會出現“marked for deletion”提示,隻是将該topic标記為删除,使用list指令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個參數:

參數 預設值 作用
auto.create.topics.enable true Enable auto creation of topic on the server
delete.topic.enable false Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off

七,KafkaOffsetMonitor 總結

KafkaOffsetMonitor:程式一個jar包的形式運作,部署較為友善。隻有監控功能,使用起來也較為安全。

除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款:

Kafka Web Console:監控功能較為全面,可以預覽消息,監控Offset、Lag等資訊,但存在bug,不建議在生産環境中使用。

Kafka Manager:偏向Kafka叢集管理,若操作不當,容易導緻叢集出現故障。對Kafka實時生産和消費消息是通過JMX實作的。沒有記錄Offset、Lag等資訊。