Overview
Version: 5.6.1
See the official site for details.
Compared with Flume:
- No duplicate consumption, no data loss
- Flume currently has better HDFS support (personal impression)
Offline installation
Configure JAVA_HOME first.
Then just download and extract the archive.
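A minimal sketch of those steps, assuming the 5.6.1 tarball and a JDK have already been copied onto the machine (all paths and filenames here are illustrative):

# point JAVA_HOME at an existing JDK install
export JAVA_HOME=/usr/local/jdk1.8.0
export PATH=$JAVA_HOME/bin:$PATH
# unpack the pre-downloaded archive and verify it runs
tar -zxf logstash-5.6.1.tar.gz -C /home/bingo/
cd /home/bingo/logstash-5.6.1
bin/logstash --version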
Standard input and output
File to stdout
# start
bin/logstash -f conf/file-stdout.conf
# note: conf is a directory created by hand; file-stdout.conf contains:
input {
  file {
    path => "/home/bingo/data/test.log"
    start_position => "beginning"
    ignore_older => 0
  }
}
output {
  stdout {}
}
# multiple files
path => "/home/bingo/data/*.log"
# multiple directories
path => "/home/bingo/data/*/*.log"
# parameter notes
start_position: defaults to end, i.e. parsing starts from the end of the file
ignore_older: by default, files not modified for more than 24 hours (86400 seconds) are not parsed; 0 means no stale logs are ignored
After running the command you will see the contents of the log file printed to the console.
Exit
ctrl+c
Start it again: the lines printed before are not printed a second time, i.e. no duplicate consumption.
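The no-duplicate behavior comes from the file input recording its read offset in a sincedb file, so restarts resume where they left off. For repeatable tests where every run should re-read the file from the start, a common trick (not part of the original conf) is to discard the offsets:

input {
  file {
    path => "/home/bingo/data/test.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"  # offsets are never persisted, so each run starts over
  }
}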
In another terminal session, append content to test.log:
echo "updating" >> test.log
You will see the appended content printed to the console.
Each message is prefixed with a timestamp and hostname, e.g.: 2017-09-23T00:56:36.670Z node02 updating
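That single-line format is the stdout output's default rendering. To inspect the full event structure instead, the standard rubydebug codec can be used (a debugging aid, not part of the original conf):

output {
  stdout { codec => rubydebug }  # prints each event as a pretty-printed hash with all fields
}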
Upstream to elasticsearch
bin/logstash -f conf/flow-es.conf
# flow-es.conf contents
input {
  file {
    type => "flow"
    path => "/home/bingo/data/logstash/logs/*/*.txt"
    discover_interval => 15  # seconds between scans for new files; value assumed here (15 is the plugin default)
    start_position => "beginning"
  }
}
output {
  if [type] == "flow" {
    elasticsearch {
      index => "flow-%{+YYYY.MM.dd}"
      hosts => ["master01:9200", "worker01:9200", "worker02:9200"]
    }
  }
}
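To confirm that events arrived, Elasticsearch's cat API can list the daily indices (assuming the cluster is reachable on master01):

curl 'http://master01:9200/_cat/indices/flow-*?v'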
Upstream to kafka
# console source
bin/logstash -e 'input { stdin {} } output { kafka { topic_id => "test" bootstrap_servers => "node01:9092,node02:9092,node03:9092"} }'
# input
spark <enter>
# file source
bin/logstash -f conf/flow-kafka.conf
# flow-kafka.conf contents
input {
  file {
    path => "/home/bingo/data/logstash/logs/*/*.txt"
    discover_interval => 15  # seconds between scans for new files; value assumed here (15 is the plugin default)
    start_position => "beginning"
  }
}
output {
  kafka {
    topic_id => "testlog"
    codec => plain {
      format => "%{message}"
      charset => "UTF-8"
    }
    bootstrap_servers => "node01:9092,node02:9092,node03:9092"
  }
}
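Before consuming, it is worth checking that the topic exists (run from the Kafka installation directory; the ZooKeeper address matches the cluster used above):

bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic testlog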
Consume the data on the Kafka side
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
# output
{"message":"spark","@version":"1","@timestamp":"2017-03-25T18:55:47.814Z","host":"worker01"}
# if several consumers are started, each of them sees the same output
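The record arrives as a full JSON event because the kafka output defaults to the json codec; to ship only the raw message text, use the plain codec with format => "%{message}" as in flow-kafka.conf above.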
Kafka to elasticsearch
bin/logstash -f conf/kafka-es.conf
# kafka-es.conf contents
input {
  kafka {
    type => "level-one"
    auto_offset_reset => "earliest"  # 5.x values are "earliest"/"latest" (the old "smallest"/"largest" are gone)
    codec => plain {
      charset => "UTF-8"
    }
    group_id => "es"
    topics => ["testlog"]  # 5.x option; replaces the old topic_id
    bootstrap_servers => "node01:9092,node02:9092,node03:9092"  # 5.x consumes from brokers directly; zk_connect was removed
  }
}
filter {
  mutate {
    split => { "message" => " " }
    add_field => {
      "event_type" => "%{[message][3]}"
      "current_map" => "%{[message][4]}"
      "current_x" => "%{[message][5]}"
      "current_y" => "%{[message][6]}"
      "user" => "%{[message][7]}"
      "item" => "%{[message][8]}"
      "item_id" => "%{[message][9]}"
      "current_time" => "%{[message][12]}"
    }
    remove_field => [ "message" ]
  }
}
output {
  elasticsearch {
    index => "testlog-%{+YYYY.MM.dd}"
    codec => plain {
      charset => "UTF-8"
    }
    hosts => ["node01:9200", "node02:9200", "node03:9200"]
  }
}
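The filter above assumes each Kafka record is one space-separated line with at least 13 tokens; tokens 3-9 and 12 (0-based) become named fields. A purely hypothetical line in that layout:

2017-03-25 18:55:47 INFO pickup map_01 128 64 user_42 sword 10086 - - 1490468147
# would yield: event_type=pickup, current_map=map_01, current_x=128, current_y=64,
#              user=user_42, item=sword, item_id=10086, current_time=1490468147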
... more updates to come ...