DataX 是阿裡巴巴集團内被廣泛使用的離線資料同步工具/平台,實作各種異構資料源之間高效的資料同步功能。最近,阿裡雲cassandra團隊為datax提供了cassandra讀寫插件,進一步豐富了datax支援的資料源,可以很友善實作cassandra之間以及cassandra與其他資料源之間的資料同步。本文簡單介紹如何使用datax同步cassandra的資料,針對幾種常見的場景給出配置檔案示例,還提供了提升同步性能的建議和實測的性能資料。
datax快速入門
使用datax同步資料的方法很簡單,一共隻需要三步:
1 部署datax。
2 編寫同步作業配置檔案。
3 運作datax,等待同步作業完成。
datax的部署和運作都很簡單,可以通過datax官方提供的
下載下傳位址下載下傳DataX工具包,下載下傳後解壓至本地某個目錄,進入bin目錄,即可運作同步作業:
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
同步作業的配置格式可以參考
datax文檔。
一個典型的配置檔案如下:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
一個同步作業的配置檔案主要包括兩部分,setting包括任務排程的一些配置,content描述同步任務的内容,裡面包含reader插件的配置和writer插件的配置。例如我們需要從mysql同步資料到cassandra,那麼我們隻需要把reader配置為mysqlreader,writer配置為cassandrawriter,并提供相應的插件配置資訊即可。在
datax項目頁面上面可以看到datax支援的插件清單,點選對應的連結就可以檢視相關插件的文檔了解插件需要的配置内容和格式要求。例如,cassandra插件的文檔可點選如下連結:
讀插件 寫插件以下列舉幾種常見的場景。
場景一 cassandra之間的資料同步
最常見的場景是把資料從一個叢集同步到另一個叢集,例如機房整體遷移、上雲等。這時需要先手動在目标叢集建立好keyspace和表的schema,然後使用datax進行同步。作為例子,下面的配置檔案把資料從cassandra的一個表同步到另一個表:
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_src",
"column": [
"id",
"name"
]
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_dst",
"column": [
"id",
"name"
]
}
}
}
]
}
}
場景二 從mysql同步到cassandra
datax支援多種資料源,可以很友善做到cassandra和其他資料源之間的資料同步。下面的配置把資料從mysql同步到cassandra:
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_dst",
"column": [
"id",
"name"
]
}
}
}
]
}
}
場景三 隻同步cassandra中的一部分資料
我們在讀插件的配置中提供了where關鍵字,可以用來隻同步一部分資料。例如對于時序資料等場景定期同步的情況,就可以通過增加where的條件來實作隻同步增量資料。where條件的格式和cql相同,例如
"where":"textcol='a'"
的作用類似于使用
select * from table_name where textcol = 'a'
進行查詢。另外還有allowFiltering關鍵字配合where使用,作用和cql中的ALLOW FILTERING關鍵字也是相同的。下面給出一個配置的例子:
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_src",
"column": [
"deviceId",
"time",
"log"
],
"where":"time > '2019-09-25'",
"allowFiltering":true
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_dst",
"column": [
"deviceId",
"time",
"log"
]
}
}
}
]
}
}
提高同步速度
以cassandra之間的資料同步為例。如下這些配置會對資料同步任務的性能産生影響:
(1)并行度
可以通過調大任務的并行度來提高同步速度。這主要通過job.setting.speed.channel從參數來實作。例如下面這個配置的效果是會有10個線程并行執行同步任務。
"job": {
"setting": {
"speed": {
"channel": 10
}
},
...
需要注意的是,cassandra讀插件裡面,切分任務是通過在cql語句中增加token範圍條件來實作的,是以隻有使用RandomPartitioner和Murmur3Partitioner的叢集才能夠正确切分。如果您的叢集使用了其他的Partitioner,cassandrareader插件會忽略channel配置,隻用一個線程進行同步。
(2)batch
可以通過配置batchSize關鍵字在cassandra寫插件裡面使用UNLOGGED batch來提高寫入速度。但是需要注意cassandra中對batch的使用有一些限制,使用這個關鍵字之前建議先閱讀[《簡析Cassandra的BATCH操作》(
https://yq.aliyun.com/articles/719784?spm=a2c4e.11155435.0.0.65386b04OYOsvK)一文中關于batch使用限制的内容。
(3)連接配接池配置
寫插件還提供了連接配接池相關的配置connectionsPerHost和maxPendingPerConnection。這兩個參數的具體含義可以參考[java driver文檔](
https://docs.datastax.com/en/developer/java-driver/3.7/manual/pooling/)。
(4)一緻性配置
讀寫插件中都提供了consistancyLevel關鍵字,預設的讀寫一緻性級别都是LOCAL_QUORUM。如果您的業務場景裡面可以允許兩個叢集的資料有少量不一緻,也可以考慮不使用預設一緻性級别來提高讀寫性能,例如使用ONE級别來讀資料。
性能資料
我們通過一個測試來觀察datax同步資料的性能。
服務端使用
阿裡雲cassandra,源叢集和目标叢集均為3節點,規格為4CPU 8GB。用戶端使用一台ECS,規則為4 CPU 16 GB。
首先使用cassandra-stress向源叢集寫入500w行資料:
cassandra-stress write cl=QUORUM n=5000000 -schema "replication(factor=3) keyspace=test" -rate "threads=300" -col "n=FIXED(10) size=FIXED(64)" -errors "retries=32" -mode "native cql3 user=$USER password=$PWD" -node "$NODE"
寫入過程的統計資料如下:
然後使用datax将這些資料從源叢集同步到目标叢集。配置檔案如下:
{
"job": {
"setting": {
"speed": {
"channel": 10
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "<源叢集NODE>",
"port": 9042,
"username":"<USER>",
"password":"<PWD>",
"useSSL": false,
"keyspace": "test",
"table": "standard1",
"column": [
"key",
"\"C0\"",
"\"C1\"",
"\"C2\"",
"\"C3\"",
"\"C4\"",
"\"C5\"",
"\"C6\"",
"\"C7\"",
"\"C8\"",
"\"C9\""
]
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "<目标叢集NODE>",
"port": 9042,
"username":"<USER>",
"password":"<PWD>",
"useSSL": false,
"keyspace": "test",
"table": "standard1",
"batchSize":6,
"column": [
"key",
"\"C0\"",
"\"C1\"",
"\"C2\"",
"\"C3\"",
"\"C4\"",
"\"C5\"",
"\"C6\"",
"\"C7\"",
"\"C8\"",
"\"C9\""
]
}
}
}
]
}
}
同步過程的統計資料如下:
可見,datax同步資料的性能和cassandra-stress的性能相當,甚至要好一些。
入群邀約
為了營造一個開放的 Cassandra 技術交流,我們建立了微信群公衆号和釘釘群,為廣大使用者提供專業的技術分享及問答,定期開展專家技術直播,歡迎大家加入。
阿裡雲為廣大開發者提供雲上Cassandra資源,可用于動手實踐:9.9元可使用三月(限首購)。
直達連結:
https://www.aliyun.com/product/cds釘釘群入群連結:
https://c.tb.cn/F3.ZRTY0o微信群公衆号: