
Canal + Kafka: Near-Real-Time Synchronization of the MySQL binlog

Background

In the previous article I tested incremental data synchronization with Kafka Connect for MySQL. With limited research time and scarce online resources, I had to explore step by step. In theory that approach could meet the business requirements, but in practice several issues kept it from doing so, so I restarted the technical evaluation around the actual requirements. My current focus is building a small real-time data warehouse. One high-priority task is near-real-time synchronization of business data (inserts, updates, and soft deletes) to another data source; before persisting, the data needs to be cleaned and shaped into a reasonably designed business model that can support later extensions such as business metric statistics and metric computation. Given the team's current resources and skills, I evaluated Alibaba's open-source middleware Canal first.


這篇文章将測試一下如何快速地搭建一套Canal相關的元件。

About Canal

Overview

The overview below and the working principle in the next section both come from the Canal project's README:


canal [kə'næl], meaning waterway/pipeline/ditch, is mainly used to parse MySQL's incremental database logs and provide incremental data subscription and consumption. In the early days, because Alibaba ran dual data centers in Hangzhou and the United States, there was a business need for cross-datacenter synchronization, implemented mostly with business triggers to capture incremental changes. Starting in 2010, teams gradually switched to parsing database logs to obtain incremental changes for synchronization, which spawned a large number of incremental database subscription and consumption businesses.

Businesses built on log-based incremental subscription and consumption include:

Database mirroring

Real-time database backup

Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)

Business cache refresh

Incremental data processing with business logic

Working Principle

MySQL master-slave replication works as follows:


The MySQL master instance writes data changes to the binary log (whose records are called binary log events and can be inspected with SHOW BINLOG EVENTS).

MySQL 的 Slave 執行個體将master的 binary log events 拷貝到它的中繼日志(relay log)

The MySQL slave instance replays the events in the relay log, applying the data changes to its own data.
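The binary log events mentioned above can be inspected directly on the master; a minimal example follows (the log file name 'mysql-bin.000001' is an assumption and should be taken from the output of SHOW MASTER STATUS):

```sql
-- run in a MySQL client connected to the master
SHOW MASTER STATUS;                                  -- current binlog file and position
SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 10;   -- first events in that file
```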

Canal works as follows:

Canal emulates the MySQL slave's interaction protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master.

The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., Canal).

Canal parses the binary log objects (raw byte streams) and can forward them through connectors to message queues and other middleware.

Middleware Required for Deployment

Building a usable stack requires deploying four middleware instances: MySQL, Zookeeper, Kafka, and Canal. The deployment process is outlined below. The server OS used here is CentOS 7.

MySQL Environment Setup

MySQL Installation

Enabling binlog in MySQL
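For reference, enabling binlog for Canal usually comes down to a few lines in my.cnf; ROW format is required so that Canal can reconstruct full row data. This is a sketch for a typical single-instance setup; the file path and server-id value are assumptions that depend on your installation:

```ini
# /etc/my.cnf
[mysqld]
log-bin=mysql-bin     # enable binary logging
binlog-format=ROW     # Canal requires ROW format
server-id=1           # must differ from the slaveId configured in Canal
```

Canal also needs a MySQL account with replication privileges; per the official QuickStart this is typically granted with `GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';`.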

Zookeeper Distributed Environment Setup

Kafka Distributed Environment Setup

Installing and Using Canal

Now the main component finally takes the stage. We use Canal's stable release v1.1.4; only the deployer module needs to be downloaded:

mkdir /data/canal

cd /data/canal

# Note: GitHub can be extremely slow to reach from mainland China; consider downloading with another tool first and then uploading the package to the server

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

tar -zxvf canal.deployer-1.1.4.tar.gz

Because the download took a long time, I also uploaded the package to Baidu Netdisk (link: https://pan.baidu.com/s/1ZXZXAEEhoLcnnhSwpUrZmg, extraction code: 52me).

The extracted directory layout is as follows:

- bin   # operations scripts

- conf  # configuration files

  - canal_local.properties  # local Canal config; usually left untouched

  - canal.properties        # Canal server config

  - logback.xml             # logback logging config

  - metrics                 # metrics configuration

  - spring                  # Spring instance configs, mainly related to binlog position computation and strategies; any one of them can be selected in canal.properties

  - example                 # instance config directory; by convention, one database maps to one instance config directory

    - instance.properties   # instance config, usually for a single database

- lib   # dependency jars

- logs  # log output directory

In development and test environments it is recommended to change the log level in logback.xml to DEBUG to make troubleshooting easier. Two configuration files matter here: canal.properties and instance.properties. In canal.properties, the following changes are needed:

Uncomment canal.instance.parser.parallelThreadSize = 16 to enable it. It controls the number of threads in the instance parser; if left unset, parsing may block or not happen at all.

Set canal.serverMode to kafka. Possible values are tcp, kafka, and rocketmq (the master branch, or the newer v1.1.5-alpha-1, also supports rabbitmq); the shipped default is tcp.

Set canal.mq.servers to the address of the Kafka broker or cluster; here it is 127.0.0.1:9092.

canal.mq.servers means different things under different canal.serverMode values:

In kafka mode, it is the address of the Kafka broker or cluster, i.e. bootstrap.servers.

In rocketmq mode, it is the NameServer list.

In rabbitmq mode, it is the host and port of the RabbitMQ service.
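Taken together, the edited portion of conf/canal.properties for this walkthrough would look roughly like this (a sketch; all other keys keep their shipped defaults):

```properties
# conf/canal.properties -- only the keys changed for this setup
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
# uncommented: parser thread count; leaving it commented out can stall parsing
canal.instance.parser.parallelThreadSize = 16
```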

For other configuration items, refer to the official wiki.

instance.properties generally configures one database instance. The Canal architecture lets a single Canal server instance handle asynchronous binlog parsing for multiple database instances. The main changes needed in instance.properties are:

Set canal.instance.mysql.slaveId to a value completely different from the master node's server ID; here I use 654321.

Configure the data source instance, including address, user, password, and target database:

canal.instance.master.address, here 127.0.0.1:3306.

canal.instance.dbUsername, here canal.

canal.instance.dbPassword, here [email protected].

Add canal.instance.defaultDatabaseName, here test (a test database must be created in MySQL first; see the earlier steps).

Kafka-related settings; for now use a static topic and a single partition:

canal.mq.topic, here test: the parsed, structured binlog data is published to the Kafka topic named test.

canal.mq.partition, here 0.
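The resulting conf/example/instance.properties fragment would then be (a sketch; the password placeholder stands in for the real canal account password):

```properties
# conf/example/instance.properties -- per-database instance settings
canal.instance.mysql.slaveId = 654321
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = <canal password>
canal.instance.defaultDatabaseName = test
canal.mq.topic = test
canal.mq.partition = 0
```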

With configuration done, the Canal service can be started:

sh /data/canal/bin/startup.sh

# tail the server log

tail -100f /data/canal/logs/canal/canal

# tail the instance log -- in most cases this is the one to watch

tail -100f /data/canal/logs/example/example.log

Once startup succeeds, the instance log shows the example instance starting up successfully.

Create an order table in the test database and run a few simple DML statements:

USE test;

CREATE TABLE IF NOT EXISTS test.omneo(

pid int(11) NOT NULL AUTO_INCREMENT,

uuid varchar(100) NOT NULL,

firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,

lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,

birthdate varchar(20),

postalcode varchar(20),

city varchar(20),

sexe varchar(20),

status varchar(20),

commenttime timestamp NOT NULL DEFAULT current_timestamp,

PRIMARY KEY (pid)

)ENGINE=InnoDB DEFAULT CHARSET=utf8;

# insert 4 test rows

insert into omneo values(1,"0049683542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H2M2V5","Ballancourt","male","en couple","2020-05-09 11:01:54");

insert into omneo values(2,"8338623542a7-5bdb-d564-3133-276ae3ce","Gauthier","Garbuet","23/05/1965","05274","Cocagne","female","maried","2020-05-09 11:01:54");

insert into omneo values(3,"3374573542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H0H0H0","Ottawa","male","en couple","2020-05-09 11:01:54");

insert into omneo values(4,"5494133542a7-5bdb-d564-3133-276ae3ce","Nicole","Garbuet","01/11/1977","H0H0H0","Maugio","unknown","single","2020-05-09 11:01:54");

# update test rows

update omneo set firstname = "world" ,commenttime="2020-12-20 15:55:10" where pid in(2,4);

# delete a test row

delete from omneo where pid = 1;


The corresponding messages written to the Kafka topic are as follows:

# ALTER USER statement changing the `root`@`localhost` password

{"data":null,"database":"","es":1589265975000,"id":4,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER USER 'root'@'localhost' IDENTIFIED WITH 'caching_sha2_password' AS '$A$005$k>XD\\n6\\\"[hx\u0001Ocm/s\u00164\u007F\u00030iVZ3nTJnQORvohw7T4wWWQnSTz4zvFGfLPO3OxQ1m8'","sqlType":null,"table":"","ts":1589272027697,"type":"QUERY"}

# creation of the `test` database

{"data":null,"database":"test","es":1589271839000,"id":4,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1589272027697,"type":"QUERY"}

# creation of the `test.omneo` table

{"data":null,"database":"test","es":1589271993000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE IF NOT EXISTS test.omneo(\n pid int(11) NOT NULL AUTO_INCREMENT,\n uuid varchar(100) NOT NULL,\n firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,\n lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,\n birthdate varchar(20),\n postalcode varchar(20),\n city varchar(20),\n sexe varchar(20),\n status varchar(20),\n commenttime timestamp NOT NULL DEFAULT current_timestamp,\n PRIMARY KEY (pid)\n)ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"omneo","ts":1589272027697,"type":"CREATE"}

# first test row inserted

{"data":[{"pid":"1","uuid":"0049683542a7-5bdb-d564-3133-276ae3ce","firstname":"Maurice","lastname":"Samuel","birthdate":"01/11/1977","postalcode":"H2M2V5","city":"Ballancourt","sexe":"male","status":"en couple","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272135000,"id":5,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272135125,"type":"INSERT"}

# second test row inserted

{"data":[{"pid":"2","uuid":"8338623542a7-5bdb-d564-3133-276ae3ce","firstname":"Gauthier","lastname":"Garbuet","birthdate":"23/05/1965","postalcode":"05274","city":"Cocagne","sexe":"female","status":"maried","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272136000,"id":6,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272136230,"type":"INSERT"}

# third test row inserted

{"data":[{"pid":"3","uuid":"3374573542a7-5bdb-d564-3133-276ae3ce","firstname":"Maurice","lastname":"Samuel","birthdate":"01/11/1977","postalcode":"H0H0H0","city":"Ottawa","sexe":"male","status":"en couple","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272156000,"id":7,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272156356,"type":"INSERT"}

# fourth test row inserted

{"data":[{"pid":"4","uuid":"5494133542a7-5bdb-d564-3133-276ae3ce","firstname":"Nicole","lastname":"Garbuet","birthdate":"01/11/1977","postalcode":"H0H0H0","city":"Maugio","sexe":"unknown","status":"single","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272156000,"id":8,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272157060,"type":"INSERT"}

# update of the rows with `pid = 2` and `pid = 4`

{"data":[{"pid":"2","uuid":"8338623542a7-5bdb-d564-3133-276ae3ce","firstname":"world","lastname":"Garbuet","birthdate":"23/05/1965","postalcode":"05274","city":"Cocagne","sexe":"female","status":"maried","commenttime":"2020-12-20 15:55:10"},{"pid":"4","uuid":"5494133542a7-5bdb-d564-3133-276ae3ce","firstname":"world","lastname":"Garbuet","birthdate":"01/11/1977","postalcode":"H0H0H0","city":"Maugio","sexe":"unknown","status":"single","commenttime":"2020-12-20 15:55:10"}],"database":"test","es":1589272181000,"id":9,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":[{"firstname":"Gauthier","commenttime":"2020-05-09 11:01:54"},{"firstname":"Nicole","commenttime":"2020-05-09 11:01:54"}],"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272181392,"type":"UPDATE"}

# deletion of the row with `pid = 1`

{"data":[{"pid":"1","uuid":"0049683542a7-5bdb-d564-3133-276ae3ce","firstname":"Maurice","lastname":"Samuel","birthdate":"01/11/1977","postalcode":"H2M2V5","city":"Ballancourt","sexe":"male","status":"en couple","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272196000,"id":10,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272196114,"type":"DELETE"}

As shown, the structured binlog events have been written to the Kafka topic named test. You can now write a consumer that subscribes to this topic and processes the received data.
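Such a consumer can be sketched as below. The message-parsing part follows the JSON format shown above; the Kafka part is an assumption, using the third-party kafka-python package (`pip install kafka-python`) against the broker address configured earlier:

```python
import json

def handle_canal_event(raw):
    """Normalise one Canal JSON message (as published to Kafka) into a dict.

    The field names (data, database, table, type, isDdl, old) follow the
    message format shown in the samples above.
    """
    event = json.loads(raw)
    return {
        "database": event["database"],
        "table": event["table"],
        "type": event["type"],        # INSERT / UPDATE / DELETE / QUERY / CREATE ...
        "is_ddl": event["isDdl"],
        "rows": event["data"] or [],  # changed rows; null for DDL/QUERY events
        "old": event["old"] or [],    # previous values for UPDATE, else []
    }

def consume(bootstrap_servers="127.0.0.1:9092", topic="test"):
    """Consume Canal events from Kafka; requires kafka-python and a reachable broker."""
    from kafka import KafkaConsumer  # assumption: kafka-python is installed
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    for msg in consumer:
        event = handle_canal_event(msg.value)
        print(event["type"], event["table"], event["rows"])

if __name__ == "__main__":
    # the DELETE message from above, trimmed to the fields this sketch reads
    sample = (b'{"data":[{"pid":"1","firstname":"Maurice"}],"database":"test",'
              b'"table":"omneo","type":"DELETE","isDdl":false,"old":null}')
    print(handle_canal_event(sample))
```

From here, downstream processing (cleansing, persisting to another data source) can branch on the `type` field.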

Summary

Most of this article is spent on deploying the other middleware, which itself shows that deploying Canal is not complicated. It has many configuration properties, but few of them actually need to be customized or changed, meaning its operational and learning costs are low. Follow-up articles will cover ELT and persistence based on the structured binlog events, as well as building a production-grade HA Canal cluster.
