一、KafkaOffsetMonitor簡述
KafkaOffsetMonitor是Kafka的一款用戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以浏覽目前的消費者組,并且每個Topic的所有Partition的消費情況都可以一目了然。
二、KafkaOffsetMonitor下載下傳
KafkaOffsetMonitor托管在Github上,可以通過Github下載下傳。
下載下傳位址:https://github.com/quantifind/KafkaOffsetMonitor/releases
或者下載下傳百度網盤:連結:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu
三、KafkaOffsetMonitor啟動
将下載下傳下來的KafkaOffsetMonitor jar包上傳到linux上,可以建立一個目錄KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯指令來運作這個jar包:
[[email protected] KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days
按回車後,可以看到控制台輸出:
serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started [email protected]:8088
如果沒有指定端口,則預設會開啟一個随機端口。
參數說明:
zk :zookeeper主機位址,如果有多個,用逗号隔開
port :應用程式端口
refresh :應用程式在資料庫中重新整理和存儲點的頻率
retain :在db中保留多長時間
dbName :儲存的資料庫檔案名,預設為offsetapp
為了更友善的啟動KafkaOffsetMonitor,可以寫一個啟動腳本來直接運作,我這裡建立一個名為:kafka-monitor-start.sh的腳本,然後編輯這個腳本:
[[email protected] KafkaMonitor]# vim kafka-monitor-start.sh
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;
然後退出儲存即可,接下來修改一下kafka-monitor-start.sh的權限
啟動KafkaOffsetMonitor:
[[email protected] KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh &
[1] 6551
[[email protected] KafkaMonitor]# lsof -i:8088
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
四、KafkaOffsetMonitor Web UI
在遊覽器中輸入:http://ip:port即可以檢視KafkaOffsetMonitor Web UI,如下圖:
在下圖中有一個Visualizations頁籤,點選其中的Cluster Overview可以檢視目前Kafka叢集的Broker情況
五、簡單的Producer
1、建立一個Topic
首先為本次試驗建立一個Topic,指令如下:
bin/kafka-topics.sh \
--create \
--zookeeper 10.0.0.50:12181 \
--replication-factor 3 \
--partition 3 \
--topic kafkamonitor-simpleproducer
2、建立SimpleProducer代碼
在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;
/**
* Created by ckm on 2016/8/30.
*/
public class SimpleProducer {
public static void main(String[] args) {
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
int i = 0;
String message = "";
while (true) {
message = "test-simple-producer : " + i ++;
kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);
}
}
}
程式運作效果:
3、ConsoleConsumer消費該topic
用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消費截圖如下:
4、KafkaOffsetMonitor頁面
(1)在Topic List頁籤中,我們可以看到剛才建立的
kafkamonitor-simpleproducer
(2)點開後,能看到有一個console-consumer正在消費該topic
(3)繼續進入該Consumer,可以檢視該Consumer目前的消費狀況
這張圖檔的左上角顯示了目前Topic的生産速率,右上角顯示了目前Consumer的消費速率。
圖檔中還有三種顔色的線條,藍色的表示目前Topic中的Message數目,灰色的表示目前Consumer消費的offset位置,紅色的表示藍色灰色的內插補點,即目前Consumer滞後于Producer的message數目。
(4)看一眼各partition中的message消費情況
從上圖可以看到,目前有3個Partition,每個Partition中的message數目分布很不均勻。這裡可以與接下來的自定義Producer的情況進行一個對比。
六、自定義Partitioner的Producer
1、建立一個Topic
bin/kafka-topics.sh \
--create \
--zookeeper 10.0.0.50:12181 \
--replication-factor 3 \
--partition 3 \
--topic kafkamonitor-partitionedproducer
2、Partitioner代碼
邏輯很簡單,循環依次往各Partition中發送message。
import kafka.producer.Partitioner;
/**
* Created by ckm on 2018/1/8.
*/
public class TestPartitioner implements Partitioner {
public TestPartitioner() {
}
@Override
public int partition(Object key, int numPartitions) {
int intKey = (int) key;
return intKey % numPartitions;
}
}
3、Producer代碼
将自定義的Partitioner設定到Producer,其他調用過程和二中類似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;
/**
* Created by ckm on 2016/8/30.
*/
public class PartitionedProducer {
public static void main(String[] args) {
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");
int i = 0;
String message = "";
while (true) {
message = "test-partitioner-producer : " + i;
System.out.println(message);
kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);
i ++;
}
}
}
代碼運作效果如下圖:
4、ConsoleConsumer消費Message
bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer
消費效果如下圖:
5、KafkaOffsetMonitor頁面
其他頁面與上面的類似,這裡隻觀察一下每個partition中的message數目與第二節中的對比。可以看到這裡每個Partition中message分别是很均勻的。
注意事項:
注意這裡有一個坑,預設情況下Producer往一個不存在的Topic發送message時會自動建立這個Topic。由于在這個封裝中,有同時傳遞message和topic的情況,如果調用方法時傳入的參數反了,将會在Kafka叢集中自動建立Topic。在正常情況下,應該是先把Topic根據需要建立好,然後Producer往該Topic發送Message,最好把Kafka這個預設自動建立Topic的功能關掉。
那麼,假設真的不小心建立了多餘的Topic,在删除時,會出現“marked for deletion”提示,隻是将該topic标記為删除,使用list指令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個參數:
參數 | 預設值 | 作用 |
---|---|---|
auto.create.topics.enable | true | Enable auto creation of topic on the server |
delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |
七,KafkaOffsetMonitor 總結
KafkaOffsetMonitor:程式一個jar包的形式運作,部署較為友善。隻有監控功能,使用起來也較為安全。
除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款:
Kafka Web Console:監控功能較為全面,可以預覽消息,監控Offset、Lag等資訊,但存在bug,不建議在生産環境中使用。
Kafka Manager:偏向Kafka叢集管理,若操作不當,容易導緻叢集出現故障。對Kafka實時生産和消費消息是通過JMX實作的。沒有記錄Offset、Lag等資訊。