作者
陳鑫偉(熙康),阿裡雲 計算平台事業部 技術專家
馮加亮(加亮),阿裡雲 計算平台事業部 技術研發
背景
随着資料時代的不斷發展,資料量爆發式增長,資料形式也變的更加多樣。傳統資料倉庫模式的成本高、響應慢、格式少等問題日益凸顯。于是擁有成本更低、資料形式更豐富、分析計算更靈活的資料湖應運而生。
資料湖作為一個集中化的資料存儲倉庫,支援的資料類型具有多樣性,包括結構化、半結構化以及非結構化的資料,資料來源上包含資料庫資料、binglog 增量資料、日志資料以及已有數倉上的存量資料等。資料湖能夠将這些不同來源、不同格式的資料集中存儲管理在高成本效益的存儲如 OSS 等對象存儲中,并對外提供統一的資料目錄,支援多種計算分析方式,有效解決了企業中面臨的資料孤島問題,同時大大降低了企業存儲和使用資料的成本。
資料湖架構及關鍵技術
企業級資料湖架構如下:
資料湖存儲與格式
資料湖存儲主要以雲上對象存儲作為主要媒體,其具有低成本、高穩定性、高可擴充性等優點。
資料湖上我們可以采用支援 ACID 的資料湖存儲格式,如 Delta Lake、Hudi、Iceberg。這些資料湖格式有自己的資料 meta 管理能力,能夠支援 Update、Delete 等操作,以批流一體的方式解決了大資料場景下資料實時更新的問題。在目前方案中,我們主要介紹Delta Lake的核心能力和應用場景。
Delta Lake 的核心能力
Delta Lake 是一個統一的資料管理系統,為雲上資料湖帶來資料可靠性和快速分析。Delta Lake 運作在現有資料湖之上,并且與 Apache Spark 的 API 完全相容。使用Delta Lake,您可以加快高品質資料導入資料湖的速度,團隊也可以在雲服務上快速使用這些資料,安全且可擴充。
- ACID 事務性:Delta Lake 在多個寫操作之間提供 ACID 事務性。每一次寫操作都是一個事務操作,事務日志(Transaction Log)中記錄的寫操作都有一個順序序列。事務日志(Transaction Log)跟蹤了檔案級别的寫操作,并使用了樂觀鎖進行并發控制,這非常适用于資料湖,因為嘗試修改相同檔案的多次寫操作的情況并不經常發生。當發生沖突時,Delta Lake 會抛出一個并發修改異常,抛給供使用者處理并重試其作業。Delta Lake 還提供了最進階别的隔離(可序列化隔離),允許工程師不斷地向目錄或表寫入資料,而使用者不斷地從同一目錄或表讀取資料,讀取資料時會看到資料的最新快照。
- Schema 管理(Schema management):Delta Lake 會自動驗證正在寫入的DataFrame 的 Schema 是否與表的 Schema 相容。若表中存在但 DataFrame 中不存在的列則會被設定為 null。如果 DataFrame 中有額外的列不在表中,那麼該操作将會抛出異常。Delta Lake 具有 DDL(資料定義語言)顯式添加新列的功能,并且能夠自動更新 Schema。
- 可伸縮的中繼資料(Metadata)處理:Delta Lake 将表或目錄的中繼資料資訊存儲在事務日志(Transaction Log)中,而不是中繼資料 Metastore 中。這使得 Delta Lake夠在固定時間内列出大目錄中的檔案,并且在讀取資料時效率很高。
- 資料版本控制和時間旅行(Time Travel):Delta Lake 允許使用者讀取表或目錄的曆史版本快照。當檔案在寫入過程中被修改時,Delta Lake 會建立檔案的新的版本并保留舊版本。當使用者想要讀取表或目錄的較舊版本時,他們可以向 Apach Spark的 read API 提供時間戳或版本号,Delta Lake 根據事務日志(Transaction Log)中的資訊來建構該時間戳或版本的完整快照。這非常友善使用者來複現實驗和報告,如果需要,還可以将表還原為舊版本。
- 統一批流一體:除了批處理寫入之外,Delta Lake 還可以作為 Apache Spark 的結構化流的高效流接收器(Streaming Sink)。與 ACID 事務和可伸縮中繼資料處理相結合,高效的流接收器(Streaming Sink)支援大量近實時的分析用例,而無需維護複雜的流和批處理管道。
- 記錄更新和删除:Delta Lake 将支援合并、更新和删除的 DML(資料管理語言)指令。這使得工程師可以輕松地在資料湖中插入和删除記錄,并簡化他們的變更資料捕獲和 GDPR(一般資料保護條例)用例。由于 Delta Lake 在檔案級粒度上進行跟蹤和修改資料,是以它比讀取和覆寫整個分區或表要高效得多。
資料湖建構與管理
1. 資料入湖
企業的原始資料存在于多種資料庫或存儲系統,如關系資料庫 MySQL、日志系統SLS、NoSQL 存儲 HBase、消息資料庫 Kafka 等。其中大部分的線上存儲都面向線上事務型業務,并不适合線上分析的場景,是以需要将資料以無侵入的方式同步至成本更低且更适合計算分析的對象存儲。
常用的資料同步方式有基于 DataX、Sqoop 等資料同步工具做批量同步;同時在對于實時性要求較高的場景下,配合使用 Kafka+spark Streaming / flink 等流式同步鍊路。目前很多雲廠商提供了一站式入湖的解決方案,幫助客戶以更快捷更低成本的方式實作資料入湖,如阿裡雲 DLF 資料入湖。
2. 統一進制資料服務
對象存儲本身是沒有面向大資料分析的語義的,需要結合 Hive Metastore Service 等中繼資料服務為上層各種分析引擎提供資料的 Meta 資訊。資料湖中繼資料服務的設計目标是能夠在大資料引擎、存儲多樣性的環境下,建構不同存儲系統、格式和不同計算引擎統一進制資料視圖,并具備統一的權限、中繼資料,且需要相容和擴充開源大資料生态中繼資料服務,支援自動擷取中繼資料,并達到一次管理多次使用的目的,這樣既能夠相容開源生态,也具備極大的易用性。
資料湖計算與分析
相比于資料倉庫,資料湖以更開放的方式對接多種不同的計算引擎,如傳統開源大資料計算引擎 Hive、Spark、Presto、Flink 等,同時也支援雲廠商自研的大資料引擎,如阿裡雲 MaxCompute、Hologres 等。在資料湖存儲與計算引擎之間,一般還會提供資料湖加速的服務,以提高計算分析的性能,同時減少帶寬的成本和壓力。
Databricks 資料洞察-商業版的 Spark 資料計算與分析引擎
DataBricks 資料洞察(DDI)做為阿裡雲上全托管的 Spark 分析引擎,能夠簡單快速幫助使用者對資料湖的資料進行計算與分析。
- Saas 全托管 Spark:免運維,無需關注底層資源情況,降低運維成本,聚焦分析業務
- 完整 Spark 技術棧內建:一站式內建 Spark 引擎和 Delta Lake 資料湖,100%相容開源 Spark 社群版;Databricks 做商業支援,最快體驗 Spark 最新版本特性
- 總成本降低:商業版本 Spark 及 Delta Lake 性能優勢顯著;同時基于計算存儲分離架構,存儲依托阿裡雲 OSS 對象存儲,借助阿裡雲 JindoFS 緩存層加速;能夠有效降低叢集總體使用成本
- 高品質支援以及 SLA 保障:阿裡雲和 Databricks 提供覆寫 Spark 全棧的技術支援;提供商業化 SLA 保障與7*24小時 Databricks 專家支援服務
Databricks 資料洞察+ DLF 資料湖建構與流批一體分析實踐
企業建構和應用資料湖一般需要經曆資料入湖、資料湖存儲與管理、資料湖探索與分析等幾個過程。本文主要介紹基于阿裡雲資料湖建構(DLF)+Databricks 資料洞察(DDI)建構一站式的資料入湖,批流一體資料分析實戰。
流處理場景:
實時場景維護更新兩張 Delta 表:
- delta_aggregates_func 表:RDS 資料實時入湖 。
- delta_aggregates_metrics 表:工業 metric 資料通過 IoT 平台采集到雲 Kafka ,經由 Spark Structured Streaming 實時入湖。
批處理場景:
以實時場景生成兩張 Delta 作為資料源,進行資料分析執行 Spark jobs,通過 Databrick 資料洞察作業排程定時執行。
前置條件
1. 服務開通
確定 DLF、OSS、Kafka、DDI、RDS、DTS 等雲産品服務已開通。注意 DLF、RDS、Kafka、DDI 執行個體均需在同一 Region 下。
2. RDS 資料準備
RDS 資料準備,在 RDS 中建立資料庫 dlfdb。在賬戶中心建立能夠讀取 engine_funcs資料庫的使用者賬号,如 dlf_admin。
通過 DMS 登入資料庫,運作一下語句建立 engine_funcs 表,及插入少量資料。
CREATE TABLE `engine_funcs` ( `emp_no` int(11) NOT NULL,
`engine_serial_number` varchar(20) NOT NULL,
`engine_serial_name` varchar(20) NOT NULL,
`target_engine_serial_number` varchar(20) NOT NULL,
`target_engine_serial_name` varchar(20) NOT NULL,
`operator` varchar(16) NOT NULL,
`create_time` DATETIME NOT NULL,
`update_time` DATETIME NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
INSERT INTO `engine_funcs` VALUES (10001,'1107108133','temperature','1107108144','temperature','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10002,'1107108155','temperature','1107108133','temperature','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10003,'1107108155','runTime','1107108166','speed','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10004,'1107108177','pressure','1107108155','electricity','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10005,'1107108188','flow' ,'1107108111','runTime','/', now(), now());
RDS資料實時入湖
1. 建立資料源
- 進入 DLF 控制台界面: https://dlf.console.aliyun.com/cn-hangzhou/home ,點選菜單 資料入湖 -> 資料源管理。
- 點選 建立資料源。填寫連接配接名稱,選擇資料準備中的使用的 RDS 執行個體,填寫賬号密碼,點選“連接配接測試”驗證網絡連通性及賬号可用性。
- 點選下一步,确定,完成資料源建立。
2. 建立中繼資料庫
在 OSS 中建立 Bucket,databricks-data-source;
點選左側菜單“中繼資料管理”->“中繼資料庫”,點選“建立中繼資料庫”。填寫名稱,建立目錄 dlf/,并選擇。
3. 建立入湖任務
- 點選菜單“資料入湖”->“入湖任務管理”,點選“建立入湖任務”。
- 選擇“關系資料庫實時入湖”,按照下圖的資訊填寫資料源、目标資料湖、任務配置等資訊。并儲存。
- 配置資料源,選擇剛才建立的“dlf”連接配接,使用表路徑 “dlf/engine_funcs”,選擇建立 dts 訂閱,填寫名稱。
- 回到任務管理頁面,點選“運作”建立的入湖任務。就會看到任務進入“初始化中”狀态,随後會進入“運作”狀态。
- 點選“詳情”進入任務詳情頁,可以看到相應的資料庫表資訊。
該資料入湖任務,屬于全量+增量入湖,大約3至5分鐘後,全量資料會完成導入,随後自動進入實時監聽狀态。如果有資料更新,則會自動更新至 Delta Lake 資料中。
資料湖探索與分析
DLF 資料查詢探索
DLF 産品提供了輕量級的資料預覽和探索功能,點選菜單“資料探索”->“SQL 查詢”進入資料查詢頁面。
在中繼資料庫表中,找到“fjl_dlf”,展開後可以看到 engine_funcs_delta 表已經自動建立完成。輕按兩下該表名稱,右側 sql 編輯框會出現查詢該表的 sql 語句,點選“運作”,即可獲得資料查詢結果。
回到 DMS 控制台,運作下方 DELETE 和 INSERT SQL 語句。
DELETE FROM `engine_funcs` where `emp_no` = 10001;
UPDATE `engine_funcs` SET `operator` = '+', `update_time` = NOW() WHERE `emp_no` =10002;
INSERT INTO `engine_funcs` VALUES (20001,'1107108199','speed','1107108122','runTime','*', now(), now());
大約1至3分鐘後,在 DLF 資料探索再次執行剛才的 select 語句,所有的資料更新已經同步至資料湖中。
建立 Databricks 資料洞察(DDI)叢集
- 叢集建立完成後,點選“詳情”進入詳情頁,添加目前通路機器 ip 白名單。
- 點選 Notebook 進入互動式分析頁查詢同步至 Delta Lake 中 engine_funcs_delta 表資料。
IoT 平台采集到雲 Kafka 資料實時寫入 Delta Lake
1.引入 spark-sql-kafka 三方依賴
%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
2.使用 UDF 函數定義流資料寫入 Delta Lake 的 Merge 規則
發往 Kafka 的測試資料的格式:
{"sn": "1107108111","temperature": "12" ,"speed":"1115", "runTime":"160","pressure":"210","electricity":"380","flow":"740","dia":"330"}
{"sn": "1107108122","temperature": "13" ,"speed":"1015", "runTime":"150","pressure":"220","electricity":"390","flow":"787","dia":"340"}
{"sn": "1107108133","temperature": "14" ,"speed":"1215", "runTime":"140","pressure":"230","electricity":"377","flow":"777","dia":"345"}
{"sn": "1107108144","temperature": "15" ,"speed":"1315", "runTime":"145","pressure":"240","electricity":"367","flow":"730","dia":"430"}
{"sn": "1107108155","temperature": "16" ,"speed":"1415", "runTime":"155","pressure":"250","electricity":"383","flow":"750","dia":"345"}
{"sn": "1107108166","temperature": "10" ,"speed":"1515", "runTime":"145","pressure":"260","electricity":"350","flow":"734","dia":"365"}
{"sn": "1107108177","temperature": "12" ,"speed":"1115", "runTime":"160","pressure":"210","electricity":"377","flow":"733","dia":"330"}
{"sn": "1107108188","temperature": "13" ,"speed":"1015", "runTime":"150","pressure":"220","electricity":"381","flow":"737","dia":"340"}
{"sn": "1107108199","temperature": "14" ,"speed":"1215", "runTime":"140","pressure":"230","electricity":"378","flow":"747","dia":"345"}
%spark
import org.apache.spark.sql._
import io.delta.tables._
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView("dataStream")
// 對流資料DF執行列轉行的操作;
val df=microBatchOutputDF.sparkSession.sql(s"""
select `sn`,
stack(7, 'temperature', `temperature`, 'speed', `speed`, 'runTime', `runTime`, 'pressure', `pressure`, 'electricity', `electricity`, 'flow', `flow` , 'dia', `dia`) as (`name`, `value` )
from dataStream
""")
df.createOrReplaceTempView("updates")
// 實作實時更新動态的資料,結果merge到表裡面
val mergedf=df.sparkSession.sql(s"""
MERGE INTO delta_aggregates_metrics t
USING updates s
ON s.sn = t.sn and s.name=t.name
WHEN MATCHED THEN UPDATE SET
t.value = s.value,
t.update_time=current_timestamp()
WHEN NOT MATCHED THEN INSERT
(t.sn,t.name,t.value ,t.create_time,t.update_time)
values (s.sn,s.name,s.value,current_timestamp(),current_timestamp())
""")
}
3.使用 Spark Structured Streaming 實時流寫入 Delta Lake
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
def getquery(checkpoint_dir:String,servers:String,topic:String ){
var streamingInputDF =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
var streamingSelectDF =
streamingInputDF
.select(
get_json_object(($"value").cast("string"), "$.sn").alias("sn"),
get_json_object(($"value").cast("string"), "$.temperature").alias("temperature"),
get_json_object(($"value").cast("string"), "$.speed").alias("speed"),
get_json_object(($"value").cast("string"), "$.runTime").alias("runTime"),
get_json_object(($"value").cast("string"), "$.electricity").alias("electricity"),
get_json_object(($"value").cast("string"), "$.flow").alias("flow"),
get_json_object(($"value").cast("string"), "$.dia").alias("dia"),
get_json_object(($"value").cast("string"), "$.pressure").alias("pressure")
)
val query = streamingSelectDF
.writeStream
.format("delta")
.option("checkpointLocation", checkpoint_dir)
.trigger(Trigger.ProcessingTime("5 seconds")) // 執行流處理時間間隔
.foreachBatch(upsertToDelta _) //引用upsertToDelta函數
.outputMode("update")
.start()
}
4. 執行程式
%spark
val my_checkpoint_dir="oss://databricks-data-source/checkpoint/ck"
val servers= "***.***.***.***:9092"
val topic= "your-topic"
getquery(my_checkpoint_dir,servers,topic)
5. 啟動 Kafka 并向生産裡發送測試資料
- 查詢資料實時寫入并更新
- 查詢從 MySQL 實時同步入湖的 engine_funcs_delta 資料
%spark
val rds_dataV=spark.table("fjl_dlf.engine_funcs_delta")
rds_dataV.show()
批處理作業
結合業務,需要将對應的 delta_aggregates_metrics 裡的 Value 參數 join 到engine_funcs_delta 表裡
%spark
//讀取實時更新的delta_aggregates_metrics資料表
val aggregateDF=spark.table("log_data_warehouse_dlf.delta_aggregates_metrics")
//讀取實時更新的engine_funcs_delta函數表
val rds_dataV=spark.table("fjl_dlf.engine_funcs_delta").drop("create_time","update_time")
// rds_dataV.show()
val aggregateSDF= aggregateDF.withColumnRenamed("value","esn_value").withColumnRenamed("name","engine_serial_name").withColumnRenamed("sn","engine_serial_number")
// aggregateSDF.show()
val aggregateTDF=aggregateDF.withColumnRenamed("value","tesn_value").withColumnRenamed("name","target_engine_serial_name").withColumnRenamed("sn","target_engine_serial_number").drop("create_time","update_time")
// aggregateTDF.show()
//将對應的delta_aggregates_metrics裡的Value參數 join到engine_funcs_delta表裡;
val resdf=rds_dataV.join(aggregateSDF,Seq("engine_serial_name","engine_serial_number"),"left").join(aggregateTDF,Seq("target_engine_serial_number","target_engine_serial_name"),"left")
.selectExpr("engine_serial_number","engine_serial_name","esn_value","target_engine_serial_number","target_engine_serial_name","tesn_value","operator","create_time","update_time")
//資料展示
resdf.show(false)
// 将結果寫入到Delta表裡面
resdf.write.format("delta")
.mode("append")
.saveAsTable("log_data_warehouse_dlf.delta_result")
性能優化:OPTIMIZE & Z-Ordering
在流處理場景下會産生大量的小檔案,大量小檔案的存在會嚴重影響資料系統的讀性能。Delta Lake 提供了 OPTIMIZE 指令,可以将小檔案進行合并壓縮,另外,針對 Ad-Hoc 查詢場景,由于涉及對單表多個次元資料的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機制,可以有效提升查詢的性能。進而極大提升讀取表的性能。DeltaLake 本身提供了 Auto Optimize 選項,但是會犧牲少量寫性能,增加資料寫入 delta 表的延遲。相反,執行 OPTIMIZE 指令并不會影響寫的性能,因為 Delta Lake 本身支援 MVCC,支援 OPTIMIZE 的同時并發執行寫操作。是以,我們采用定期觸發執行 OPTIMIZE 的方案,每小時通過 OPTIMIZE 做一次合并小檔案操作,同時執行 VACCUM 來清理過期資料檔案:
OPTIMIZE log_data_warehouse_dlf.delta_result ZORDER by engine_serial_number;
VACUUM log_data_warehouse_dlf.delta_result RETAIN 1 HOURS;
相關文獻
一、Delta Lake 基礎和性能介紹:
二、Delta Lake 優化:
- https://databricks.com/blog/2018/07/31/processing-petabytes-of-data-in-seconds-with-databricks-delta.html
- 文檔: https://help.aliyun.com/document_detail/148379.html?utm_content=g_1000230851&spm=5176.20966629.toubu.3.f2991ddcpxxvD1#title-kbl-4a7-npr
擷取更詳細的 Databricks 資料洞察相關資訊,可至産品詳情頁檢視:
https://www.aliyun.com/product/bigdata/spark擷取更詳細的 資料湖建構DLF相關資訊,可至産品詳情頁檢視:
https://www.aliyun.com/product/bigdata/dlf掃描下方二維碼入 Databricks 資料洞察産品交流釘釘群一起參與交流讨論!