天天看點

ELK 建構 MySQL 慢日志收集平台詳解

ELK 介紹

ELK 最早是 Elasticsearch(以下簡稱ES)、Logstash、Kibana 三款開源軟體的簡稱,三款軟體後來被同一公司收購,并加入了Xpark、Beats等元件,改名為Elastic Stack,成為現在最流行的開源日志解決方案,雖然有了新名字但大家依然喜歡叫她ELK,現在所說的ELK就指的是基于這些開源軟體建構的日志系統。

我們收集mysql慢日志的方案如下:

ELK 建構 MySQL 慢日志收集平台詳解
  • mysql 伺服器安裝 Filebeat 作為 agent 收集 slowLog
  • Filebeat 讀取 mysql 慢日志檔案做簡單過濾傳給 Kafka 叢集
  • Logstash 讀取 Kafka 叢集資料并按字段拆分後轉成 JSON 格式存入 ES 叢集
  • Kibana讀取ES叢集資料展示到web頁面上

慢日志分類

目前主要使用的mysql版本有5.5、5.6 和 5.7,經過仔細對比發現每個版本的慢查詢日志都稍有不同,如下:

5.5 版本慢查詢日志

ELK 建構 MySQL 慢日志收集平台詳解

5.6 版本慢查詢日志

ELK 建構 MySQL 慢日志收集平台詳解

5.7 版本慢查詢日志

ELK 建構 MySQL 慢日志收集平台詳解

慢查詢日志異同點:

  1. 每個版本的Time字段格式都不一樣
  2. 相較于5.6、5.7版本,5.5版本少了Id字段
  3. use db語句不是每條慢日志都有的
  4. 可能會出現像下邊這樣的情況,慢查詢塊# Time:下可能跟了多個慢查詢語句
ELK 建構 MySQL 慢日志收集平台詳解

處理思路

上邊我們已經分析了各個版本慢查詢語句的構成,接下來我們就要開始收集這些資料了,究竟應該怎麼收集呢?

  1. 拼裝日志行:mysql 的慢查詢日志多行構成了一條完整的日志,日志收集時要把這些行拼裝成一條日志傳輸與存儲。
  2. Time行處理:# Time: 開頭的行可能不存在,且我們可以通過SET timestamp這個值來确定SQL執行時間,是以選擇過濾丢棄Time行
  3. 一條完整的日志:最終将以# User@Host: 開始的行,和以SQL語句結尾的行合并為一條完整的慢日志語句
  4. 确定SQL對應的DB:use db這一行不是所有慢日志SQL都存在的,是以不能通過這個來确定SQL對應的DB,慢日志中也沒有字段記錄DB,是以這裡建議為DB建立賬号時添加db name辨別,例如我們的賬号命名方式為:projectName_dbName,這樣看到賬号名就知道是哪個DB了
  5. 确定SQL對應的主機:我想通過日志知道這條SQL對應的是哪台資料庫伺服器怎麼辦?

    慢日志中同樣沒有字段記錄主機,可以通過filebeat注入字段來解決,例如我們給filebeat的name字段設定為伺服器IP,這樣最終通過beat.name這個字段就可以确定SQL對應的主機了。

Filebeat配置

filebeat 完整的配置檔案如下:

ELK 建構 MySQL 慢日志收集平台詳解
# mysql_slow.log
-
  input_type: log
  paths:
    - /home/logs/mysql/mysqld_slow.log
  document_type: mysqld-slow
  exclude_lines: ['^\# Time']
  
  multiline.pattern: '^\# Time|^\# User'
  multiline.negate: true
  multiline.match: after
  
  tail_files: true
      

