增量订阅/消费设计
首先生成一个数据库连接字段,如下所示;
具体的协议格式,可参见:CanalProtocol.proto
get/ack/rollback协议介绍:
其中对应的代码,如下所示:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
- batchsize是可以去进行自己定义的
- a. batch id 唯一标识
- 代码获取如下所示:
-
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 //获取message对象信息 long batchId = message.getId(); //去获得唯一获得ID int size = message.getEntries().size(); if (batchId == -1 || size == 0) { //当message的ID不是正常状态,进行间隔使用
- b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
- 因为batchId是唯一的,因
- void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
- canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
- 流式api设计的好处:
- get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
- get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
流式api设计:
- 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
- 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
- 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
- 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
数据格式
canal采用protobuff:这也是cannal的数据传输格式如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | |
数据接受的详细代码如下所示:
根据message对象去获取已整个Entry对象,
数据处理阶段如下所示:
private void parseEntry( List<CanalEntry.Entry> entrys) {
List<DataSyncLog> syncList = Lists.newArrayList();
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;//将enry的字段中的数据库操作变化记录下来
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
String tableName = entry.getHeader().getTableName();//获取传输数据中的数据库表
String eventType = rowChage.getEventType().name();
//以上过程都是对数据进行取数据的过程。
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
DataSyncLog log = new DataSyncLog();
//建立数据表的数据封装的过程
log.setDataSource(SyncDataSource.MYSQL.getValue());//确定这里的数据源是来自于mysql还是mogodb
log.setPlatformId(platformId); //平台的id,
log.setId(UUIDUtils.getUuidCode()); //数据被打到平台为了
log.setCreatedOn(LocalDateTime.now());//新增时间,记录当前插入进MQ的时间作为取值,绑定到
log.setModifiedOn(log.getCreatedOn()); //记录时间和更改时间进行对等,对该批数据进行打标签
log.setTableName(tableName); //数据变化所操作的表
if (CanalEntry.EventType.INSERT.equals(rowChage.getEventType())) {
//对上面蓝色的部分进行一个数据的校验
log.setOperateType(SyncOptType.INSERT.getValue());
parseColumn(rowData.getAfterColumnsList(), log);
} else if (CanalEntry.EventType.UPDATE.equals(rowChage.getEventType())) {
//对于对上面的蓝色数据部分,进行一个数据的校验
log.setOperateType(SyncOptType.UPDATE.getValue());
parseColumn(rowData.getAfterColumnsList(), log);
} else if (CanalEntry.EventType.DELETE.equals(rowChage.getEventType())) {
//对于上面的蓝色变动数据的部分进行一个数据的基本校验
log.setOperateType(SyncOptType.DELETE.getValue());
parseColumn(rowData.getBeforeColumnsList(), log);
} else {
continue;
}
syncList.add(log);
}
}
if (CollectionUtils.isNotEmpty(syncList)) {
dataSyncService.insertList(syncList);
}
// TODO 放入消息队列并调API
DataSyncDTO dto = new DataSyncDTO();
dto.setDataSource(SyncDataSource.MYSQL.getValue()); //给DTO中打上一个标志,如是mysql还是mogodb
dto.setDataSyncLogList(syncList);
try {
System.out.println("即将进入了MQ前缀");
activeMqManage.sendMessage("queue.DataSync", dto);
} catch (JMSException e) {
e.printStackTrace();
}
}
//对于column中的数据进行一个数据封装操作:
private void parseColumn( List<CanalEntry.Column> columns, DataSyncLog log) {
List<DataColumnDTO> resultList = Lists.newArrayList();
String id = null;
for (CanalEntry.Column column : columns) {
DataColumnDTO model = new DataColumnDTO(); //数据同步传输表字段,进行数据的跟踪
获取clomun中name对应的value值
model.setFiledName(column.getName());
model.setFiledType(column.getMysqlType());
model.setValue(column.getValue());//column中具体的文本
// 获取主键字段和值
if (column.getIsKey()) { //是否为主键
id = column.getValue();//将value中具体的id设置为id
model.setPK(true); //这个是事件的主键Id,当存在的时候进行标志,不存在就算了
}
resultList.add(model);
}
log.setPkId(id); 将事件中获取的id作为log的主键ID
log.setDataJson(JSONObject.toJSONString(resultList));//将所有的结果封装成Json并给log
}
具体操作的显示代码如下所示:
package com.wutos.sync.component;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Lists;
import com.wutos.sync.domain.dto.DataColumnDTO;
import com.wutos.sync.domain.dto.DataSyncDTO;
import com.wutos.sync.domain.entity.DataSyncLog;
import com.wutos.sync.enums.SyncDataSource;
import com.wutos.sync.enums.SyncOptType;
import com.wutos.sync.mq.ActiveMqManage;
import com.wutos.sync.service.IDataSyncService;
import com.wutos.sync.utils.UUIDUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.time.LocalDateTime;
import java.util.List;
public class CanalTask implements Runnable {
Logger logger = LoggerFactory.getLogger(CanalTask.class);
private CanalConnector connector;
private boolean isRunable = true;
private String platformId;
private IDataSyncService dataSyncService;
private ActiveMqManage activeMqManage;
/**
* canal监听查询间隔:毫秒
*/
private long sleepMillis;
public CanalTask(CanalConnector connector, long sleepMillis, String platformId) {
this.connector = connector;
this.sleepMillis = sleepMillis;
this.platformId = platformId;
this.dataSyncService = (IDataSyncService)SpringUtil.getBean("dataSyncService");
this.activeMqManage = (ActiveMqManage)SpringUtil.getBean("dataSyncMQ");
}
@Override
public void run() {
logger.info("task is run!");
connectServer();
}
public void stop() {
this.isRunable = false;
logger.info("task is stop!");
}
private void connectServer() {
logger.info("--------------------------------------------------------------------------------canal的server数据处理阶段");
int batchSize = 100;
try {
connector.connect();
// 本连接器匹配表名???
connector.subscribe(".*\\..*");
connector.rollback();
while (isRunable) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
parseEntry(message.getEntries());
} catch (Exception ex) {
logger.error("canal connectServer", ex);
}
}
connector.ack(batchId); // 提交确认,
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} catch (Exception ex) {
logger.error("canal connectServer", ex);
} finally {
connector.disconnect();
logger.warn("connector.disconnect()!");
}
}
private void parseEntry( List<CanalEntry.Entry> entrys) {
List<DataSyncLog> syncList = Lists.newArrayList();
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
String tableName = entry.getHeader().getTableName();
String eventType = rowChage.getEventType().name();
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
DataSyncLog log = new DataSyncLog();
log.setDataSource(SyncDataSource.MYSQL.getValue());//确定这里的数据源是来自于mysql还是mogodb
log.setPlatformId(platformId); //平台的id,
log.setId(UUIDUtils.getUuidCode()); //数据被打到平台为了
log.setCreatedOn(LocalDateTime.now());//新增时间,记录当前插入进MQ的时间作为取值,绑定到
log.setModifiedOn(log.getCreatedOn()); //记录时间和更改时间进行对等,对该批数据进行打标签
log.setTableName(tableName); //数据变化所操作的表
if (CanalEntry.EventType.INSERT.equals(rowChage.getEventType())) {
log.setOperateType(SyncOptType.INSERT.getValue());
parseColumn(rowData.getAfterColumnsList(), log);
} else if (CanalEntry.EventType.UPDATE.equals(rowChage.getEventType())) {
log.setOperateType(SyncOptType.UPDATE.getValue());
parseColumn(rowData.getAfterColumnsList(), log);
} else if (CanalEntry.EventType.DELETE.equals(rowChage.getEventType())) {
log.setOperateType(SyncOptType.DELETE.getValue());
parseColumn(rowData.getBeforeColumnsList(), log);
} else {
continue;
}
syncList.add(log);
}
}
if (CollectionUtils.isNotEmpty(syncList)) {
dataSyncService.insertList(syncList);
}
// TODO 放入消息队列并调API
DataSyncDTO dto = new DataSyncDTO();
dto.setDataSource(SyncDataSource.MYSQL.getValue()); //给DTO中打上一个标志,如是mysql还是mogodb
dto.setDataSyncLogList(syncList);
try {
System.out.println("即将进入了MQ前缀");
activeMqManage.sendMessage("queue.DataSync", dto);
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 将canal的对象转成自己的数据传输对象,进行json转换
* @param columns
* @param log
*/
private void parseColumn( List<CanalEntry.Column> columns, DataSyncLog log) {
List<DataColumnDTO> resultList = Lists.newArrayList();
String id = null;
for (CanalEntry.Column column : columns) {
DataColumnDTO model = new DataColumnDTO(); //数据同步传输表字段,进行数据的跟踪
model.setFiledName(column.getName());
model.setFiledType(column.getMysqlType());
model.setValue(column.getValue());
// 获取主键字段和值
if (column.getIsKey()) {
id = column.getValue();
model.setPK(true); //这个是事件的主键Id,当存在的时候进行标志,不存在就算了
}
resultList.add(model);
}
log.setPkId(id);
log.setDataJson(JSONObject.toJSONString(resultList));
}
private void printColumn( List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " colType=" + column.getMysqlType()
+ " update=" + column.getUpdated()
+ " isKey=" + column.getIsKey());
}
}
}
/** 打印结果举例
================> binlog[mysql-bin.000010:2407] , name[wesafe_west,abproles] , eventType : UPDATE
-------> before
Id : 10 colType=int(11) update=false
ConcurrencyStamp : 1ae6614c-e82c-4746-a9a3-57cbc60e80bd colType=longtext update=false
CreationTime : 2017-11-01 04:00:21.000000 colType=datetime(6) update=false
CreatorUserId : 2 colType=bigint(20) update=false
DeleterUserId : 2 colType=bigint(20) update=false
DeletionTime : 2018-02-23 17:54:36.063460 colType=datetime(6) update=false
DisplayName : 管理员 colType=varchar(64) update=false
IsDefault : 0 colType=bit(1) update=false
IsDeleted : 0 colType=bit(1) update=false
IsStatic : 0 colType=bit(1) update=false
LastModificationTime : colType=datetime(6) update=false
LastModifierUserId : colType=bigint(20) update=false
Name : 68f545a7addc42d29a16d60afa90d11a colType=varchar(32) update=false
NormalizedName : 68F545A7ADDC42D29A16D60AFA90D11A colType=varchar(32) update=false
TenantId : 1 colType=int(11) update=false
-------> after
Id : 10 colType=int(11) update=false
ConcurrencyStamp : 1ae6614c-e82c-4746-a9a3-57cbc60e80bd colType=longtext update=false
CreationTime : 2017-11-01 04:00:21.000000 colType=datetime(6) update=false
CreatorUserId : 2 colType=bigint(20) update=false
DeleterUserId : 2 colType=bigint(20) update=false
DeletionTime : 2018-02-23 17:54:36.063460 colType=datetime(6) update=false
DisplayName : 管理员1 colType=varchar(64) update=true
IsDefault : 0 colType=bit(1) update=false
IsDeleted : 0 colType=bit(1) update=false
IsStatic : 0 colType=bit(1) update=false
LastModificationTime : colType=datetime(6) update=false
LastModifierUserId : colType=bigint(20) update=false
Name : 68f545a7addc42d29a16d60afa90d11a colType=varchar(32) update=false
NormalizedName : 68F545A7ADDC42D29A16D60AFA90D11A colType=varchar(32) update=false
TenantId : 1 colType=int(11) update=false
================> binlog[mysql-bin.000008:2838] , name[wesafe_west,user_profile] , eventType : INSERT
-------> before
-------> after
uid : 211 update=true
TenantId : 1 update=true
================> binlog[mysql-bin.000008:3122] , name[wesafe_west,user_profile] , eventType : DELETE
-------> before
uid : 211 update=false
TenantId : 1 update=false
-------> after
*/