天天看點

filebeat 接入kafka

1,在官網下載下傳filebeat

官網下載下傳位址:

https://www.elastic.co/cn/downloads

2,下載下傳kafka

下載下傳位址:

http://archive.apache.org/dist/kafka/

3,下載下傳完kafka之後,直接解壓即可

wget -c http://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz

tar -xf kafka-2.1.1-src.tg

4,kafka常用指令介紹

進入kafka目錄

cd kafka_2.11-2.1.1

啟動zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

啟動kafka

./bin/kafka-server-start.sh ./config/server.properties &

建立topic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

列出topic

./bin/kafka-topics.sh --list --zookeeper localhost:2181

啟動生産者并發送消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

開啟另外一個終端,啟動消費者接受消息

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

效果圖如下:

生産者:

filebeat 接入kafka

消費者:

filebeat 接入kafka

5,測試成功,現在我們配置filebeat的配置檔案

filebeat.inputs:

type: log

enabled: true

paths:

#- /var/log/*.log

/var/log/messages #監聽的日志檔案

filebeat.config.modules:

path: ${path.config}/modules.d/*.yml

reload.enabled: false

setup.template.settings:

index.number_of_shards: 3

output.kafka:

hosts: ["10.10.100.215:9092","10.10.100.216:9092","10.10.100.217:9092"] # kafka叢集配置

topic: "caolihua01" #kafka 訂閱的主題

6,啟動filebeat并檢視控制台輸出

filebeat 接入kafka

7,檢視kafka是否接受到filebeat收集到的日志資訊

filebeat 接入kafka