前言
資料複制主要指通過互聯的網絡在多台機器上儲存相同資料的副本,通過資料複制方案,人們通常希望達到以下目的:1)使資料在地理位置上更接近使用者,進而降低通路延遲;2)當部分元件出現故障時,系統依舊可以繼續工作,提高可用性;3)擴充至多台機器以同時提供資料通路服務,進而提升讀吞吐量。
如果複制的資料一成不變,那麼資料複制就非常容易,隻需要将資料複制到每個節點,一次性即可搞定,面對持續更改的資料如何正确而有效的完成資料複制是一個不小的挑戰。
使用DataX進行Tablestore資料複制
表格存儲(Tablestore)是阿裡雲自研的NoSQL多模型資料庫,提供海量結構化資料存儲以及快速的查詢和分析服務,表格存儲的分布式存儲和強大的索引引擎能夠提供PB級存儲、千萬TPS以及毫秒級延遲的服務能力。DataX是阿裡巴巴集團内被廣泛使用的離線資料同步工具,DataX本身作為資料同步架構,将不同資料源的同步抽象為從源頭資料源讀取資料的Reader插件,以及向目标端寫入資料的Writer插件。
通過使用DataX可以完成Tablestore表的資料複制,如下圖所示,
otsreader插件實作了從Tablestore讀取資料,并可以通過使用者指定抽取資料範圍可友善的實作資料增量抽取的需求,
otsstreamreader插件實作了Tablestore的增量資料導出,而
otswriter插件則實作了向Tablestore中寫入資料。通過在DataX中配置Tablestore相關的Reader和Writer插件,即可以完成Tablestore的表資料複制。
使用通道服務進行Tablestore資料複制
通道服務(Tunnel Service)是基于表格存儲資料接口之上的全增量一體化服務。通道服務為您提供了增量、全量、增量加全量三種類型的分布式資料實時消費通道。通過為資料表建立資料通道,可以簡單地實作對表中曆史存量和新增資料的消費處理。
借助于全增量一體的通道服務,我們可以輕松建構高效、彈性的資料複制解決方案。本文将逐漸介紹如何結合通道服務進行Tablestore的資料複制,完整代碼開源在github上的
tablestore-examples中。本次的實戰将基于通道服務的Java SDK來完成,推薦先閱讀下通道服務的
相關文檔,包括
快速開始等。
1. 配置抽取
配置抽取其實對應的是資料同步所具備的功能,在本次實戰中,我們将完成指定時間點之前的表資料同步,指定的時間點可以是現在或者未來的某個時刻。具體的配置如下所示,ots-reader中記錄的是源表的相關配置,ots-writer中記錄的是目的表的相關配置。
{
"ots-reader": {
"endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
"instanceName": "zhuoran-high",
"tableName": "testSrcTable",
"accessId": "",
"accessKey": "",
"tunnelName": "testTunnel",
"endTime": "2019-06-19 17:00:00"
},
"ots-writer": {
"endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
"instanceName": "zhuoran-search",
"tableName": "testDstTable",
"accessId": "",
"accessKey": "",
"batchWriteCount": 100
}
}
ots-reader中各參數的說明如下:
- endpoint: Tablestore服務的Endpoint位址,例如 https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com 。在進行資料複制前,請檢查下連通性(可以使用curl指令)。
- instanceName: Tablestore的執行個體名。
- tableName: Tablestore的表名。
- accessId: 通路Tablestore的雲賬号accessId。
- accessKey: 通路Tablestore的雲賬号accessKey。
- tunnelName: Tablestore的通道名,配置
- endTime: 資料同步的截止時間點,對應到Java裡SimpleFormat的格式為:
。yyyy-MM-dd HH:mm:ss
ots-writer中各參數的說明如下(略去相同的參數):
- batchWriteCount: Tablestore單次批量寫入的條數,最大值為200。
注:未來會開放更多的功能配置,比如指定時間範圍的資料複制等。
2. 編寫主邏輯
資料複制的主邏輯主要分為以下4步,在第一次運作時,會完整的進行所有步驟,而在程式重新開機或者斷點續傳場景時,隻需要進行第3步和第4步。
-
建立複制目的表
通過使用DesribeTable接口,我們可以擷取到源表的Schema,借此可以建立出目的表,值得注意的是需要把目的表的有效版本偏差設成一個足夠大的值(預設為86400秒),因為服務端在處理寫請求時會對屬性列的版本号進行檢查,寫入的版本号需要在一個範圍内才能寫入成功,對于源表中的曆史存量資料而言,時間戳往往是比較小的,會被服務端過濾掉,最終導緻同步資料的丢失。
sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),
config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());
if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {
System.out.println("Table is already exist: " + config.getWriteConf().getTableName());
} else {
DescribeTableResponse describeTableResponse = sourceClient.describeTable(
new DescribeTableRequest(config.getReadConf().getTableName()));
describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
describeTableResponse.getTableOptions(),
new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
destClient.createTable(createTableRequest);
System.out.println("Create table success: " + config.getWriteConf().getTableName());
}
-
在源表上建立通道
使用通道服務的CreateTunnel接口可以建立通道,此處我們建立全量加增量類型(TunnelType.BaseAndStream)類型的通道。
sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(
new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();
String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {
tunnelId = tunnelInfo.getTunnelId();
System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s",
config.getReadConf().getTunnelName(), tunnelId));
} else {
CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
new CreateTunnelRequest(config.getReadConf().getTableName(),
config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId());
}
-
啟動定時任務來監測備份進度
備份進度的監測可以通過DesribeTunnel接口來完成,DescribeTunnel接口可以擷取到最新消費到的時間點,通過和配置裡的備份結束時間對比,我們可以擷取到目前同步的進度。在到達結束時間後,即可退出備份程式。
backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "background-checker-" + counter.getAndIncrement());
}
});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
config.getReadConf().getTableName(), config.getReadConf().getTunnelName()
));
// 已同步完成
if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
System.out.println("Table copy finished, program exit!");
// 退出備份程式
shutdown();
}
}
}, 0, 2, TimeUnit.SECONDS);
-
啟動資料複制
啟動通道服務的自動化消費架構,開始自動化的資料同步,其中OtsReaderProcessor中完成的是源表資料的解析和目的表的寫入,處理邏輯将會在後文中介紹。
if (tunnelId != null) {
sourceWorkerConfig = new TunnelWorkerConfig(
new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
sourceWorkerConfig.setHeartbeatIntervalInSec(15);
sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
sourceWorker.connectAndWorking();
}
3. 資料同步邏輯(OtsReaderProcessor)
使用通道服務,我們需要編寫資料的Process邏輯和Shutdown邏輯,資料同步中的核心在于解析資料并将其寫入到目的表中,處理資料的完整代碼如下所示,主要邏輯還是比較清晰的,首先會檢查資料的時間戳是否在合理的時間範圍内,然後将StreamRecord轉化為BatchWrite裡對應的行,最後将資料串行寫入到目的表中。
public void process(ProcessRecordsInput input) {
System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
int count = 0;
for (StreamRecord record : input.getRecords()) {
if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
System.out.println(String.format("skip record timestamp %d larger than endTime %d",
record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
continue;
}
count++;
switch (record.getRecordType()) {
case PUT:
RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
putChange.addColumns(getColumns(record));
batchWriteRowRequest.addRowChange(putChange);
break;
case UPDATE:
RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
record.getPrimaryKey());
for (RecordColumn column : record.getColumns()) {
switch (column.getColumnType()) {
case PUT:
updateChange.put(column.getColumn());
break;
case DELETE_ONE_VERSION:
updateChange.deleteColumn(column.getColumn().getName(),
column.getColumn().getTimestamp());
break;
case DELETE_ALL_VERSION:
updateChange.deleteColumns(column.getColumn().getName());
break;
default:
break;
}
}
batchWriteRowRequest.addRowChange(updateChange);
break;
case DELETE:
RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
record.getPrimaryKey());
batchWriteRowRequest.addRowChange(deleteChange);
break;
default:
break;
}
if (count == writeConf.getBatchWriteCount()) {
System.out.println("BatchWriteRow: " + count);
writeClient.batchWriteRow(batchWriteRowRequest);
batchWriteRowRequest = new BatchWriteRowRequest();
count = 0;
}
}
// 寫最後一次的資料。
if (!batchWriteRowRequest.isEmpty()) {
System.out.println("BatchWriteRow: " + count);
writeClient.batchWriteRow(batchWriteRowRequest);
}
}
4. 技術注解
-
如何保障備份性能?
備份過程分為全量(存量)和增量階段,對于全量階段,通道服務會自動将全表的資料在邏輯上劃分成接近指定大小的若幹分片,全量階段的資料同步的整體并行度和分片數相關,能夠有效的保障吞吐量。而對于增量階段,為了保障資料的有序性,單分區内的資料我們需要串行處理資料,增量階段的性能和分區數成正比關系(
增量同步性能白皮書 ),如果需要提速(增加分區)可以聯系表格存儲技術支援。 -
如何做到資料同步的水準擴充?
運作多個TunnelWorker(用戶端)對同一個Tunnel進行消費時(TunnelId相同), 在TunnelWorker執行Heartbeat時,通道服務端會自動的對Channel(分區)資源進行重配置設定,讓活躍的Channel盡可能的均攤到每一個TunnelWorker上,達到資源負載均衡的目的。同時,在水準擴充性方面,使用者可以很容易的通過增加TunnelWorker的數量來完成,TunnelWorker可以在同一個機器或者不同機器上。更多的原理可以參見
資料消費架構原理介紹 。 -
如何做到資料的最終一緻性?
資料的一緻性建立在通道服務的保序協定基礎上,通過全量和增量資料同步的幂等性可以保障備份資料的最終一緻。
-
如何完成斷點續傳功能?
通道服務的用戶端會定期将已同步(消費)完成的資料的時間位點定期發送到服務端進行持久化,在發生Failover或者重新開機程式後,下一次的資料消費會從記錄的checkpoint開始資料處理,不會造成資料的丢失。
未來展望
在本次的實戰中,我們結合通道服務完成一個簡潔而有效的資料複制方案,實作了指定時間點的表資料複制。借助于本次的實戰樣例代碼,使用者僅需要配置源表和目的表的相關參數,即可以高效的完成的表資料的複制和資料的遷移。
在未來的演進中,通道服務還将支援建立指定時間段的通道,這樣可以更加靈活的制定資料備份的計劃,也可以完成持續備份和按時間點恢複等更加豐富的功能。
參考文獻
- Desiging Data-Intensive Applications.
寫在最後
如果您對表格存儲感興趣,歡迎加入【表格存儲公開交流群】交流探讨,群号:11789671。