
ELK+Kafka+Filebeat Enterprise Log Collection System

1. Environment planning

IP Address       Deployed Services                        Hostname
192.168.81.210   es+kafka+zookeeper+kibana+logstash       elk-1
192.168.81.220   es+kafka+zookeeper                       elk-2
192.168.81.230   es+kafka+zookeeper+nginx+filebeat        elk-3

Log system architecture:

nginx--->filebeat--->kafka--->logstash--->elasticsearch--->kibana


2. Deploy the elasticsearch cluster

2.1. Configure the es-1 node

1. Download elasticsearch 7.6
[root@elk-1 ~]# wget https://mirrors.huaweicloud.com/elasticsearch/7.6.0/elasticsearch-7.6.0-x86_64.rpm
[root@elk-1 ~/soft]# rpm -ivh elasticsearch-7.6.0-x86_64.rpm 

2. Edit the configuration file and set up cluster mode
[root@elk-1 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.210,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

3. Create the data directory
[root@elk-1 ~]# mkdir /data/elasticsearch/ -p
[root@elk-1 ~]# chown -R elasticsearch.elasticsearch /data/elasticsearch/

4. Configure memory locking
[root@elk-1 ~]# mkdir /etc/systemd/system/elasticsearch.service.d/
[root@elk-1 ~]# vim /etc/systemd/system/elasticsearch.service.d/override.conf
[Service]
LimitMEMLOCK=infinity

5. Start elasticsearch
[root@elk-1 ~]# systemctl daemon-reload 
[root@elk-1 ~]# systemctl start elasticsearch
[root@elk-1 ~]# systemctl enable elasticsearch
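6.(optional) Verify the node came up; a quick sanity check via the REST API (exact output varies by environment), including whether memory locking took effect
[root@elk-1 ~]# curl http://192.168.81.210:9200
[root@elk-1 ~]# curl 'http://192.168.81.210:9200/_nodes?filter_path=**.mlockall'    # should report "mlockall" : true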
           

2.2. Configure the es-2 node

Only node.name and network.host differ in the configuration file; every other step is the same as on es-1.

[root@elk-2 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-2
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.220,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"
           

2.3. Configure the es-3 node

Only node.name and network.host differ in the configuration file; every other step is the same as on es-1.

[root@elk-3 ~]# vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-3
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.230,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"
           

2.4. Check the cluster status with the es-head plugin

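If the es-head plugin is not available, the same information can be pulled from the cluster health API; with all three nodes up it should report three nodes and a green status:
[root@elk-1 ~]# curl 'http://192.168.81.210:9200/_cluster/health?pretty'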

3. Deploy kibana

1. Install the kibana rpm package
[root@elk-1 ~]#  rpm -ivh kibana-7.6.0-x86_64.rpm 

2. Configure kibana
[root@elk-1 ~]# vim /etc/kibana/kibana.yml
server.port: 5601									
server.host: "192.168.81.210"								
server.name: "elk-application"												
elasticsearch.hosts: ["http://192.168.81.210:9200"]				
i18n.locale: "zh-CN"

[root@elk-1 ~]# systemctl restart kibana
[root@elk-1 ~]# systemctl enable kibana
           

Kibana deployed successfully.
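A quick way to confirm the service is reachable is to check the port directly (kibana can take a minute to finish starting):
[root@elk-1 ~]# ss -lntp | grep 5601
[root@elk-1 ~]# curl -I http://192.168.81.210:5601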


4. Deploy zookeeper

4.1. Configure the zookeeper-1 node

1. Download the package
[root@elk-1 ~]# wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

2. Extract and move zookeeper
[root@elk-1 ~]# tar xf zookeeper-3.4.14.tar.gz -C /data/
[root@elk-1 ~]# mv /data/zookeeper-3.4.14/ /data/zookeeper

3. Create the data and log directories
[root@elk-1 ~]# mkdir /data/zookeeper/{data,logs}

4. Prepare the configuration file
[root@elk-1 ~]# cd /data/zookeeper/conf
[root@elk-1 /data/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
[root@elk-1 /data/zookeeper/conf]# vim zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

5. Generate the node id file
#the node id may only contain digits
[root@elk-1 /data/zookeeper]# echo 1 > /data/zookeeper/data/myid
           

4.2. Configure the zookeeper-2 node

Only the configuration file and the node id file differ slightly from the zookeeper-1 node; everything else is identical.

[root@elk-2 /data/zookeeper/conf]# cat zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

[root@elk-2 /data/zookeeper/conf]# echo 2 > /data/zookeeper/data/myid
           

4.3. Configure the zookeeper-3 node

[root@elk-3 /data/zookeeper/conf]# cat zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

[root@elk-3 /data/zookeeper/conf]# echo 3 > /data/zookeeper/data/myid
           

4.4. Start all nodes

A zookeeper cluster needs at least two live nodes, which means at least two nodes must be started at the same time or the cluster will not form; so finish editing the configuration on all nodes first and then start them together, as shown below.
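Start zookeeper on each node with the zkServer.sh script shipped in the tarball, then check the role of every node:
[root@elk-1 /data/zookeeper]# ./bin/zkServer.sh start
[root@elk-2 /data/zookeeper]# ./bin/zkServer.sh start
[root@elk-3 /data/zookeeper]# ./bin/zkServer.sh start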
[root@elk-1 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@elk-2 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@elk-3 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: leader
           

5. Deploy kafka

Note:

Do not use the kafka 2.11 build; it has a serious bug where filebeat cannot write data to the kafka cluster because the write protocol versions do not match.

5.1. Configure the kafka-1 node

1. Download the binary package
[root@elk-1 ~]# wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.13-2.4.0.tgz

2. Install kafka
[root@elk-1 ~]# tar xf kafka_2.13-2.4.0.tgz -C /data/
[root@elk-1 ~]# mv /data/kafka_2.13-2.4.0 /data/kafka

3. Modify the configuration file
[root@elk-1 ~]# cd /data/kafka
[root@elk-1 /data/kafka]# vim config/server.properties 
broker.id=1
listeners=PLAINTEXT://192.168.81.210:9092
host.name=192.168.81.210
advertised.listeners=PLAINTEXT://192.168.81.210:9092
advertised.host.name=192.168.81.210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

4. Create the data directory
[root@elk-1 ~]# mkdir /data/kafka/data
           

5.2. Configure the kafka-2 node

Only the configuration file differs; every other step is the same as on the kafka-1 node.

Changes needed in the configuration file: set broker.id to 2 (the second node), and change listeners, host.name, advertised.listeners and advertised.host.name to the local IP address.

[root@elk-2 /data/kafka]# cat config/server.properties 
broker.id=2
listeners=PLAINTEXT://192.168.81.220:9092
host.name=192.168.81.220
advertised.listeners=PLAINTEXT://192.168.81.220:9092
advertised.host.name=192.168.81.220
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
           

5.3. Configure the kafka-3 node

Only the configuration file differs; every other step is the same as on the kafka-1 node.

Changes needed in the configuration file: set broker.id to 3 (the third node), and change listeners, host.name, advertised.listeners and advertised.host.name to the local IP address.

[root@elk-3 /data/kafka]# cat config/server.properties 
broker.id=3
listeners=PLAINTEXT://192.168.81.230:9092
host.name=192.168.81.230
advertised.listeners=PLAINTEXT://192.168.81.230:9092
advertised.host.name=192.168.81.230
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
           

5.4. Start kafka

[root@elk-1 ~]# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@elk-2 ~]# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@elk-3 ~]# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
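A quick way to confirm each broker came up is to check that port 9092 is listening on every node, for example:
[root@elk-1 ~]# ss -lntp | grep 9092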
           

6. Test the kafka and zookeeper connection

If kafka can produce and consume data, the whole cluster is ready to use.

1. Create a topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --create --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --replication-factor 1 --partitions 1 --topic testpic
Created topic "testpic".

2. List topics
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
testpic

3. View the topic's description
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --topic testpic

4. Produce data from the console with kafka-console-producer
[root@elk-1 /data/kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic
>test1
>test2
>test3
>test4
>test5
>test6
>test7
>test8
>test9
>test10


5. Consume data from the console with kafka-console-consumer
[root@elk-1 /data/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic --from-beginning
test1
test2
test3
test4
test5
test6
test7
test8
test9
test10


#Delete a topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --delete --zookeeper 192.168.81.210:2181  --topic testpic
           

7. Configure filebeat to collect nginx and tomcat logs and store them in kafka

7.1. Install and configure the nginx service

1. Install nginx
[root@elk-3 ~]# yum -y install nginx

2. Configure the nginx log format
[root@elk-3 ~]# vim /etc/nginx/nginx.conf
http {
··············
    log_format  main '{"時間":"$time_iso8601",'
                       '"用戶端外網位址":"$http_x_forwarded_for",'
                       '"用戶端内網位址":"$remote_addr",'
                       '"狀态碼":$status,'
                       '"傳輸流量":$body_bytes_sent,'
                       '"跳轉來源":"$http_referer",'
                       '"URL":"$request",'
                       '"浏覽器":"$http_user_agent",'
                       '"請求響應時間":$request_time,'
                       '"後端位址":"$upstream_addr"}';

    access_log  /var/log/nginx/access.log  main;
··············
}

3. Start nginx
[root@elk-3 ~]# systemctl start nginx
[root@elk-3 ~]# systemctl enable nginx

4. Generate a request and check the log output
[root@elk-3 ~]# curl 127.0.0.1
[root@elk-3 ~]# tail /var/log/nginx/access.log 
{"時間":"2021-07-12T11:29:33+08:00","用戶端外網位址":"-","用戶端内網位址":"127.0.0.1","狀态碼":200,"傳輸流量":4833,"跳轉來源":"-","URL":"GET / HTTP/1.1","浏覽器":"curl/7.29.0","請求響應時間":0.000,"後端位址":"-"}
           

7.2. Install the tomcat service

[root@elk-3 ~]# tar xf apache-tomcat-8.5.12.tar.gz -C /data/
[root@elk-3 ~]# mv /data/apache-tomcat-8.5.12/ /data/tomcat
[root@elk-3 ~]# /data/tomcat/bin/startup.sh 
Using CATALINA_BASE:   /data/tomcat
Using CATALINA_HOME:   /data/tomcat
Using CATALINA_TMPDIR: /data/tomcat/temp
Using JRE_HOME:        /usr
Using CLASSPATH:       /data/tomcat/bin/bootstrap.jar:/data/tomcat/bin/tomcat-juli.jar
Tomcat started.
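Tomcat should now be listening on its default connector port 8080 (assuming the stock server.xml is used), which can be verified with:
[root@elk-3 ~]# curl -I http://127.0.0.1:8080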
           

7.3. Install the filebeat service

[root@elk-3 ~]# rpm -ivh filebeat-7.6.0-x86_64.rpm
           

7.4. Configure filebeat to collect application logs and store them in kafka

1. Configure filebeat
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml 
filebeat.inputs:
- type: log                                     #input type is log
  enabled: true
  paths:                                        #path of the log file to collect
    - /var/log/nginx/access.log
  json.keys_under_root: true                    #parse JSON-formatted log lines
  json.overwrite_keys: true
  fields:                                       #add a log_topic field with the value nginx-access; logstash uses this field to route the log into the right es index
    log_topic: nginx-access
  tail_files: true                              #start collecting from the end of the file

- type: log
  enabled: true
  paths:
    - /data/tomcat/logs/catalina.out
  multiline.pattern: '^20'                      #for tomcat logs, merge everything between one line starting with 20 and the next into a single event
  multiline.negate: true
  multiline.match: after
  fields:
    log_topic: tomcat-cata
  tail_files: true

output.kafka:                                   #output to kafka
  enabled: true
  hosts: ["192.168.81.210:9092","192.168.81.220:9092","192.168.81.230:9092"]                           #kafka broker addresses
  topic: '%{[fields][log_topic]}'               #which kafka topic to write to; the value references the log_topic field defined in the inputs, so logs from different paths end up in different topics
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

2. Start filebeat
[root@elk-3 ~]# systemctl start filebeat
[root@elk-3 ~]# systemctl enable filebeat
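Filebeat's built-in test subcommands can be used to validate the configuration file and the connection to the configured kafka output, which is handy before troubleshooting further:
[root@elk-3 ~]# filebeat test config
[root@elk-3 ~]# filebeat test output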
           

7.5. Generate application log data and check whether it is stored in kafka

1. Generate application logs

1. Generate nginx logs
[root@elk-3 ~]# ab -n 1000 -c 100 http://127.0.0.1/index.html

2. Generate tomcat logs
[root@elk-3 ~]# /data/tomcat/bin/shutdown.sh
[root@elk-3 ~]# /data/tomcat/bin/startup.sh
           

2. Check whether the corresponding topics were created in kafka

[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
__consumer_offsets
nginx-access
testpic
tomcat-cata

#The nginx-access and tomcat-cata topics have been created successfully
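To confirm that events are actually arriving, the nginx-access topic can be consumed from the console in the same way as the earlier testpic topic:
[root@elk-1 /data/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic nginx-access --from-beginning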
           

3. Watch the kafka log output

[root@elk-1 /data/kafka]# tail -f logs/kafkaServer.out
           

8. Configure logstash to read data from kafka and store it in the es cluster

Deploy logstash and configure it to read the topic data from kafka and store it in the es cluster.

8.1. Deploy the logstash service

1. Install logstash
[root@elk-3 ~]# rpm -ivh logstash-7.6.0.rpm
           

8.2. Configure logstash to read data from kafka and store it in the es cluster

[root@elk-3 ~]# cat /etc/logstash/conf.d/in_kafka_to_es.conf 
#read log data from kafka
input {				#data source
	kafka {				#input type is kafka
		bootstrap_servers => ["192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092"]			#kafka cluster addresses
		topics => ["nginx-access","tomcat-cata"]			#which kafka topics to read
		codec => "json"										#parse json-formatted data
		auto_offset_reset => "latest"						#only consume the latest kafka data
	}
}

#process the data and drop useless fields
filter {
	if [fields][log_topic] == "nginx-access" {			#if the log_topic field is nginx-access, process the event as follows
	    json {					#parse json data
	         source => "message"			#parse the json in the message field
	         remove_field => ["@version","path","beat","input","log","offset","prospector","source","tags"]			#drop the listed fields
	    }
	    mutate {			#modify the event
	         remove_field => ["_index","_id","_type","_version","_score","referer","agent"]			#drop useless fields
	    }
	}
	
	if [fields][log_topic] == "tomcat-cata" {		#if the log_topic field is tomcat-cata
	    grok {				#parse the message format
		 match => {
		      "message" => "(?<時間>20[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) \[(?<線程名稱>[^\s]{0,})\] (?<日志等級>\w+) (?<類名稱>[^\s]{0,}) (?<日志詳情>[\W\w]+)" 			#split the message value into structured fields
		 }
	    }
	    mutate {			#modify the event
                 remove_field => ["_index","_id","_type","_version","_score","referer","agent"]			#drop useless fields
            }  
	}
}

#store the processed data in the es cluster
output {				#destination
	if [fields][log_topic] == "nginx-access" {			#if the log_topic field is nginx-access, store it in the es cluster below
	    elasticsearch {						
		action => "index"			#index the event
		hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]		#es cluster addresses
		index => "nginx-access-%{+YYYY.MM.dd}"			#which es index to store the event in
		codec => "json"						#json codec
	    } 
	}
	
	if [fields][log_topic] == "tomcat-cata" {				#if the log_topic field is tomcat-cata, store it in the es cluster below
	    elasticsearch {
		action => "index"				#index the event
		hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]			#es cluster addresses
		index => "tomcat-cata-%{+YYYY.MM.dd}"			#which es index to store the event in
		codec => "json"						#json codec
	    } 
	}	    
}
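The pipeline syntax can optionally be validated before starting logstash; a minimal check, assuming the rpm placed logstash under /usr/share/logstash:
[root@elk-3 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf --config.test_and_exit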
           

8.3. Start logstash and watch the log output

[root@elk-3 ~]# nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf &
           

From the log output, logstash has already read data from the nginx-access and tomcat-cata topics and stored it in the es cluster.


8.4. Check whether the elasticsearch cluster created the corresponding indices

The es cluster has created the tomcat-cata and nginx-access indices.
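The indices can also be confirmed from the command line, for example:
[root@elk-1 ~]# curl 'http://192.168.81.210:9200/_cat/indices?v' | grep -E 'nginx-access|tomcat-cata'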

At this point logstash has successfully read the log data from the kafka cluster and written it into separate indices in the elasticsearch cluster.


9. Associate the elasticsearch indices in kibana and browse the log data

9.1. Add the nginx-access index pattern in kibana

1) Click create index pattern


2) Fill in the index name

Use a wildcard pattern; after filling it in, click next and complete the creation.


3) Add a time filter field


4) Created successfully


9.2. Add the tomcat-cata index pattern in the same way


9.3. Query the nginx-access index log data


9.4. View the tomcat-cata index log data


10. Common errors

10.1. ES fails to start with "Cannot assign requested address"

The error reported on startup is "Cannot assign requested address".

Solution: check the configuration file carefully; one of the configured addresses is wrong. In my case the IP of the listen address was mistyped.


10.2. Filebeat API version error when writing data to kafka
