天天看點

kafka源碼系列之mysql資料增量同步到kafka

一,架構介紹

生産中由于曆史原因web後端,mysql叢集,kafka叢集(或者其它消息隊列)會存在一下三種結構。

1,資料先入mysql叢集,再入kafka

資料入mysql叢集是不可更改的,如何再高效的将資料寫入kafka呢?

A),在表中存在自增ID的字段,然後根據ID,定期掃描表,然後将資料入kafka。

B),有時間字段的,可以按照時間字段定期掃描入kafka叢集。

C),直接解析binlog日志,然後解析後的資料寫入kafka。

kafka源碼系列之mysql資料增量同步到kafka

2,web後端同時将資料寫入kafka和mysql叢集

kafka源碼系列之mysql資料增量同步到kafka

3,web後端将資料先入kafka,再入mysql叢集

這個方式,有很多優點,比如可以用kafka解耦,然後将資料按照離線存儲和計算,實時計算兩個子產品建構很好的大資料架構。抗高峰,便于擴充等等。

kafka源碼系列之mysql資料增量同步到kafka

二,實作步驟

1,mysql安裝準備

安裝mysql估計看這篇文章的人都沒什麼問題,是以本文不具體講解了。

A),假如你單機測試請配置好server_id

B),開啟binlog,隻需配置log-bin

[root@localhost ~]# cat /etc/my.cnf

[mysqld]

server_id=1

datadir=/var/lib/mysql

socket=/var/lib/mysql/mysql.sock

user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks

symbolic-links=0

log-bin=/var/lib/mysql/mysql-binlog

[mysqld_safe]

log-error=/var/log/mysqld.log

pid-file=/var/run/mysqld/mysqld.pid

建立測試庫和表

create database school character set utf8 collate utf8_general_ci;

create table student(

name varchar(20) not null comment '姓名',

sid int(10) not null primary key comment '學員',

majora varchar(50) not null default '' comment '專業',

tel varchar(11) not null unique key comment '手機号',

birthday date not null comment '出生日期'

);

2,binlog日志解析

兩種方式:

一是掃面binlog檔案(有需要的話請聯系浪尖)

二是通過複制同步的方式

暫實作了第二種方式,樣例代碼如下:

MysqlBinlogParse mysqlBinlogParse = new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){

  @Override

  public void processDelete(String queryType, String database, String sql) {

    try {

      String jsonString = SqlParse.parseDeleteSql(sql);

      JSONObject jsonObject = JSONObject.fromObject(jsonString);

      jsonObject.accumulate("database", database);

      jsonObject.accumulate("queryType", queryType);

      System.out.println(sql);

      System.out.println(" ");

      System.out.println(jsonObject.toString());

    } catch (Exception e) {

      // TODO Auto-generated catch block

      e.printStackTrace();

    }

  }

  public void processInsert(String queryType, String database, String sql) {

      String jsonString = SqlParse.parseInsertSql(sql);

  public void processUpdate(String queryType, String database, String sql) {

    String jsonString;

      jsonString = SqlParse.parseUpdateSql(sql);

};

mysqlBinlogParse.setServerId(3);

mysqlBinlogParse.start();

3,sql文法解析

從原始的mysql 的binlog event中,我們能解析到的資訊,主要的也就是mysql的database,query類型(INSERT,DELETE,UPDATE),具體執行的sql。我這裡封裝了三個重要的方法。隻暴露了這三個接口,那麼我們要明白的事情是,我們入kafka,然後流式處理的時候希望的到的是跟插入mysql後一樣格式的資料。這個時候我們就要自己做sql的解析,将query的sql解析成字段形式的資料,供流式處理。解析的格式如下:

A),INSERT

kafka源碼系列之mysql資料增量同步到kafka

B),DELETE

kafka源碼系列之mysql資料增量同步到kafka

C),UPDATE

kafka源碼系列之mysql資料增量同步到kafka

最終浪尖是将解析後的資料封裝成了json,然後我們自己寫kafka producer将消息發送到kafka,後端就可以處理了。

三,總結

最後,浪尖還是建議web後端資料最好先入消息隊列,如kafka,然後分離線和實時将資料進行解耦分流,用于實時處理和離線處理。

消息隊列的訂閱者可以根據需要随時擴充,可以很好的擴充資料的使用者。

消息隊列的橫向擴充,增加吞吐量,做起來還是很簡單的。這個用傳統資料庫,分庫分表還是很麻煩的。

繼續閱讀