天天看點

logstash_output_kafka:Mysql同步Kafka深入詳解

0、題記

實際業務場景中,會遇到基礎資料存在Mysql中,實時寫入資料量比較大的情景。

遷移至kafka是一種比較好的業務選型方案。

logstash_output_kafka:Mysql同步Kafka深入詳解

而mysql寫入kafka的選型方案有:

方案一:logstash_output_kafka 插件。

方案二:kafka_connector。

方案三:debezium 插件。

方案四:flume。

方案五:其他類似方案。

其中:debezium和flume是基于mysql binlog實作的。

如果需要同步曆史全量資料+實時更新資料,建議使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc實作關系型資料庫到Elasticsearch等的同步。

實際上,核心logstash的同步原理的掌握,有助于大家了解類似的各種庫之間的同步。

logstash核心原理:輸入生成事件,過濾器修改它們,輸出将它們發送到其他地方。

logstash核心三部分組成:input、filter、output。

logstash_output_kafka:Mysql同步Kafka深入詳解
input { }
filter { }
output { }           

1.1 input輸入

包含但遠不限于:

1.jdbc:關系型資料庫:mysql、oracle等。
 2.file:從檔案系統上的檔案讀取。
 3.syslog:在已知端口514上偵聽syslog消息。           

redis:redis消息。 beats:處理 Beats發送的事件。

kafka:kafka實時資料流。

1.2 filter過濾器

過濾器是Logstash管道中的中間處理裝置。您可以将過濾器與條件組合,以便在事件滿足特定條件時對其執行操作。

可以把它比作資料處理的ETL環節。

一些有用的過濾包括:

1.grok:解析并構造任意文本。Grok是目前Logstash中将非結構化日志資料解析為結構化和可查詢内容的最佳方式。有了内置于Logstash的120種模式,您很可能會找到滿足您需求的模式!
 2.mutate:對事件字段執行正常轉換。您可以重命名,删除,替換和修改事件中的字段。
 3.drop:完全删除事件,例如調試事件。
 4.clone:制作事件的副本,可能添加或删除字段。
 5.geoip:添加有關IP位址的地理位置的資訊。
           

1.3 output輸出

輸出是Logstash管道的最後階段。一些常用的輸出包括:

1.elasticsearch:将事件資料發送到Elasticsearch。
2.file:将事件資料寫入磁盤上的檔案。
3.kafka:将事件寫入Kafka。
           

詳細的filter demo參考:

https://github.com/hellosign/logstash-fundamentals/blob/master/examples/complex_logstash.md

2、logstash_output_kafka同步Mysql到kafka配置參考

input {
    jdbc {
      jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"
      jdbc_user => "root"
      jdbc_password => "xxxxxxx"
      jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #schedule => "* * * * *"
      statement => "SELECT * from news_info WHERE id > :sql_last_value  order by id"
      use_column_value => true
      tracking_column => "id"        
      tracking_column_type => "numeric"
      record_last_run => true
      last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"    

    }
 
}

filter {
   ruby{
        code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"
    }
    ruby{
        code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"
    }
  mutate {
    remove_field => [ "@version" ]
    remove_field => [ "@timestamp" ]
    remove_field => [ "gather_time" ]
    remove_field => [ "publish_time" ]
  }
}

 output {
      kafka {
            bootstrap_servers => "192.168.1.13:9092"
            codec => json_lines
            topic_id => "mytopic"

    }
    file {
            codec => json_lines
            path => "/tmp/output_a.log"
    }
 }
           

以上内容不複雜,不做細講。

注意:

Mysql借助logstash同步後,日期類型格式:“2019-04-20 13:55:53”已經被識别為日期格式。

code =>
“event.set(‘gather_time_unix’,event.get(‘gather_time’).to_i*1000)”,           

是将Mysql中的時間格式轉化為時間戳格式。

3、坑總結

3.1 坑1字段大小寫問題

from星友:使用logstash同步mysql資料的,因為在jdbc.conf裡面沒有添加 lowercase_column_names

=> “false” 這個屬性,是以logstash預設把查詢結果的列明改為了小寫,同步進了es,是以就導緻es裡面看到的字段名稱全是小寫。

最後總結:es是支援大寫字段名稱的,問題出在logstash沒用好,需要在同步配置中加上 lowercase_column_names => “false” 。記錄下來希望可以幫到更多人,哈哈。

3.2 同步到ES中的資料會不會重複?

想将關系資料庫的資料同步至ES中,如果在叢集的多台伺服器上同時啟動logstash。

解讀:實際項目中就是沒用随機id 使用指定id作為es的_id ,指定id可以是url的md5.這樣相同資料就會走更新覆寫以前資料

3.3 相同配置logstash,更新6.3之後不能同步資料。

解讀:高版本基于時間增量有優化。

tracking_column_type => "timestamp"

應該是需要指定辨別為時間類型,預設為數字類型numeric

3.4 ETL字段統一在哪處理?

解讀:可以logstash同步mysql的時候sql查詢階段處理,如:select a_value as avalue*。

或者filter階段處理,mutate rename處理。

mutate {
        rename => ["shortHostname", "hostname" ]
    }           

或者kafka階段借助kafka stream處理。

4、小結

mysql2mysql:

https://my.oschina.net/u/2601303/blog/1503835

推薦開源實作:

https://github.com/Lunatictwo/DataX

推薦閱讀:

1、實戰 | canal 實作Mysql到Elasticsearch實時增量同步

2、幹貨 | Debezium實作Mysql到Elasticsearch高效實時同步

3、一張圖理清楚關系型/非關系型資料庫與Elasticsearch同步

logstash_output_kafka:Mysql同步Kafka深入詳解

銘毅天下——Elasticsearch基礎、進階、實戰第一公衆号