重要參數解釋:

  • input_type:指定輸入的類型是log或者是stdin
  • paths:慢日志路徑,支援正則,比如/data/*.log
  • exclude_lines:過濾掉# Time開頭的行
  • multiline.pattern:比對多行時指定正規表達式,這裡比對以# Time或者# User開頭的行,Time行要先比對再過濾
  • multiline.negate:定義上邊pattern比對到的行是否用于多行合并,也就是定義是不是作為日志的一部分
  • multiline.match:定義如何将皮排行組合成時間,在之前或者之後
  • tail_files:定義是從檔案開頭讀取日志還是結尾,這裡定義為true,從現在開始收集,之前已存在的不管
  • name:設定filebeat的名字,如果為空則為伺服器的主機名,這裡我們定義為伺服器IP
  • output.kafka:配置要接收日志的kafka叢集位址可topic名稱

Kafka 接收到的日志格式:

ELK 建構 MySQL 慢日志收集平台詳解
{"@timestamp":"2018-08-07T09:36:00.140Z","beat":{"hostname":"db-7eb166d3","name":"10.63.144.71","version":"5.4.0"},"input_type":"log","message":"# User@Host: select[select] @  [10.63.144.16]  Id: 23460596\n# Query_time: 0.155956  Lock_time: 0.000079 Rows_sent: 112  Rows_examined: 366458\nSET timestamp=1533634557;\nSELECT DISTINCT(uid) FROM common_member WHERE hideforum=-1 AND uid != 0;","offset":1753219021,"source":"/data/slow/mysql_slow.log","type":"log"}
      

  

Logstash配置

logstash完整的配置檔案如下:

ELK 建構 MySQL 慢日志收集平台詳解

僅顯示filter資訊

if [type] =~ "mysqld-slow" {
        mutate {
            add_field => {"line_message" => "%{message} %{offset}"}
        }
        ruby {
            code => "
            require 'digest/md5';
            event.set('computed_id', Digest::MD5.hexdigest(event.get('line_message')))
            "
         }
	#有ID有use
        grok {
            match => { 
		"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:
int}\n\#\s+Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined: 
%{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+)\;\s+SET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)" 
		}
        }
	#有ID無use
        grok {
            match => {
		 "message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id
:int}\n\#\s+Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:
 %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)"
		}
        }
	#無ID有use
        grok {    				                                
            match => {
		 "message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\n\#\s+Query_time: %
{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:i
nt}\nuse\s(?<dbname>\w+)\;\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)"
		}
        }
	#無ID無use
        grok {
            match => { 
		"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\n\#\s+Query_time: %{
NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:in
t}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)" 
		}
        }
        date {
            match => ["timestamp_mysql", "UNIX"]
            target => "@timestamp"
        }
       mutate {
            remove_field => ["line_message","message","kafka","tags"]
       }
   }
      

  • input:配置 kafka 的叢集位址和 topic 名字
  • filter:過濾日志檔案,主要是對 message 資訊(看前文 kafka 接收到的日志格式)進行拆分,拆分成一個一個易讀的字段,例如User、Host、Query_time、Lock_time、timestamp等。

    grok段根據我們前文對mysql慢日志的分類分别寫不通的正規表達式去比對,當有多條正規表達式存在時,logstash會從上到下依次比對,比對到一條後邊的則不再比對。

    date字段定義了讓SQL中的timestamp_mysql字段作為這條日志的時間字段,kibana上看到的實踐排序的資料依賴的就是這個時間

  • output:配置ES伺服器叢集的位址和index,index自動按天分割

ES 中mysqld-slow-*索引模闆

{
  "order": 0,
  "template": "mysqld-slow-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "mysqld-slow": {
      "numeric_detection": true,
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        },
        "@version": {
          "type": "string"
        },
        "query_time": {
          "type": "double"
        },
        "row_sent": {
          "type": "string"
        },
        "rows_examined": {
          "type": "string"
        },
        "clientip": {
          "type": "string"
        },
        "clienthost": {
          "type": "string"
        },
        "id": {
          "type": "integer"
        },
        "lock_time": {
          "type": "string"
        },
        "dbname": {
          "type": "keyword"
        },
        "user": {
          "type": "keyword"
        },
        "query": {
          "type": "string",
          "index": "not_analyzed"
        },
        "tags": {
          "type": "string"
        },
        "timestamp": {
          "type": "string"
        },
        "type": {
          "type": "string"
        }
      }
    }
  },
  "aliases": {}
}      

kibana查詢展示

  • 打開Kibana添加 

    mysql-slowlog-*

     的Index,并選擇timestamp,建立Index Pattern
  • ELK 建構 MySQL 慢日志收集平台詳解
     進入Discover頁面,可以很直覺的看到各個時間點慢日志的數量變化,可以根據左側Field實作簡單過濾,搜尋框也友善搜尋慢日志,例如我要找查詢時間大于2s的慢日志,直接在搜尋框輸入 

    query_time: > 2

     回車即可。
  • ELK 建構 MySQL 慢日志收集平台詳解
    點選每一條日志起邊的很色箭頭能檢視具體某一條日志的詳情。
  • ELK 建構 MySQL 慢日志收集平台詳解
    如果你想做個大盤統計慢日志的整體情況,例如top 10 SQL等,也可以很友善的通過web界面配置。
  • ELK 建構 MySQL 慢日志收集平台詳解
  • 總結

    1. 不要望而卻步,當你開始去做已經成功一半了