天天看点

1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:

增量订阅/消费设计

首先生成一个数据库连接字段,如下所示;

1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:

具体的协议格式,可参见:CanalProtocol.proto

get/ack/rollback协议介绍:

其中对应的代码,如下所示:

1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:
  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
  • batchsize是可以去进行自己定义的
  • 1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:
  • a. batch id 唯一标识
  • 代码获取如下所示:
  • Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
        //获取message对象信息
                    long batchId = message.getId();
       //去获得唯一获得ID
                    int size = message.getEntries().size();
    
                    if (batchId == -1 || size == 0) {
    //当message的ID不是正常状态,进行间隔使用
               
  • b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
  • 因为batchId是唯一的,因
  • 1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:
  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
  • canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
  • 流式api设计的好处:
  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

流式api设计:

1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:
  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

数据格式

canal采用protobuff:这也是cannal的数据传输格式如下所示:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

Entry

Header

logfileName [binlog文件名]

logfileOffset [binlog position]

executeTime [发生的变更]

schemaName

tableName

eventType [insert/update/delete类型]

entryType   [事务头BEGIN/事务尾END/数据ROWDATA]

storeValue  [

byte

数据,可展开,对应的类型为RowChange]   

RowChange

isDdl       [是否是ddl变更操作,比如create table/drop table]

sql     [具体的ddl sql]

rowDatas    [

具体insert/update/delete的变更数据

,可为多条,

1

个binlog event事件可对应多条变更,比如批处理]

beforeColumns [Column类型的数组]

afterColumns [Column类型的数组]     

Column

index      

sqlType     [jdbc type]

name        [column name]

isKey       [是否为主键]

updated     [是否发生过变更]

isNull      [值是否为

null

]

value       [具体的内容,注意为文本]

数据接受的详细代码如下所示:

根据message对象去获取已整个Entry对象,

1.记录canal数据库中间件在项目中的实际应用以及过程,附上代码:

数据处理阶段如下所示:

private void parseEntry( List<CanalEntry.Entry> entrys) {
            
        List<DataSyncLog> syncList = Lists.newArrayList();
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChage;//将enry的字段中的数据库操作变化记录下来

                   
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                }
                String tableName = entry.getHeader().getTableName();//获取传输数据中的数据库表
                String eventType = rowChage.getEventType().name();
                   //以上过程都是对数据进行取数据的过程。
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    DataSyncLog log = new DataSyncLog();
                    //建立数据表的数据封装的过程
                    log.setDataSource(SyncDataSource.MYSQL.getValue());//确定这里的数据源是来自于mysql还是mogodb
                    log.setPlatformId(platformId);   //平台的id,
                    log.setId(UUIDUtils.getUuidCode()); //数据被打到平台为了
                    log.setCreatedOn(LocalDateTime.now());//新增时间,记录当前插入进MQ的时间作为取值,绑定到
                    log.setModifiedOn(log.getCreatedOn());  //记录时间和更改时间进行对等,对该批数据进行打标签
                    log.setTableName(tableName);           //数据变化所操作的表
                    if (CanalEntry.EventType.INSERT.equals(rowChage.getEventType())) {
                           //对上面蓝色的部分进行一个数据的校验
                        log.setOperateType(SyncOptType.INSERT.getValue());
                        parseColumn(rowData.getAfterColumnsList(), log);
                    } else if (CanalEntry.EventType.UPDATE.equals(rowChage.getEventType())) {
                         //对于对上面的蓝色数据部分,进行一个数据的校验
                        log.setOperateType(SyncOptType.UPDATE.getValue());
                        parseColumn(rowData.getAfterColumnsList(), log);
                    } else if (CanalEntry.EventType.DELETE.equals(rowChage.getEventType())) {
                         //对于上面的蓝色变动数据的部分进行一个数据的基本校验
                        log.setOperateType(SyncOptType.DELETE.getValue());
                        parseColumn(rowData.getBeforeColumnsList(), log);
                    } else {
                        continue;
                    }
                    syncList.add(log);
                }
            }
        if (CollectionUtils.isNotEmpty(syncList)) {
            dataSyncService.insertList(syncList);
        }
        // TODO 放入消息队列并调API
        DataSyncDTO dto = new DataSyncDTO();
        dto.setDataSource(SyncDataSource.MYSQL.getValue()); //给DTO中打上一个标志,如是mysql还是mogodb
        dto.setDataSyncLogList(syncList);
        try {
            System.out.println("即将进入了MQ前缀");
            activeMqManage.sendMessage("queue.DataSync", dto);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

//对于column中的数据进行一个数据封装操作:
private void parseColumn( List<CanalEntry.Column> columns, DataSyncLog log) {
        List<DataColumnDTO> resultList = Lists.newArrayList();
        String id = null;
        for (CanalEntry.Column column : columns) {
          
            DataColumnDTO model = new DataColumnDTO(); //数据同步传输表字段,进行数据的跟踪
            获取clomun中name对应的value值
            model.setFiledName(column.getName());
            model.setFiledType(column.getMysqlType());
            model.setValue(column.getValue());//column中具体的文本
            // 获取主键字段和值
            if (column.getIsKey()) { //是否为主键
                id = column.getValue();//将value中具体的id设置为id
                model.setPK(true);  //这个是事件的主键Id,当存在的时候进行标志,不存在就算了
            }
            resultList.add(model);
        }

        log.setPkId(id); 将事件中获取的id作为log的主键ID
        log.setDataJson(JSONObject.toJSONString(resultList));//将所有的结果封装成Json并给log
    }

           

具体操作的显示代码如下所示:

package com.wutos.sync.component;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Lists;
import com.wutos.sync.domain.dto.DataColumnDTO;
import com.wutos.sync.domain.dto.DataSyncDTO;
import com.wutos.sync.domain.entity.DataSyncLog;
import com.wutos.sync.enums.SyncDataSource;
import com.wutos.sync.enums.SyncOptType;
import com.wutos.sync.mq.ActiveMqManage;
import com.wutos.sync.service.IDataSyncService;
import com.wutos.sync.utils.UUIDUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import java.time.LocalDateTime;
import java.util.List;

public class CanalTask implements Runnable {

    Logger logger = LoggerFactory.getLogger(CanalTask.class);

    private CanalConnector connector;

    private boolean isRunable = true;

    private String platformId;

    private IDataSyncService dataSyncService;

    private ActiveMqManage activeMqManage;

    /**
     * canal监听查询间隔:毫秒
     */
    private long sleepMillis;

    public CanalTask(CanalConnector connector, long sleepMillis, String platformId) {
        this.connector = connector;
        this.sleepMillis = sleepMillis;
        this.platformId = platformId;
        this.dataSyncService = (IDataSyncService)SpringUtil.getBean("dataSyncService");
        this.activeMqManage = (ActiveMqManage)SpringUtil.getBean("dataSyncMQ");
    }

    @Override
    public void run() {
        logger.info("task is run!");
        connectServer();
    }

    public void stop() {
        this.isRunable = false;
        logger.info("task is stop!");
    }

    private void connectServer() {
        logger.info("--------------------------------------------------------------------------------canal的server数据处理阶段");
        int batchSize = 100;
        try {
            connector.connect();
            // 本连接器匹配表名???
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (isRunable) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        parseEntry(message.getEntries());
                    } catch (Exception ex) {
                        logger.error("canal connectServer", ex);
                    }
                }

                connector.ack(batchId); // 提交确认,
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

        } catch (Exception ex) {
            logger.error("canal connectServer", ex);
        } finally {
            connector.disconnect();
            logger.warn("connector.disconnect()!");
        }
    }

    private void parseEntry( List<CanalEntry.Entry> entrys) {
        List<DataSyncLog> syncList = Lists.newArrayList();
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChage;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                }
                String tableName = entry.getHeader().getTableName();
                String eventType = rowChage.getEventType().name();
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    DataSyncLog log = new DataSyncLog();
                    log.setDataSource(SyncDataSource.MYSQL.getValue());//确定这里的数据源是来自于mysql还是mogodb
                    log.setPlatformId(platformId);   //平台的id,
                    log.setId(UUIDUtils.getUuidCode()); //数据被打到平台为了
                    log.setCreatedOn(LocalDateTime.now());//新增时间,记录当前插入进MQ的时间作为取值,绑定到
                    log.setModifiedOn(log.getCreatedOn());  //记录时间和更改时间进行对等,对该批数据进行打标签
                    log.setTableName(tableName);           //数据变化所操作的表
                    if (CanalEntry.EventType.INSERT.equals(rowChage.getEventType())) {
                        log.setOperateType(SyncOptType.INSERT.getValue());
                        parseColumn(rowData.getAfterColumnsList(), log);
                    } else if (CanalEntry.EventType.UPDATE.equals(rowChage.getEventType())) {
                        log.setOperateType(SyncOptType.UPDATE.getValue());
                        parseColumn(rowData.getAfterColumnsList(), log);
                    } else if (CanalEntry.EventType.DELETE.equals(rowChage.getEventType())) {
                        log.setOperateType(SyncOptType.DELETE.getValue());
                        parseColumn(rowData.getBeforeColumnsList(), log);
                    } else {
                        continue;
                    }
                    syncList.add(log);
                }
            }
        if (CollectionUtils.isNotEmpty(syncList)) {
            dataSyncService.insertList(syncList);
        }
        // TODO 放入消息队列并调API
        DataSyncDTO dto = new DataSyncDTO();
        dto.setDataSource(SyncDataSource.MYSQL.getValue()); //给DTO中打上一个标志,如是mysql还是mogodb
        dto.setDataSyncLogList(syncList);
        try {
            System.out.println("即将进入了MQ前缀");
            activeMqManage.sendMessage("queue.DataSync", dto);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * 将canal的对象转成自己的数据传输对象,进行json转换
     * @param columns
     * @param log
     */
    private void parseColumn( List<CanalEntry.Column> columns, DataSyncLog log) {
        List<DataColumnDTO> resultList = Lists.newArrayList();
        String id = null;
        for (CanalEntry.Column column : columns) {
            DataColumnDTO model = new DataColumnDTO(); //数据同步传输表字段,进行数据的跟踪
            model.setFiledName(column.getName());
            model.setFiledType(column.getMysqlType());
            model.setValue(column.getValue());
            // 获取主键字段和值
            if (column.getIsKey()) {
                id = column.getValue();
                model.setPK(true);  //这个是事件的主键Id,当存在的时候进行标志,不存在就算了
            }
            resultList.add(model);
        }

        log.setPkId(id);
        log.setDataJson(JSONObject.toJSONString(resultList));
    }

    private  void printColumn( List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "   colType=" + column.getMysqlType()
                    + "    update=" + column.getUpdated()
                    + "    isKey=" + column.getIsKey());
        }
    }

}

