原文位址:https://cloud.tencent.com/developer/article/1353068
Elasticsearch是目前主流的分布式大資料存儲和搜尋引擎,可以為使用者提供強大的全文字檢索能力,廣泛應用于日志檢索,全站搜尋等領域。Logstash作為Elasicsearch常用的實時資料采集引擎,可以采集來自不同資料源的資料,并對資料進行處理後輸出到多種輸出源,是Elastic Stack 的重要組成部分。本文從Logstash的工作原理,使用示例,部署方式及性能調優等方面入手,為大家提供一個快速入門Logstash的方式。文章最後也給出了一些深入了解Logstash的的連結,以友善大家根據需要詳細了解。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5COmhzYxYGZ0EGOwAjMiRjZjRDOkNDOiRzYiZjYhNGZ48CX3FmcvwVbvNmLn1WakV3bsNWcu4Wah12Lc9CX6MHc0RHaiojIsJye.png)
1 Logstash工作原理
1.1 處理過程
Logstash處理過程
如上圖,Logstash的資料處理過程主要包括:Inputs, Filters, Outputs 三部分, 另外在Inputs和Outputs中可以使用Codecs對資料格式進行處理。這四個部分均以插件形式存在,使用者通過定義pipeline配置檔案,設定需要使用的input,filter,output, codec插件,以實作特定的資料采集,資料處理,資料輸出等功能
- (1)Inputs:用于從資料源擷取資料,常見的插件如file, syslog, redis, beats 等[詳細參考]
- (2)Filters:用于處理資料如格式轉換,資料派生等,常見的插件如grok, mutate, drop, clone, geoip等[詳細參考]
- (3)Outputs:用于資料輸出,常見的插件如elastcisearch,file, graphite, statsd等[詳細參考]
- (4)Codecs:Codecs不是一個單獨的流程,而是在輸入和輸出等插件中用于資料轉換的子產品,用于對資料進行編碼處理,常見的插件如json,multiline[詳細參考]
可以點選每個子產品後面的詳細參考連結了解該子產品的插件清單及對應功能
1.2 執行模型:
- (1)每個Input啟動一個線程,從對應資料源擷取資料
-
(2)Input會将資料寫入一個隊列:預設為記憶體中的有界隊列(意外停止會導緻資料丢失)。為了防止數丢失Logstash提供了兩個特性:
Persistent Queues:通過磁盤上的queue來防止資料丢失
Dead Letter Queues:儲存無法處理的event(僅支援Elasticsearch作為輸出源)
- (3)Logstash會有多個pipeline worker, 每一個pipeline worker會從隊列中取一批資料,然後執行filter和output(worker數目及每次處理的資料量均由配置确定)
2 Logstash使用示例
2.1 Logstash Hello world
第一個示例Logstash将采用标準輸入和标準輸出作為input和output,并且不指定filter
- (1)下載下傳Logstash并解壓(需要預先安裝JDK8)
- (2)cd到Logstash的根目錄,并執行啟動指令如下:
cd logstash-6.4.0
bin/logstash -e 'input { stdin { } } output { stdout {} }'
- (3)此時Logstash已經啟動成功,-e表示在啟動時直接指定pipeline配置,當然也可以将該配置寫入一個配置檔案中,然後通過指定配置檔案來啟動
- (4)在控制台輸入:hello world,可以看到如下輸出:
{
"@version" => "1",
"host" => "localhost",
"@timestamp" => 2018-09-18T12:39:38.514Z,
"message" => "hello world"
}
Logstash會自動為資料添加@version, host, @timestamp等字段
在這個示例中Logstash從标準輸入中獲得資料,僅在資料中添加一些簡單字段後将其輸出到标準輸出。
2.2 日志采集
這個示例将采用Filebeat input插件(Elastic Stack中的輕量級資料采集程式)采集本地日志,然後将結果輸出到标準輸出
- (1)下載下傳示例使用的日志檔案[位址],解壓并将日志放在一個确定位置
- (2)安裝filebeat,配置并啟動[參考]
filebeat.yml配置如下(paths改為日志實際位置,不同版本beats配置可能略有變化,請根據情況調整)
filebeat.prospectors:
- input\_type: log
paths:
- /path/to/file/logstash-tutorial.log
output.logstash:
hosts: "localhost:5044"
啟動指令:
./filebeat -e -c filebeat.yml -d "publish"
- (3)配置logstash并啟動
1)建立first-pipeline.conf檔案内容如下(該檔案為pipeline配置檔案,用于指定input,filter, output等):
input {
beats {
port => "5044"
}
}
#filter {
#}
output {
stdout { codec => rubydebug }
}
codec => rubydebug用于美化輸出[參考]
2)驗證配置(注意指定配置檔案的路徑):
./bin/logstash -f first-pipeline.conf --config.test_and_exit
3)啟動指令:
./bin/logstash -f first-pipeline.conf --config.reload.automatic
--config.reload.automatic選項啟用動态重載配置功能
4)預期結果:
可以在Logstash的終端顯示中看到,日志檔案被讀取并處理為如下格式的多條資料
{
"@timestamp" => 2018-10-09T12:22:39.742Z,
"offset" => 24464,
"@version" => "1",
"input_type" => "log",
"beat" => {
"name" => "VM_136_9_centos",
"hostname" => "VM_136_9_centos",
"version" => "5.6.10"
},
"host" => "VM_136_9_centos",
"source" => "/data/home/michelmu/workspace/logstash-tutorial.log",
"message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}
相對于示例2.1,該示例使用了filebeat input插件從日志中擷取一行記錄,這也是Elastic stack擷取日志資料最常見的一種方式。另外該示例還采用了rubydebug codec 對輸出的資料進行顯示美化。
2.3 日志格式處理
可以看到雖然示例2.2使用filebeat從日志中讀取資料,并将資料輸出到标準輸出,但是日志内容作為一個整體被存放在message字段中,這樣對後續存儲及查詢都極為不便。可以為該pipeline指定一個grok filter來對日志格式進行處理
- (1)在first-pipeline.conf中增加filter配置如下
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
output {
stdout { codec => rubydebug }
}
- (2)到filebeat的根目錄下删除之前上報的資料曆史(以便重新上報資料),并重新開機filebeat
sudo rm data/registry
sudo ./filebeat -e -c filebeat.yml -d "publish"
- (3)由于之前啟動Logstash設定了自動更新配置,是以Logstash不需要重新啟動,這個時候可以擷取到的日志資料如下:
{
"request" => "/style2.css",
"agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"offset" => 24464,
"auth" => "-",
"ident" => "-",
"input_type" => "log",
"verb" => "GET",
"source" => "/data/home/michelmu/workspace/logstash-tutorial.log",
"message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
"@timestamp" => 2018-10-09T12:24:21.276Z,
"response" => "200",
"bytes" => "4877",
"clientip" => "86.1.76.62",
"@version" => "1",
"beat" => {
"name" => "VM_136_9_centos",
"hostname" => "VM_136_9_centos",
"version" => "5.6.10"
},
"host" => "VM_136_9_centos",
"httpversion" => "1.1",
"timestamp" => "04/Jan/2015:05:30:37 +0000"
}
可以看到message中的資料被詳細解析出來了
2.4 資料派生和增強
Logstash中的一些filter可以根據現有資料生成一些新的資料,如geoip可以根據ip生成經緯度資訊
- (1)在first-pipeline.conf中增加geoip配置如下
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
stdout { codec => rubydebug }
}
- (2)如2.3一樣清空filebeat曆史資料,并重新開機
- (3)當然Logstash仍然不需要重新開機,可以看到輸出變為如下:
{
"request" => "/style2.css",
"agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"geoip" => {
"timezone" => "Europe/London",
"ip" => "86.1.76.62",
"latitude" => 51.5333,
"continent_code" => "EU",
"city_name" => "Willesden",
"country_name" => "United Kingdom",
"country_code2" => "GB",
"country_code3" => "GB",
"region_name" => "Brent",
"location" => {
"lon" => -0.2333,
"lat" => 51.5333
},
"postal_code" => "NW10",
"region_code" => "BEN",
"longitude" => -0.2333
},
"offset" => 24464,
"auth" => "-",
"ident" => "-",
"input_type" => "log",
"verb" => "GET",
"source" => "/data/home/michelmu/workspace/logstash-tutorial.log",
"message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
"@timestamp" => 2018-10-09T12:37:46.686Z,
"response" => "200",
"bytes" => "4877",
"clientip" => "86.1.76.62",
"@version" => "1",
"beat" => {
"name" => "VM_136_9_centos",
"hostname" => "VM_136_9_centos",
"version" => "5.6.10"
},
"host" => "VM_136_9_centos",
"httpversion" => "1.1",
"timestamp" => "04/Jan/2015:05:30:37 +0000"
}
可以看到根據ip派生出了許多地理位置資訊資料
2.5 将資料導入Elasticsearch
Logstash作為Elastic stack的重要組成部分,其最常用的功能是将資料導入到Elasticssearch中。将Logstash中的資料導入到Elasticsearch中操作也非常的友善,隻需要在pipeline配置檔案中增加Elasticsearch的output即可。
- (1)首先要有一個已經部署好的Logstash,當然可以使用騰訊雲快速建立一個Elasticsearch建立位址
- (2)在first-pipeline.conf中增加Elasticsearch的配置,如下
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
- (3)清理filebeat曆史資料,并重新開機
- (4)查詢Elasticsearch确認資料是否正常上傳(注意替換查詢語句中的日期)
curl -XGET 'http://172.16.16.17:9200/logstash-2018.10.09/_search?pretty&q=response=200'
- (5)如果Elasticsearch關聯了Kibana也可以使用kibana檢視資料是否正常上報
kibana圖示
Logstash提供了大量的Input, filter, output, codec的插件,使用者可以根據自己的需要,使用一個或多個元件實作自己的功能,當然使用者也可以自定義插件以實作更為定制化的功能。自定義插件可以參考[logstash input插件開發]
3 部署Logstash
示範過如何快速使用Logstash後,現在詳細講述一下Logstash的部署方式。
3.1 安裝
- 安裝JDK:Logstash采用JRuby編寫,運作需要JDK環境,是以安裝Logstash前需要先安裝JDK。(目前6.4僅支援JDK8)
- 安裝Logstash:可以采用直接下載下傳壓縮包方式安裝,也通過APT或YUM安裝,另外Logstash支援安裝到Docker中。[Logstash安裝參考]
- 安裝X-PACK:在6.3及之後版本X-PACK會随Logstash安裝,在此之前需要手動安裝[參考連結]
3.2 目錄結構
logstash的目錄主要包括:根目錄、bin目錄、配置目錄、日志目錄、插件目錄、資料目錄
不同安裝方式各目錄的預設位置參考[此處]
3.3 配置檔案
-
Pipeline配置檔案,名稱可以自定義,在啟動Logstash時顯式指定,編寫方式可以參考前面示例,對于具體插件的配置方式參見具體插件的說明(使用Logstash時必須配置):
用于定義一個pipeline,資料處理方式和輸出源
-
Settings配置檔案(可以使用預設配置):
在使用Logstash時可以不用設定,用于性能調優,日志記錄等
- logstash.yml:用于控制logstash的執行過程[參考連結]
- pipelines.yml: 如果有多個pipeline時使用該配置來配置多pipeline執行[參考連結]
- jvm.options:jvm的配置
- log4j2.properties:log4j 2的配置,用于記錄logstash運作日志[參考連結]
- startup.options: 僅适用于Lniux系統,用于設定系統啟動項目!
- 為了保證敏感配置的安全性,logstash提供了配置加密功能[參考連結]
3.4 啟動關閉方式
3.4.1 啟動
- 指令行啟動
- 在debian和rpm上以服務形式啟動
- 在docker中啟動3.4.2 關閉
- 關閉Logstash
- Logstash的關閉時會先關閉input停止輸入,然後處理完所有進行中的事件,然後才完全停止,以防止資料丢失,但這也導緻停止過程出現延遲或失敗的情況。
3.5 擴充Logstash
當單個Logstash無法滿足性能需求時,可以采用橫向擴充的方式來提高Logstash的處理能力。橫向擴充的多個Logstash互相獨立,采用相同的pipeline配置,另外可以在這多個Logstash前增加一個LoadBalance,以實作多個Logstash的負載均衡。
4 性能調優
[詳細調優參考]
- (1)Inputs和Outputs的性能:當輸入輸出源的性能已經達到上限,那麼性能瓶頸不在Logstash,應優先對輸入輸出源的性能進行調優。
- (2)系統性能名額:
- CPU:确定CPU使用率是否過高,如果CPU過高則先檢視JVM堆空間使用率部分,确認是否為GC頻繁導緻,如果GC正常,則可以通過調節Logstash worker相關配置來解決。
- 記憶體:由于Logstash運作在JVM上,是以注意調整JVM堆空間上限,以便其有足夠的運作空間。另外注意Logstash所在機器上是否有其他應用占用了大量記憶體,導緻Logstash記憶體磁盤交換頻繁。
-
I/O使用率:
1)磁盤IO:
磁盤IO飽和可能是因為使用了會導緻磁盤IO飽和的建立(如file output),另外Logstash中出現錯誤産生大量錯誤日志時也會導緻磁盤IO飽和。Linux下可以通過iostat, dstat等檢視磁盤IO情況
2)網絡IO:
網絡IO飽和一般發生在使用有大量網絡操作的插件時。linux下可以使用dstat或iftop等檢視網絡IO情況
- (3)JVM堆檢查:
- 如果JVM堆大小設定過小會導緻GC頻繁,進而導緻CPU使用率過高
- 快速驗證這個問題的方法是double堆大小,看性能是否有提升。注意要給系統至少預留1GB的空間。
- 為了精确查找問題可以使用jmap或VisualVM。[參考]
- 設定Xms和Xmx為相同值,防止堆大小在運作時調整,這個過程非常消耗性能。
-
(4)Logstash worker設定:
worker相關配置在logstash.yml中,主要包括如下三個:
- pipeline.workers:
該參數用以指定Logstash中執行filter和output的線程數,當如果發現CPU使用率尚未達到上限,可以通過調整該參數,為Logstash提供更高的性能。建議将Worker數設定适當超過CPU核數可以減少IO等待時間對處理過程的影響。實際調優中可以先通過-w指定該參數,當确定好數值後再寫入配置檔案中。
- pipeline.batch.size:
該名額用于指定單個worker線程一次性執行flilter和output的event批量數。增大該值可以減少IO次數,提高處理速度,但是也以為這增加記憶體等資源的消耗。當與Elasticsearch聯用時,該值可以用于指定Elasticsearch一次bluck操作的大小。
- pipeline.batch.delay:
該名額用于指定worker等待時間的逾時時間,如果worker在該時間内沒有等到pipeline.batch.size個事件,那麼将直接開始執行filter和output而不再等待。
結束語
Logstash作為Elastic Stack的重要組成部分,在Elasticsearch資料采集和處理過程中扮演着重要的角色。本文通過簡單示例的示範和Logstash基礎知識的鋪陳,希望可以幫助初次接觸Logstash的使用者對Logstash有一個整體認識,并能較為快速上手。對于Logstash的高階使用,仍需要使用者在使用過程中結合實際情況查閱相關資源深入研究。當然也歡迎大家積極交流,并對文中的錯誤提出寶貴意見。
MORE:
- Logstash資料處理常見示例
- Logstash日志相關配置參考
- Kibana管理Logstash pipeline配置
- LogstashModule
- 監控Logstash