Mysql到Elasticsearch的資料同步,一般用ETL來實作,但性能并不理想,目前大部分的ETL是定時查詢Mysql資料庫有沒有新增資料或者修改資料,如果資料量小影響不大,但如果幾百萬上千萬的資料量性能就明顯的下降很多,本文是使用Go實作的go-mysql-transfer中間件來實時監控Mysql的Binlog日志,然後同步到Elasticsearch,從實時性、性能效果都不錯。
一、go-mysql-transfer
go-mysql-transfer是使用Go語言實作的MySQL資料庫實時增量同步工具。能夠實時監聽MySQL二進制日志(binlog)的變動,将變更内容形成指定格式的消息,發送到接收端。在資料庫和接收端之間形成一個高性能、低延遲的增量資料(Binlog)同步管道, 具有如下特點:
1、不依賴其它元件,一鍵部署
2、內建多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫用戶端,開箱即用
3、内置豐富的資料解析、消息生成規則;支援Lua腳本,以處理更複雜的資料邏輯
4、支援監控告警,內建Prometheus用戶端
5、高可用叢集部署
6、資料同步失敗重試
7、全量資料初始化
詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實作詳解
項目開源位址:go-mysql-transfer
二、配置
# app.ymltarget: elasticsearch #目标類型#elasticsearch連接配接配置es_addrs: 127.0.0.1:9200 #連接配接位址,多個用逗号分隔es_version: 7 # Elasticsearch版本,支援6和7、預設為7#es_password: # 使用者名#es_version: # 密碼
三、資料轉換規則
相關配置如下:
rule: - schema: eseap #資料庫名稱 table: t_user #表名稱 #order_by_column: id #排序字段,存量資料同步時不能為空 #column_lower_case: true #列名稱轉為小寫,預設為false #column_upper_case:false#列名稱轉為大寫,預設為false column_underscore_to_camel: true #列名稱下劃線轉駝峰,預設為false # 包含的列,多值逗号分隔,如:id,name,age,area_id 為空時表示包含全部列 #include_columns: ID,USER_NAME,PASSWORD #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id 預設為空 #default_column_values: area_name=合肥 #預設的列-值,多個用逗号分隔,如:source=binlog,area_name=合肥 #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫預設yyyy-MM-dd #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫預設yyyy-MM-dd HH:mm:ss #Elasticsearch相關 es_index: user_index #Index名稱,可以為空,預設使用表(Table)名稱 #es_mappings: #索引映射,可以為空,為空時根據資料類型自行推導ES推導 # - # column: REMARK #資料庫列名稱 # field: remark #映射後的ES字段名稱 # type: text #ES字段類型 # analyzer: ik_smart #ES分詞器,type為text此項有意義 # #format: #日期格式,type為date此項有意義 # - # column: USER_NAME #資料庫列名稱 # field: account #映射後的ES字段名稱 # type: keyword #ES字段類型
示例一
t_user表,資料如下:
自動建立的Mapping,如下:
同步到Elasticsearch的資料如下:
示例二
t_user表,同執行個體一
使用如下配置:
rule: - schema: eseap #資料庫名稱 table: t_user #表名稱 order_by_column: id #排序字段,存量資料同步時不能為空 column_lower_case: true #列名稱轉為小寫,預設為false #column_upper_case:false#列名稱轉為大寫,預設為false #column_underscore_to_camel: true #列名稱下劃線轉駝峰,預設為false # 包含的列,多值逗号分隔,如:id,name,age,area_id 為空時表示包含全部列 #include_columns: ID,USER_NAME,PASSWORD #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id 預設為空 default_column_values: area_name=合肥 #預設的列-值,多個用逗号分隔,如:source=binlog,area_name=合肥 #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫預設yyyy-MM-dd #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫預設yyyy-MM-dd HH:mm:ss #Elasticsearch相關 es_index: user_index #Index名稱,可以為空,預設使用表(Table)名稱 es_mappings: #索引映射,可以為空,為空時根據資料類型自行推導ES推導 - column: REMARK #資料庫列名稱 field: remark #映射後的ES字段名稱 type: text #ES字段類型 analyzer: ik_smart #ES分詞器,type為text此項有意義 #format: #日期格式,type為date此項有意義 - column: USER_NAME #資料庫列名稱 field: account #映射後的ES字段名稱 type: keyword #ES字段類型
es_mappings 定義索引的mappings(映射關系),不定義es_mappings則使用列類型自動建立索引的mappings(映射關系)。
自動建立的Mapping,如下:
同步到Elasticsearch的資料如下:
四、Lua腳本
使用Lua腳本可以實作更複雜的資料處理邏輯,go-mysql-transfer支援Lua5.1文法。
示例一
t_user表,資料如下:
引入Lua腳本:
#規則配置 rule: - schema: eseap #資料庫名稱 table: t_user #表名稱 order_by_column: id #排序字段,存量資料同步時不能為空 lua_file_path: lua/t_user_es.lua #lua腳本檔案 es_index: user_index #Elasticsearch Index名稱,可以為空,預設使用表(Table)名稱 es_mappings: #索引映射,可以為空,為空時根據資料類型自行推導ES推導 - field: id #映射後的ES字段名稱 type: keyword #ES字段類型 - field: userName #映射後的ES字段名稱 type: keyword #ES字段類型 - field: password #映射後的ES字段名稱 type: keyword #ES字段類型 - field: createTime #映射後的ES字段名稱 type: date #ES字段類型 format: yyyy-MM-dd HH:mm:ss #日期格式,type為date此項有意義 - field: remark #映射後的ES字段名稱 type: text #ES字段類型 analyzer: ik_smart #ES分詞器,type為text此項有意義 - field: source #映射後的ES字段名稱 type: keyword #ES字段類型
es_mappings 定義索引的mappings(映射關系),不定義es_mappings則根據字段的值自動建立mappings(映射關系)。根據es_mappings 生成的mappings如下:
user_index索引mappings
Lua腳本:
local ops = require("esOps") --加載elasticsearch操作子產品local row = ops.rawRow() --目前資料庫的一行資料,table類型,key為列名稱local action = ops.rawAction() --目前資料庫事件,包括:insert、update、deletelocal id = row["ID"] --擷取ID列的值local userName = row["USER_NAME"] --擷取USER_NAME列的值local password = row["PASSWORD"] --擷取USER_NAME列的值local createTime = row["CREATE_TIME"] --擷取CREATE_TIME列的值local remark = row["REMARK"] --擷取REMARK列的值local result = {} -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 資料來源if action == "insert" then -- 隻監聽新增事件 ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的資料主鍵;參數3為要插入的資料,tablele類型或者json字元串end
同步到Elasticsearch的資料如下:
示例二
t_user表,同執行個體一
引入Lua腳本:
schema: eseap #資料庫名稱 table: t_user #表名稱 lua_file_path: lua/t_user_es2.lua #lua腳本檔案
未明确定義index名稱、mappings,es會根據值自動建立一個名為t_user的index。
使用如下腳本:
local ops = require("esOps") --加載elasticsearch操作子產品local row = ops.rawRow() --目前資料庫的一行資料,table類型,key為列名稱local action = ops.rawAction() --目前資料庫事件,包括:insert、update、deletelocal id = row["ID"] --擷取ID列的值local userName = row["USER_NAME"] --擷取USER_NAME列的值local password = row["PASSWORD"] --擷取USER_NAME列的值local createTime = row["CREATE_TIME"] --擷取CREATE_TIME列的值local result = {} -- 定義一個table,作為結果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 資料來源if action == "insert" then -- 隻監聽新增事件 ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的資料主鍵;參數3為要插入的資料,tablele類型或者json字元串end
同步到Elasticsearch的資料如下:
esOps子產品提供的方法如下:
- INSERT: 插入操作,如:ops.INSERT(index,id,result)。參數index為索引名稱,字元串類型;參數index為要插入資料的主鍵;參數result為要插入的資料,可以為table類型或者json字元串
- UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。參數index為索引名稱,字元串類型;參數index為要修改資料的主鍵;參數result為要修改的資料,可以為table類型或者json字元串
- DELETE: 删除操作,如:ops.DELETE(index,id)。參數index為索引名稱,字元串類型;參數id為要删除的資料主鍵,類型不限;
文章來源:https://www.jianshu.com/p/5a9b6c4f318c