
Logstash installation and usage

Notes

Version: 5.6.1

See the official website.

Comparison with Flume

  1. No duplicate consumption and no data loss
  2. Flume currently has better HDFS support (my personal understanding)

Offline installation

Configure JAVA_HOME first

Then just download and extract the tarball.
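A minimal sketch of such an install, assuming a locally available JDK and the usual artifacts.elastic.co download URL (all paths below are examples, adjust to your environment):

# point JAVA_HOME at an installed JDK (example path)
export JAVA_HOME=/usr/local/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH

# fetch the 5.6.1 tarball (or copy it over for a truly offline box), then extract
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.1.tar.gz
tar -zxvf logstash-5.6.1.tar.gz -C /home/bingo/
cd /home/bingo/logstash-5.6.1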

Standard input and output

File to stdout

# start
bin/logstash -f conf/file-stdout.conf 

# Note: conf is a directory created by hand; contents of file-stdout.conf:
input {
    file {
        path => "/home/bingo/data/test.log"
        start_position => "beginning"
        ignore_older => 0
    }
}
output {
    stdout{}
}


# multiple files
path => "/home/bingo/data/*.log"
# multiple directories
path => "/home/bingo/data/*/*.log"
# parameter notes
start_position: defaults to "end", i.e. parsing starts from the end of the file
ignore_older: by default, files whose last modification is older than 24 hours are skipped; 0 means no file is ignored because of its age
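Before starting, you can let Logstash validate the configuration syntax without running the pipeline:

bin/logstash -f conf/file-stdout.conf --config.test_and_exit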
           

After running the command you will see the contents of the log file printed to the console.

Exit

ctrl+c

Start it again and the content printed earlier is not printed a second time, i.e. no duplicate consumption.
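Nothing is re-consumed because the file input keeps its read offsets in a "sincedb" file (stored under the Logstash data directory by default). A minimal sketch that pins it to an explicit, assumed location:

input {
    file {
        path => "/home/bingo/data/test.log"
        start_position => "beginning"
        sincedb_path => "/home/bingo/data/.sincedb_test"    # assumed path; read offsets are persisted here
    }
}
output {
    stdout{}
}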

From another terminal session, append content to test.log

echo "updating" >> test.log
           

The appended content shows up on the console.

Messages are prefixed with a timestamp and the hostname, e.g.: 2017-09-23T00:56:36.670Z node02 updating
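That prefix comes from the stdout output's default plain codec. To inspect the whole event (@timestamp, host, path, message and any added fields), you can switch the output to the rubydebug codec:

output {
    stdout { codec => rubydebug }
}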

File to Elasticsearch

bin/logstash -f conf/flow-es.conf

# contents of flow-es.conf
input {
  file {
    type => "flow"
    path => "/home/bingo/data/logstash/logs/*/*.txt"
    discover_interval => 15
    start_position => "beginning" 
  }
}

output {
  if [type] == "flow" {
    elasticsearch {
      index => "flow-%{+YYYY.MM.dd}"
      hosts => ["master01:9200", "worker01:9200", "worker02:9200"]
    }
  }  
}
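Once a few events have been shipped, you can check that the daily index was created, for example with the cat indices API (hostname taken from the config above):

curl 'http://master01:9200/_cat/indices?v'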
           

File to Kafka

# console source
bin/logstash -e 'input { stdin {} } output { kafka { topic_id => "test" bootstrap_servers => "node01:9092,node02:9092,node03:9092"} }'

# input
spark <enter>

# file source
bin/logstash -f conf/flow-kafka.conf
# contents of flow-kafka.conf
input {
  file {
    path => "/home/bingo/data/logstash/logs/*/*.txt"
    discover_interval => 15
    start_position => "beginning" 
  }
}

output {
    kafka {
      topic_id => "testlog"
      codec => plain {
        format => "%{message}"
        charset => "UTF-8"
      }
      bootstrap_servers => "node01:9092,node02:9092,node03:9092"
    }
}
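If the brokers are not configured to auto-create topics, create the target topic before starting Logstash; a sketch using the ZooKeeper-based admin tool and the example cluster above (partition and replication counts are just illustrative):

bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic testlog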
           

Consuming the data on the Kafka side

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

# output
{"message":"spark","@version":"1","@timestamp":"2017-03-25T18:55:47.814Z","host":"worker01"}

# if multiple consumers are started, each of them sees the same output
           

Kafka to Elasticsearch

bin/logstash -f conf/kafka-es.conf
# contents of kafka-es.conf
input {
  kafka {
    type => "level-one"
    auto_offset_reset => "earliest"    # the Kafka input shipped with Logstash 5.x uses "earliest"/"latest" instead of "smallest"
    codec => plain {
      charset => "UTF-8"
    }
    group_id => "es"
    topics => ["testlog"]                                          # replaces topic_id in the Logstash 5.x Kafka input
    bootstrap_servers => "node01:9092,node02:9092,node03:9092"     # replaces zk_connect; connects to the brokers directly
  }
}

filter {
  mutate {
    split => { "message" => "   " }
    add_field => {
      "event_type" => "%{message[3]}"
      "current_map" => "%{message[4]}"
      "current_X" => "%{message[5]}"
      "current_y" => "%{message[6]}"
      "user" => "%{message[7]}"
      "item" => "%{message[8]}"
      "item_id" => "%{message[9]}"
      "current_time" => "%{message[12]}"
    }
    remove_field => [ "message" ]
  }
}

output {
  elasticsearch {
    index => "testlog-%{+YYYY.MM.dd}"
    codec => plain {
      charset => "UTF-8"
    }
    hosts => ["node01:9200", "node02:9200", "node03:9200"]
  }
}
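To confirm that the parsed fields reached Elasticsearch, a quick search against the daily index is enough, e.g.:

curl 'http://node01:9200/testlog-*/_search?pretty&size=1'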
           

...to be continued...
