天天看點

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本
Mysql到Elasticsearch的資料同步,一般用ETL來實作,但性能并不理想,目前大部分的ETL是定時查詢Mysql資料庫有沒有新增資料或者修改資料,如果資料量小影響不大,但如果幾百萬上千萬的資料量性能就明顯的下降很多,本文是使用Go實作的go-mysql-transfer中間件來實時監控Mysql的Binlog日志,然後同步到Elasticsearch,從實時性、性能效果都不錯。

一、go-mysql-transfer

go-mysql-transfer是使用Go語言實作的MySQL資料庫實時增量同步工具。能夠實時監聽MySQL二進制日志(binlog)的變動,将變更内容形成指定格式的消息,發送到接收端。在資料庫和接收端之間形成一個高性能、低延遲的增量資料(Binlog)同步管道, 具有如下特點:

1、不依賴其它元件,一鍵部署

2、內建多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫用戶端,開箱即用

3、内置豐富的資料解析、消息生成規則;支援Lua腳本,以處理更複雜的資料邏輯

4、支援監控告警,內建Prometheus用戶端

5、高可用叢集部署

6、資料同步失敗重試

7、全量資料初始化

詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實作詳解

項目開源位址:go-mysql-transfer

二、配置

# app.ymltarget: elasticsearch #目标類型#elasticsearch連接配接配置es_addrs: 127.0.0.1:9200 #連接配接位址,多個用逗号分隔es_version: 7 # Elasticsearch版本,支援6和7、預設為7#es_password:  # 使用者名#es_version:  # 密碼
           

三、資料轉換規則

相關配置如下:

rule:  -    schema: eseap #資料庫名稱    table: t_user #表名稱    #order_by_column: id #排序字段,存量資料同步時不能為空    #column_lower_case: true #列名稱轉為小寫,預設為false    #column_upper_case:false#列名稱轉為大寫,預設為false    column_underscore_to_camel: true #列名稱下劃線轉駝峰,預設為false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  為空時表示包含全部列    #include_columns: ID,USER_NAME,PASSWORD    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  預設為空    #default_column_values: area_name=合肥  #預設的列-值,多個用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫預設yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫預設yyyy-MM-dd HH:mm:ss    #Elasticsearch相關    es_index: user_index #Index名稱,可以為空,預設使用表(Table)名稱    #es_mappings: #索引映射,可以為空,為空時根據資料類型自行推導ES推導    #  -      #   column: REMARK #資料庫列名稱    #    field: remark #映射後的ES字段名稱    #    type: text #ES字段類型    #    analyzer: ik_smart #ES分詞器,type為text此項有意義    #    #format: #日期格式,type為date此項有意義    #   -      #    column: USER_NAME #資料庫列名稱    #    field: account #映射後的ES字段名稱    #    type: keyword #ES字段類型
           

示例一

t_user表,資料如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

自動建立的Mapping,如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

同步到Elasticsearch的資料如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

示例二

t_user表,同執行個體一

使用如下配置:

rule:  -    schema: eseap #資料庫名稱    table: t_user #表名稱    order_by_column: id #排序字段,存量資料同步時不能為空    column_lower_case: true #列名稱轉為小寫,預設為false    #column_upper_case:false#列名稱轉為大寫,預設為false    #column_underscore_to_camel: true #列名稱下劃線轉駝峰,預設為false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  為空時表示包含全部列    #include_columns: ID,USER_NAME,PASSWORD    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  預設為空    default_column_values: area_name=合肥  #預設的列-值,多個用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫預設yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫預設yyyy-MM-dd HH:mm:ss    #Elasticsearch相關    es_index: user_index #Index名稱,可以為空,預設使用表(Table)名稱    es_mappings: #索引映射,可以為空,為空時根據資料類型自行推導ES推導      -          column: REMARK #資料庫列名稱        field: remark #映射後的ES字段名稱        type: text #ES字段類型        analyzer: ik_smart #ES分詞器,type為text此項有意義        #format: #日期格式,type為date此項有意義      -          column: USER_NAME #資料庫列名稱        field: account #映射後的ES字段名稱        type: keyword #ES字段類型
           

es_mappings 定義索引的mappings(映射關系),不定義es_mappings則使用列類型自動建立索引的mappings(映射關系)。

