天天看點

使用kafka連接配接器遷移mysql資料到ElasticSearch

概述

把 mysql 的資料遷移到 es 有很多方式,比如直接用 es 官方推薦的 logstash 工具,或者監聽 mysql 的 binlog 進行同步,可以結合一些開源的工具比如阿裡的 canal。

這裡打算詳細介紹另一個也是不錯的同步方案,這個方案基于 kafka 的連接配接器。流程可以概括為:

  1. mysql連接配接器監聽資料變更,把變更資料發送到 kafka topic。
  2. ES 監聽器監聽kafka topic 消費,寫入 ES。

Kafka Connect有兩個核心概念:Source和Sink。Source負責導入資料到Kafka,Sink負責從Kafka導出資料,它們都被稱為Connector,也就是連接配接器。在本例中,mysql的連接配接器是source,es的連接配接器是sink。

這些連接配接器本身已經開源,我們之間拿來用即可。不需要再造輪子。

過程詳解

準備連接配接器工具

我下面所有的操作都是在自己的mac上進行的。

首先我們準備兩個連接配接器,分别是

kafka-connect-elasticsearch

kafka-connect-elasticsearch

, 你可以通過源碼編譯他們生成jar包,源碼位址:

kafka-connect-elasticsearch

kafka-connect-mysql

我個人不是很推薦這種源碼的編譯方式,因為真的好麻煩。除非想研究源碼。

我是直接下載下傳 confluent 平台的工具包,裡面有編譯号的jar包可以直接拿來用,下載下傳位址:

confluent 工具包

我下載下傳的是 confluent-5.3.1 版本, 相關的jar包在 confluent-5.3.1/share/java 目錄下

我們把編譯好的或者下載下傳的jar包拷貝到kafka的libs目錄下。拷貝的時候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相關的依賴包也要一起拷貝過來,比如es這個jar包目錄下的http相關的,jersey相關的等,否則會報各種

java.lang.NoClassDefFoundError

的錯誤。

另外mysql-connector-java-5.1.22.jar也要放進去。

資料庫和ES環境準備

資料庫和es我都是在本地啟動的,這個過程具體就不說了,網上有很多參考的。

我建立了一個名為test的資料庫,裡面有一個名為login的表。

配置連接配接器

這部分是最關鍵的,我實際操作的時候這裡也是最耗時的。

首先配置jdbc的連接配接器。

我們從confluent工具包裡拷貝一個配置檔案的模闆(confluent-5.3.1/share目錄下),自帶的隻有sqllite的配置檔案,拷貝一份到kafka的config目錄下,改名為sink-quickstart-mysql.properties,檔案内容如下:

# tasks to create:
name=mysql-login-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111
mode=timestamp+incrementing
timestamp.column.name=login_time
incrementing.column.name=id
topic.prefix=mysql.
table.whitelist=login           

複制

connection.url指定要連接配接的資料庫,這個根據自己的情況修改。mode訓示我們想要如何查詢資料。在本例中我選擇incrementing遞增模式和timestamp 時間戳模式混合的模式, 并設定incrementing.column.name遞增列的列名和時間戳所在的列名。

** 混合模式還是比較推薦的,它能盡量的保證資料同步不丢失資料。**具體的原因大家可以查閱相關資料,這裡就不詳述了。

topic.prefix是衆多表名之前的topic的字首,table.whitelist是白名單,表示要監聽的表,可以使組合多個表。兩個組合在一起就是該表的變更topic,比如在這個示例中,最終的topic就是mysql.login。

connector.class是具體的連接配接器處理類,這個不用改。

其它的配置基本不用改。

接下來就是ES的配置了。同樣也是拷貝 quickstart-elasticsearch.properties 檔案到kafka的config目錄下,然後修改,我自己的環境内容如下:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql.login
key.ignore=true
connection.url=http://localhost:9200
type.name=mysqldata           

複制

topics的名字和上面mysql設定的要保持一緻,同時這個也是ES資料導入的索引。從裡也可以看出,ES的連接配接器一個執行個體隻能監聽一張表。

type.name需要關注下,我使用的ES版本是7.1,我們知道在7.x的版本中已經隻有一個固定的type(_doc)了,使用低版本的連接配接器在同步的時候會報錯誤,我這裡使用的5.3.1版本已經相容了。繼續看下面的章節就知道了。

關于es連接配接器和es的相容性問題,有興趣的可以看看下面這個issue:

https://github.com/confluentinc/kafka-connect-elasticsearch/issues/314

啟動測試

當然首先啟動zk和kafka。

然後我們啟動mysql的連接配接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &           

複制

接着手動往login表插入幾條記錄,正常情況下這些變更已經發到kafka對應的topic上去了。為了驗證,我們在控制台啟動一個消費者從mysql.login主題讀取資料:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning           

複制

可以看到剛才插入的資料。

把資料從 MySQL 移動到 Kafka 裡就算完成了,接下來把資料從 Kafka 寫到 ElasticSearch 裡。

首先啟動ES和kibana,當然後者不是必須的,隻是友善我們在IDE環境裡測試ES。你也可以通過控制台給ES發送HTTP的指令。

先把之前啟動的mysql連接配接器程序結束(因為會占用端口),再啟動 ES 連接配接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &           

複制

如果正常的話,ES這邊應該已經有資料了。打開kibana的開發工具,在console裡執行

GET _cat/indices           

複制

這是擷取節點上所有的索引,你應該能看到,

green open mysql.login                  1WqRjkbfTlmXj8eKBPvAtw 1 1      4    0    12kb   7.8kb           

複制

說明索引已經正常建立了。然後我們查詢下,

GET mysql.login/_search?pretty=true           

複制

結果如下,

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+0",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "username" : "lucas1",
          "login_time" : 1575870785000
        }
      },
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+1",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "username" : "lucas2",
          "login_time" : 1575870813000
        }
      },
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+2",
        "_score" : 1.0,
        "_source" : {
          "id" : 3,
          "username" : "lucas3",
          "login_time" : 1575874031000
        }
      },
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+3",
        "_score" : 1.0,
        "_source" : {
          "id" : 4,
          "username" : "lucas4",
          "login_time" : 1575874757000
        }
      }
    ]
  }
}           

複制

參考:

1.《kafka權威指南》

  1. https://www.jianshu.com/p/46b6fa53cae4