現在越來越多的技術架構下會組合使用MaxCompute和TableStore,用MaxCompute作大資料分析,計算的結果會導出到TableStore提供線上通路。MaxCompute提供海量資料計算的能力,而TableStore提供海量資料高并發低延遲讀寫的能力。
将 MaxCompute内資料導出至TableStore,目前可選的幾種主要途徑包括:
自己編寫工具:使用MaxCompute SDK通過Tunnel讀取表資料,再通過TableStore SDK再寫入資料。
DataX:自己在伺服器上托管執行DataX任務。
使用資料內建服務:其系統底層也是DataX,額外提供了服務化以及分布式的能力。
其中第二種是我們最常推薦給使用者做臨時的資料導出使用的,如果沒有需要對資料做特殊處理的需求,我們一般不推薦第一種途徑。
DataX在阿裡集團内部已經應用了很多年,經曆了多次雙十一的考驗,是一個穩定、易用、高效的工具。随着MaxCompute上結果資料越來越龐大,資料導出的速率越來越被看重,海量的資料需要在基線内完成導出。本篇文章,主要會介紹幾種優化手段,以提高使用DataX來進行MaxCompute向TableStore資料導出的吞吐量。
我們會以實際的場景,來示範如何通過一步步的優化,提升資料導出的速度。在資料導出的整個鍊路上,主要有三個環節,一是MaxCompute資料通道的讀,二是DataX的資料交換,三是TableStore的線上寫,這三個環節任意一個成為瓶頸,都會影響導出的速度。
MaxCompute資料通道的讀的性能比較高,一般不會成為瓶頸,本文主要是針對後兩個環節來優化。優化的核心指導方針就是:1. 提高并發,2. 降低寫入延遲。接下來列舉的幾種優化手段,也是圍繞這兩點,來不斷進行優化。
實驗選擇使用TableStore的測試環境,在MaxCompute上,我們會建立一張表并準備1億行資料。TableStore的測試環境規模以及DataX Job主控端的規格都較小,是以整個實驗最終達到的速率是比較小的,主要為了示範速率如何提升。而在真實的TableStore生産環境上,規模足夠的情況下,我們幫助過應用優化到每秒上百M甚至上G的速度,優化手段相同。
資料準備
首先在MaxCompute内建立如下表:
其次在表内倒入1億行資料,每行資料約200個位元組,其中userid列采用随機值,計算出的md5值取4個位元組作為md5列,資料樣例如下:
測試資料導入使用的是MaxCompute Tunnel,速度還是比較可觀的。
資料準備完畢後,在TableStore上建立一張表,使用md5和userid作為主鍵列:
表和資料均準備完畢後,使用如下DataX Job配置類進行一次資料導出:
啟動DataX任務,從标準輸出中可以看到目前資料導出的速度:
可以看到,目前的速度大約是1MB/s,接下來會示範如何進行優化,一步一步将速度給提升上去。
一:配置合理的DataX基礎參數
第一步是對DataX的幾個基礎參數進行調優,先大緻了解下一個DataX Job内部,任務的運作結構:
一個DataX Job會切分成多個Task,每個Task會按TaskGroup進行分組,一個Task内部會有一組Reader->Channel->Writer。Channel是連接配接Reader和Writer的資料交換通道,所有的資料都會經由Channel進行傳輸。
在DataX内部對每個Channel會有嚴格的速度控制,預設的速度限制是1MB/s,這也是為何我們使用預設配置,速度為1MB/s的原因。是以第一個需要優化的基礎參數就是單個Channel的速度限制,更改配置如下:
我們把單個Channel的速度上限配置為5MB。這個值需要針對不同的場景進行不同的配置,例如對于MaxCompute,單個Channel的速度可以達到幾十MB,對于TableStore,在列較小較多的場景下,單個Channel的速度是幾MB,而在列較大的場景下,可能速度就會上到幾十MB。
我們目前預設配置中配置啟動的Job内Channel數為1,要提高速度,并發必須提高,這個是第二步要做的優化。但是在做第二個優化之前,還需要調整一個基礎參數,那就是DataX Job啟動的JVM的記憶體大小配置。
目前DataX啟動的JVM預設的配置是"-Xms1g -Xmx1g",當一個Job内Channel數變多後,記憶體的占用會顯著增加,因為DataX作為資料交換通道,在記憶體中會緩存較多的資料,例如Channel中會有一個Buffer,作為臨時的資料交換的緩沖區,而在部分Reader和Writer的中,也會存在一些Buffer。
調整JVM參數的方式有兩種,一種是直接更改datax.py,另一種是在啟動的時候,加上對應的參數,如下:
通常我們建議将記憶體設定為4G或者8G,這個也可以根據實際情況來調整。
在優化完單Channel的限速和JVM的記憶體參數之後,我們重新跑一下任務:
目前資料導出的速度已經從1MB提升到2MB。
二:提升DataX Job内Channel并發
在上一點中指出,目前Job内部,隻有單個Channel在執行導出任務,而要提升速率,要做的就是提升Channel的并發數。
DataX内部對每個Channel會做限速,可以限制每秒byte數,也可以限制每秒record數。除了對每個Channel限速,在全局還會有一個速度限制的配置,預設是不限。
提升Channel并發數有三種途徑:
1, 配置全局Byte限速以及單Channel Byte限速,Channel個數 = 全局Byte限速 / 單Channel Byte限速。(下面示例中最終Channel個數為10)
2,配置全局Record限速以及單Channel Record限速,Channel個數 = 全局Record限速 / 單Channel Record限速。(下面示例中最終Channel個數為3)
3, 全局不限速,直接配置Channel個數。(下面示例中最終Channel個數為5)
第三種方式最簡單直接,但是這樣就缺少了全局的限速。在選擇Channel個數時,同樣需要注意,Channel個數并不是越多越好。Channel個數的增加,帶來的是更多的CPU消耗以及記憶體消耗。如果Channel并發配置過高導緻JVM記憶體不夠用,會出現的情況是發生頻繁的Full GC,導出速度會驟降,适得其反。
可以在DataX的輸出日志中,找到本次任務的Channel的數:
在我們這次實驗中,我們把Channel數直接配置為10,再進行一次導出:
可以看到在Channel數從1提升到10之後,速度從2MB/s提升到了9MB/s。此時若再提高Channel數到15,速度已經不見漲,而從服務端監控看,每批次導入的寫入延遲确在漲,說明目前瓶頸在TableStore寫入端。
三:對TableStore表進行預分區,并進一步提升DataX Channel并發
在上面幾個優化做完後,DataX資料交換這一環節已經不是瓶頸,目前瓶頸在TableStore端的寫入能力上。TableStore是分布式的存儲,一張大表會被切分成很多的分區,分區會分散到後端的各個實體機上提供服務。一張新建立的表,預設分區數為1,當這張表越來越大,TableStore會将其分裂,此時分裂是自動完成的。分區的個數,一定程度上與能提供的服務能力相關。某些業務場景,建立表後,就需要對表進行大規模的資料導入,此時預設的單個分區肯定是不夠用的,當然可以等資料量慢慢漲上來後等表自動分裂,但是這個周期會比較長。此時,我們推薦的做法是在建立表的時候進行預分區。
不過目前我們還沒有對外開放通過SDK來進行預分區的功能,是以如果需要對表進行預分區,可以先通過工單來聯系我們幫助進行預分區。
我們建立一張表,并将表預分4個分區,partition key為md5列,采用md5列的主要原因是在其上資料的分區基本是均勻的。如果資料在partition key分布不均勻,則即使做了預分區,導入性能也不會得到明顯的提升。以相同的Job配置,再跑一下導出任務:
此時速度從9MB/s提升到18MB/s左右,在TableStore服務端能夠提高更多的服務能力後,我們嘗試再将Channel的并發從10提高到15:
此時速度又進一步提升,從18MB/s提升到22MB/s左右。
四:提高每次批量寫行數
我們建構的場景,每行大約是200位元組左右大小。DataX的OTSWriter寫入插件底層是使用的TableStore SDK提供的BatchWrite接口進行資料寫入,預設一次請求寫入100行資料,也就是說一次請求隻會導入約20KB大小的資料。每次寫過來的資料包都比較小,非常的不經濟。
目前TableStore的BatchWrite的限制比較不靈活,會限制行數和資料大小,其中行數預設上限是200行。在每行都比較小的場景下,200行一次批量寫入是非常不經濟的,在我們的這次實驗中,我們将上限改為1000行,并将DataX TableStore寫入插件内部一次批量寫入的行數也改為1000行,來驗證将每次寫入的包變大後,對寫入效率的提升。任務配置更改如下(配置項為job.content.writer.parameter.batchWriteCount):
再次執行任務,速度如下:
速度再次提升,從22MB/s提升到29MB/s。TableStore後續會優化對BatchWrite的行數限制,對于行比較小的場景采用一個比較友好的政策。
五:MaxCompute表分區,提高DataX Job并發
以上優化政策都是在單個DataX Job的場景下進行的優化,單個DataX Job隻能夠運作在單台伺服器上,沒有辦法分布式的執行。D2上的托管伺服器,一般是千兆網卡,也就是說最多提供100MB/s的速度。若想要進一步的速度提升,則必須采用多個DataX Job分布在多台伺服器上執行才行。
DataX内的ODPSReader,可以通過配置一次導出整張表或者表的某個Partition。我們可以利用Partition,來将一張表拆分成多個Job分散導出,但是要求表必須是多分區的。
在我們的實驗中,建立的MaxCompute表并不是多分區的,我們重新建立一張多分區的表:
增加一列為partid,作為分區,我們通過一個SQL将原表的資料導入到新表,并自動均勻的分散到partid:
以上SQL會将partid的值取自md5列的第一個字元,md5是一個十六進制的值,字元的取值範圍是:0-f,這樣我們就将原表切成了一個帶16個分區的表。我們希望在每個分區内,資料都是均勻的,為了避免長尾,這也是為什麼要設計一個md5列的原因。
在将一張表拆成多個分區後,我們就可以選擇在不同的伺服器上,為每個分區啟動一個任務,配置如下(job.content.reader.parameter.partition):
由于測試叢集規模的原因,我們不示範多個Job并發後的速度提升。在TableStore服務端能力不是瓶頸的情況下,通過擴充DataX Job的并發,速度是能線性提升的。
總結下上面的幾個優化點:
對DataX的幾個基本參數進行調整,包括:Channel數、單個Channel的限速以及JVM的記憶體參數。
建立TableStore表的時候盡量采取預分區,在設計partition key的時候盡量保證在每個partition key上導入資料的分布均勻。
如果導入TableStore的資料行都比較小,則需要考慮提高單批次的導入行數。
若單個DataX Job已成瓶頸,則需要考慮将任務拆成多個DataX Job并行執行。
本文作者:晉恒