/** 打印结果举例
 ================> binlog[mysql-bin.000010:2407] , name[wesafe_west,abproles] , eventType : UPDATE
 -------> before
 Id : 10    colType=int(11)    update=false
 ConcurrencyStamp : 1ae6614c-e82c-4746-a9a3-57cbc60e80bd    colType=longtext    update=false
 CreationTime : 2017-11-01 04:00:21.000000    colType=datetime(6)    update=false
 CreatorUserId : 2    colType=bigint(20)    update=false
 DeleterUserId : 2    colType=bigint(20)    update=false
 DeletionTime : 2018-02-23 17:54:36.063460    colType=datetime(6)    update=false
 DisplayName : 管理员    colType=varchar(64)    update=false
 IsDefault : 0    colType=bit(1)    update=false
 IsDeleted : 0    colType=bit(1)    update=false
 IsStatic : 0    colType=bit(1)    update=false
 LastModificationTime :     colType=datetime(6)    update=false
 LastModifierUserId :     colType=bigint(20)    update=false
 Name : 68f545a7addc42d29a16d60afa90d11a    colType=varchar(32)    update=false
 NormalizedName : 68F545A7ADDC42D29A16D60AFA90D11A    colType=varchar(32)    update=false
 TenantId : 1    colType=int(11)    update=false
 -------> after
 Id : 10    colType=int(11)    update=false
 ConcurrencyStamp : 1ae6614c-e82c-4746-a9a3-57cbc60e80bd    colType=longtext    update=false
 CreationTime : 2017-11-01 04:00:21.000000    colType=datetime(6)    update=false
 CreatorUserId : 2    colType=bigint(20)    update=false
 DeleterUserId : 2    colType=bigint(20)    update=false
 DeletionTime : 2018-02-23 17:54:36.063460    colType=datetime(6)    update=false
 DisplayName : 管理员1    colType=varchar(64)    update=true
 IsDefault : 0    colType=bit(1)    update=false
 IsDeleted : 0    colType=bit(1)    update=false
 IsStatic : 0    colType=bit(1)    update=false
 LastModificationTime :     colType=datetime(6)    update=false
 LastModifierUserId :     colType=bigint(20)    update=false
 Name : 68f545a7addc42d29a16d60afa90d11a    colType=varchar(32)    update=false
 NormalizedName : 68F545A7ADDC42D29A16D60AFA90D11A    colType=varchar(32)    update=false
 TenantId : 1    colType=int(11)    update=false

 ================> binlog[mysql-bin.000008:2838] , name[wesafe_west,user_profile] , eventType : INSERT
 -------> before
 -------> after
 uid : 211    update=true
 TenantId : 1    update=true
 ================> binlog[mysql-bin.000008:3122] , name[wesafe_west,user_profile] , eventType : DELETE
 -------> before
 uid : 211    update=false
 TenantId : 1    update=false
 -------> after

 */