天天看點

Tablestore+Delta Lake(快速開始)

本文介紹如何在E-MapReduce中通過Tablestore Spark Streaming Source将TableStore中的資料實時導入到Delta Lake中。

背景介紹

近些年來HTAP(Hybrid transaction/analytical processing)的熱度越來越高,通過将存儲和計算組合起來,既能支援傳統的海量結構化資料分析,又能支援快速的事務更新寫入,是設計資料密集型系統的一個成熟的架構。

表格存儲(Tablestore)是阿裡雲自研的 NoSQL 多模型資料庫,提供海量結構化資料存儲以及快速的查詢和分析服務(PB 級存儲、千萬 TPS 以及毫秒級延遲),借助于表格存儲的底層引擎,能夠很好的完成OLTP場景下的需求。Delta Lake類似于支援Delta的Data Lake(資料湖),使用列存來存base資料,行的格式存儲新增delta資料,進而做到支援資料操作的ACID和CRUD,完全相容Spark的大資料生态,通過結合Delta Lake和Spark生态,能夠很好的完成OLAP場景下的需求。下圖展示的是Tablestore和Delta Lake結合的HATP場景的一個簡要的邏輯結構圖,有關結構化大資料分析平台設計的更多細節和幹貨,可以參閱文章 

結構化大資料分析平台設計

Tablestore+Delta Lake(快速開始)

準備工作

步驟一 建立Tablestore源表

詳細開通步驟請參考

官方文檔

,本文demo中所建立出來的表名為Source,表的Schema如下圖所示,該表有PKString和PkInt兩個主鍵,類型分别為String和Interger。

Tablestore+Delta Lake(快速開始)

為表Source建立一個增量通道,如下圖所示,通道清單裡面會顯示該通道的名字、ID以及類型。

Tablestore+Delta Lake(快速開始)

技術注解:

通道服務(Tunnel Service)是基于Tablestore資料接口之上的全增量一體化服務,包含三種通道類型:

  • 全量:對資料表中曆史存量資料消費處理
  • 增量:對資料表中新增資料消費處理
  • 全量加增量:先對資料表總曆史存量資料消費,之後對新增資料消費
通道服務的詳細介紹可查詢 Tablestore官網文檔

步驟二 擷取相關jar包并上傳到hadoop叢集

  • 擷取環境依賴的JAR包。
Jar包 擷取方法

emr-tablestore-X.X.X.jar

X.X.X: Since 1.9.0+

Maven 庫中下載下傳: https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore
tablestore-X.X.X-jar-with-dependencies.jar 下載下傳 EMR SDK 相關的Tablestore依賴包。 https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar
  • 在叢集管理頁面,單擊已建立的Hadoop叢集的叢集ID ,進入叢集與服務管理頁面。
  • 在左側導航樹中選擇主機清單,然後在右側檢視Hadoop叢集中emr-header-1主機的IP資訊。
  • 在SSH用戶端中建立一個指令視窗,登入Hadoop叢集的emr-header-1主機。
  • 上傳所有JAR包到emr-header-1節點的某個目錄下。

步驟三 運作Spark Streaming作業

  1. 以一個基于 emr demo 修改的代碼為樣例,編譯生成JAR包,JAR包需要上傳到Hadoop叢集的emr-header-1主機中(參見步驟二),完整的代碼由于改動較大,不在本文中一一說明,後續會合到emr demo官方項目中。
  2. 該樣例以Tablestore表作為資料源,通過結合Tablestore CDC技術,Tablestore Streaming Source和Delta Sink,示範的是TableStore到Delta Lake的一個完整鍊路。
  3. 按以下指令,啟動spark streaming作業,開啟一個實時同步Tablestore Source表中資料到Delta Lake Table的監聽程式。
spark-submit --class com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC  --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>           

各個參數說明如下:

參數 參數說明
com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC 所要運作的主程式類
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
EMR SDK 相關的Tablestore依賴包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程式類)
instance Tablestore執行個體名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘鑰
endPoint Tablestore執行個體的endPoint
maxOffsetsPerChannel Tablestore通道  Channel 在每個Spark Batch周期内同步的最大資料條數,預設10000。
catalog 同步的列名,詳見 Catalog字段說明

步驟四 資料CRUD示例

  1. 首先在TableStore裡插入兩行,本次示例中,我們建了8列的同步列,包括兩個主鍵(PkString, PkInt)和六個屬性列(col1, col2, col3, timestamp, col5和col6)。由于表格存儲是Free-Schema的結構,我們可以任意的插入屬性列,TableStore的Spark Source會自動的做屬性列的篩選。如下面兩張圖所示,在插入兩行資料後,Delta Table中同步也可以馬上讀取到兩行,且資料一緻。
    Tablestore+Delta Lake(快速開始)
    Tablestore+Delta Lake(快速開始)
  2. 接着,在Tablestore中進行一些更新行和插入行的操作,如下面的兩個圖所示,等待一小段micro-batch的資料同步後,表格存儲中的資料同步變化能夠即時的更新到Delta Table中。
    Tablestore+Delta Lake(快速開始)
    Tablestore+Delta Lake(快速開始)
  3. 将Tablestore中的資料全部清空,如下面兩圖所示,Delta Table也同步的變成了空。
    Tablestore+Delta Lake(快速開始)
    Tablestore+Delta Lake(快速開始)
  4. 在叢集上,Delta Table預設存放在HDFS中,如下圖所示,_delta_log目錄中存放的json檔案是Transaction log,parquet格式的檔案是底層的資料檔案。
    Tablestore+Delta Lake(快速開始)

寫在最後

本文介紹了如何在E-MapReduce中通過Tablestore Spark Streaming Source将TableStore中的資料實時導入到Delta Lake中,如果對基于Tablestore的大資料存儲分析感興趣的朋友可以加入我們的技術交流群(釘釘:23307953 或者11789671),來與我們一起探讨。

Tablestore+Delta Lake(快速開始)