項目通過log4j2把日志寫到了Kafka中,為了進一步分析資料通過logstash取出kafka的資料,經過filter處理之後,存入到elasticsearch中。
log4j2寫入kafka主要是配置logj2.xml檔案,加入kafka的配置和日志輸出。
主題要配置正确,ip和端口号要配置kafka的,不是zookeeper的。Loggers要加入配好的AUDIT_KAFKA.
logstash使用的是6.2.3,首先是input輸入,logstash內建了kafka的插件,直接用就成了。配置kafka的bootstrap位址。格式用json格式。topic一定要配置正确。
然後是filter部分,因為取的審計日志,特征比較明顯,直接用split進行分割。
這裡用mutate插件,并且将分割好的資料,以自己需要的名稱來命名。還可以進行二次分割,再寫一個mutate就好。
最後是輸出部分,輸出到elasticsearch,也是直接寫elasticsearch就好。
就這樣分割完了。
之後需要用的資料,可以通過java端寫代碼來擷取。這部分還是之後再寫吧。
學習過程中找到一篇elk的資料,和elasticsearch header的windows版本的安裝。位址:https://download.csdn.net/download/qq_28600087/10820880