自動建立的Mapping,如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

同步到Elasticsearch的資料如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

四、Lua腳本

使用Lua腳本可以實作更複雜的資料處理邏輯,go-mysql-transfer支援Lua5.1文法。

示例一

t_user表,資料如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

引入Lua腳本:

#規則配置  rule:  -    schema: eseap #資料庫名稱    table: t_user #表名稱    order_by_column: id #排序字段,存量資料同步時不能為空    lua_file_path: lua/t_user_es.lua   #lua腳本檔案    es_index: user_index #Elasticsearch Index名稱,可以為空,預設使用表(Table)名稱    es_mappings: #索引映射,可以為空,為空時根據資料類型自行推導ES推導      -          field: id #映射後的ES字段名稱        type: keyword #ES字段類型      -          field: userName #映射後的ES字段名稱        type: keyword #ES字段類型      -          field: password #映射後的ES字段名稱        type: keyword #ES字段類型      -          field: createTime #映射後的ES字段名稱        type: date #ES字段類型        format: yyyy-MM-dd HH:mm:ss #日期格式,type為date此項有意義      -          field: remark #映射後的ES字段名稱        type: text #ES字段類型        analyzer: ik_smart #ES分詞器,type為text此項有意義      -          field: source #映射後的ES字段名稱        type: keyword #ES字段類型
           

es_mappings 定義索引的mappings(映射關系),不定義es_mappings則根據字段的值自動建立mappings(映射關系)。根據es_mappings 生成的mappings如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

user_index索引mappings

Lua腳本:

local ops = require("esOps") --加載elasticsearch操作子產品local row = ops.rawRow()  --目前資料庫的一行資料,table類型,key為列名稱local action = ops.rawAction()  --目前資料庫事件,包括:insert、update、deletelocal id = row["ID"] --擷取ID列的值local userName = row["USER_NAME"] --擷取USER_NAME列的值local password = row["PASSWORD"] --擷取USER_NAME列的值local createTime = row["CREATE_TIME"] --擷取CREATE_TIME列的值local remark = row["REMARK"] --擷取REMARK列的值local result = {}  -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 資料來源if action == "insert" then -- 隻監聽新增事件    ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的資料主鍵;參數3為要插入的資料,tablele類型或者json字元串end 
           

同步到Elasticsearch的資料如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

示例二

t_user表,同執行個體一

引入Lua腳本:

schema: eseap #資料庫名稱    table: t_user #表名稱    lua_file_path: lua/t_user_es2.lua   #lua腳本檔案
           

未明确定義index名稱、mappings,es會根據值自動建立一個名為t_user的index。

使用如下腳本:

local ops = require("esOps") --加載elasticsearch操作子產品local row = ops.rawRow()  --目前資料庫的一行資料,table類型,key為列名稱local action = ops.rawAction()  --目前資料庫事件,包括:insert、update、deletelocal id = row["ID"] --擷取ID列的值local userName = row["USER_NAME"] --擷取USER_NAME列的值local password = row["PASSWORD"] --擷取USER_NAME列的值local createTime = row["CREATE_TIME"] --擷取CREATE_TIME列的值local result = {}  -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 資料來源if action == "insert" then -- 隻監聽新增事件    ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的資料主鍵;參數3為要插入的資料,tablele類型或者json字元串end 
           

同步到Elasticsearch的資料如下:

elasticsearch date_MySQL資料實時增量同步到Elasticsearch一、go-mysql-transfer二、配置三、資料轉換規則四、Lua腳本

esOps子產品提供的方法如下:

  1. INSERT: 插入操作,如:ops.INSERT(index,id,result)。參數index為索引名稱,字元串類型;參數index為要插入資料的主鍵;參數result為要插入的資料,可以為table類型或者json字元串
  2. UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。參數index為索引名稱,字元串類型;參數index為要修改資料的主鍵;參數result為要修改的資料,可以為table類型或者json字元串
  3. DELETE: 删除操作,如:ops.DELETE(index,id)。參數index為索引名稱,字元串類型;參數id為要删除的資料主鍵,類型不限;

文章來源:https://www.jianshu.com/p/5a9b6c4f318c

繼續閱讀