本文介紹如何在E-MapReduce中通過Tablestore Spark Streaming Source将TableStore中的資料實時導入到Delta Lake中。
背景介紹
近些年來HTAP(Hybrid transaction/analytical processing)的熱度越來越高,通過将存儲和計算組合起來,既能支援傳統的海量結構化資料分析,又能支援快速的事務更新寫入,是設計資料密集型系統的一個成熟的架構。
表格存儲(Tablestore)是阿裡雲自研的 NoSQL 多模型資料庫,提供海量結構化資料存儲以及快速的查詢和分析服務(PB 級存儲、千萬 TPS 以及毫秒級延遲),借助于表格存儲的底層引擎,能夠很好的完成OLTP場景下的需求。Delta Lake類似于支援Delta的Data Lake(資料湖),使用列存來存base資料,行的格式存儲新增delta資料,進而做到支援資料操作的ACID和CRUD,完全相容Spark的大資料生态,通過結合Delta Lake和Spark生态,能夠很好的完成OLAP場景下的需求。下圖展示的是Tablestore和Delta Lake結合的HATP場景的一個簡要的邏輯結構圖,有關結構化大資料分析平台設計的更多細節和幹貨,可以參閱文章
結構化大資料分析平台設計。
準備工作
- 登入 阿裡雲E-MapReduce控制台
- 建立Hadoop叢集 (若已建立,請跳過)
- 確定将Tablestore執行個體部署在E-MapReduce叢集相同的VPC環境下
步驟一 建立Tablestore源表
詳細開通步驟請參考
官方文檔,本文demo中所建立出來的表名為Source,表的Schema如下圖所示,該表有PKString和PkInt兩個主鍵,類型分别為String和Interger。
為表Source建立一個增量通道,如下圖所示,通道清單裡面會顯示該通道的名字、ID以及類型。
技術注解:
通道服務(Tunnel Service)是基于Tablestore資料接口之上的全增量一體化服務,包含三種通道類型:
通道服務的詳細介紹可查詢 Tablestore官網文檔
- 全量:對資料表中曆史存量資料消費處理
- 增量:對資料表中新增資料消費處理
- 全量加增量:先對資料表總曆史存量資料消費,之後對新增資料消費
步驟二 擷取相關jar包并上傳到hadoop叢集
- 擷取環境依賴的JAR包。
Jar包 | 擷取方法 |
---|---|
emr-tablestore-X.X.X.jar X.X.X: Since 1.9.0+ | Maven 庫中下載下傳: https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore |
tablestore-X.X.X-jar-with-dependencies.jar | 下載下傳 EMR SDK 相關的Tablestore依賴包。 https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar |
- 在叢集管理頁面,單擊已建立的Hadoop叢集的叢集ID ,進入叢集與服務管理頁面。
- 在左側導航樹中選擇主機清單,然後在右側檢視Hadoop叢集中emr-header-1主機的IP資訊。
- 在SSH用戶端中建立一個指令視窗,登入Hadoop叢集的emr-header-1主機。
- 上傳所有JAR包到emr-header-1節點的某個目錄下。
步驟三 運作Spark Streaming作業
- 以一個基于 emr demo 修改的代碼為樣例,編譯生成JAR包,JAR包需要上傳到Hadoop叢集的emr-header-1主機中(參見步驟二),完整的代碼由于改動較大,不在本文中一一說明,後續會合到emr demo官方項目中。
- 該樣例以Tablestore表作為資料源,通過結合Tablestore CDC技術,Tablestore Streaming Source和Delta Sink,示範的是TableStore到Delta Lake的一個完整鍊路。
- 按以下指令,啟動spark streaming作業,開啟一個實時同步Tablestore Source表中資料到Delta Lake Table的監聽程式。
spark-submit --class com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
各個參數說明如下:
參數 | 參數說明 |
---|---|
com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC | 所要運作的主程式類 |
emr-tablestore-X.X.X-SNAPSHOT.jar | 包含Tablestore source的jar包 |
EMR SDK 相關的Tablestore依賴包 | |
examples-X.X.X-shaded.jar | 基于EMR demo修改的包(包含主程式類) |
instance | Tablestore執行個體名 |
tableName | Tablestore表名 |
tunnelId | Tablestore表的通道Id |
accessKeyId | Tablestore的accessKeyId |
accessKeySecret | Tablestore的秘鑰 |
endPoint | Tablestore執行個體的endPoint |
maxOffsetsPerChannel | Tablestore通道 Channel 在每個Spark Batch周期内同步的最大資料條數,預設10000。 |
catalog | 同步的列名,詳見 Catalog字段說明 |
步驟四 資料CRUD示例
- 首先在TableStore裡插入兩行,本次示例中,我們建了8列的同步列,包括兩個主鍵(PkString, PkInt)和六個屬性列(col1, col2, col3, timestamp, col5和col6)。由于表格存儲是Free-Schema的結構,我們可以任意的插入屬性列,TableStore的Spark Source會自動的做屬性列的篩選。如下面兩張圖所示,在插入兩行資料後,Delta Table中同步也可以馬上讀取到兩行,且資料一緻。
- 接着,在Tablestore中進行一些更新行和插入行的操作,如下面的兩個圖所示,等待一小段micro-batch的資料同步後,表格存儲中的資料同步變化能夠即時的更新到Delta Table中。
- 将Tablestore中的資料全部清空,如下面兩圖所示,Delta Table也同步的變成了空。
- 在叢集上,Delta Table預設存放在HDFS中,如下圖所示,_delta_log目錄中存放的json檔案是Transaction log,parquet格式的檔案是底層的資料檔案。
寫在最後
本文介紹了如何在E-MapReduce中通過Tablestore Spark Streaming Source将TableStore中的資料實時導入到Delta Lake中,如果對基于Tablestore的大資料存儲分析感興趣的朋友可以加入我們的技術交流群(釘釘:23307953 或者11789671),來與我們一起探讨。