Apache Spark Delta Lake 事務日志實作源碼分析
我們已經在
這篇文章詳細介紹了 Apache Spark Delta Lake 的事務日志是什麼、主要用途以及如何工作的。那篇文章已經可以很好地給大家介紹 Delta Lake 的内部工作原理,原子性保證,本文為了學習的目的,帶領大家從源碼級别來看看 Delta Lake 事務日志的實作。在看本文時,強烈建議先看一下
《深入了解 Apache Spark Delta Lake 的事務日志》文章。
Delta Lake 更新資料事務實作
Delta Lake 裡面所有對表資料的更新(插入資料、更新資料、删除資料)都需要進行下面這些步驟,其主要目的是把删除哪些檔案、新增哪些檔案等記錄寫入到事務日志裡面,也就是 _delta_log 目錄下的 json 檔案,通過這個實作 Delta Lake 的 ACID 以及時間旅行。下面我們進入事務日志送出的切入口
org.apache.spark.sql.delta.OptimisticTransaction#commit
,持久化事務記錄檔都是需要調用這個函數進行的。commit 函數實作如下:
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
deltaLog,
"delta.commit") {
val version = try {
// 事務日志送出之前需要先做一些工作,比如如果更新操作是第一次進行的,那麼需要初始化 Protocol,
// 還需要将使用者對 Delta Lake 表的設定持久化到事務日志裡面
var finalActions = prepareCommit(actions, op)
// 如果這次更新操作需要删除之前的檔案,那麼 isBlindAppend 為 false,否則為 true
val isBlindAppend = {
val onlyAddFiles =
finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
onlyAddFiles && !dependsOnFiles
}
// 如果 commitInfo.enabled 參數設定為 true,那麼還需要把 commitInfo 記錄到事務日志裡面
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
commitInfo = CommitInfo(
clock.getTimeMillis(),
op.name,
op.jsonEncodedValues,
Map.empty,
Some(readVersion).filter(_ >= 0),
None,
Some(isBlindAppend))
finalActions = commitInfo +: finalActions
}
// 真正寫事務日志,如果發生版本沖突會重試直到事務日志寫成功
val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
// 對事務日志進行 checkpoint 操作
postCommit(commitVersion, finalActions)
commitVersion
} catch {
case e: DeltaConcurrentModificationException =>
recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
throw e
case NonFatal(e) =>
recordDeltaEvent(
deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
throw e
}
version
}
我們先從這個函數的兩個參數開始介紹。
- _actions: Seq[Action]_:Delta Lake 表更新操作産生的新檔案(AddFile)和需要删除檔案的清單(RemoveFile)。如果是 Structured Streaming 作業,還會記錄 SetTransaction 記錄,裡面會存儲作業的 query id(sql.streaming.queryId)、batchId 以及目前時間。這個就是我們需要持久化到事務日志裡面的資料。
- _op: DeltaOperations.Operation_:Delta 操作類型,比如 WRITE、STREAMING UPDATE、DELETE、MERGE 以及 UPDATE 等。
在 commit 函數裡面主要分為三步:prepareCommit、doCommit 以及 postCommit。prepareCommit 的實作如下:
protected def prepareCommit(
actions: Seq[Action],
op: DeltaOperations.Operation): Seq[Action] = {
assert(!committed, "Transaction already committed.")
// 如果我們更新了表的 Metadata 資訊,那麼需要将其寫入到事務日志裡面
var finalActions = newMetadata.toSeq ++ actions
val metadataChanges = finalActions.collect { case m: Metadata => m }
assert(
metadataChanges.length <= 1,
"Cannot change the metadata more than once in a transaction.")
metadataChanges.foreach(m => verifyNewMetadata(m))
// 首次送出事務日志,那麼會確定 _delta_log 目錄要存在,
// 然後檢查 finalActions 裡面是否有 Protocol,沒有的話需要初始化協定版本
if (snapshot.version == -1) {
deltaLog.ensureLogDirectoryExist()
if (!finalActions.exists(_.isInstanceOf[Protocol])) {
finalActions = Protocol() +: finalActions
}
}
finalActions = finalActions.map {
// 第一次送出,并且是 Metadata那麼會将 Delta Lake 的配置資訊加入到 Metadata 裡面
case m: Metadata if snapshot.version == -1 =>
val updatedConf = DeltaConfigs.mergeGlobalConfigs(
spark.sessionState.conf, m.configuration, Protocol())
m.copy(configuration = updatedConf)
case other => other
}
deltaLog.protocolWrite(
snapshot.protocol,
logUpgradeMessage = !actions.headOption.exists(_.isInstanceOf[Protocol]))
// 如果 actions 裡面有删除的檔案,那麼需要檢查 Delta Lake 是否支援删除
val removes = actions.collect { case r: RemoveFile => r }
if (removes.exists(_.dataChange)) deltaLog.assertRemovable()
finalActions
}
prepareCommit
裡面做的事情比較簡單,主要對事務日志進行補全等操作。具體為
- 、由于 Delta Lake 表允許對已經存在的表模式進行修改,比如添加了新列,或者覆寫原有表的模式等。那麼這時候我們需要将新的 Metadata 寫入到事務日志裡面。Metadata 裡面存儲了表的 schema、分區列、表的配置、表的建立時間。注意,除了表的 schema 和分區字段可以在後面修改,其他的資訊都不可以修改的。
- 、如果是首次送出事務日志,那麼先檢查表的 _delta_log 目錄是否存在,不存在則建立。然後檢查是否設定了協定的版本,如果沒有設定,則使用預設的協定版本,預設的協定版本中 readerVersion = 1,writerVersion = 2;
- 、如果是第一次送出,并且是 Metadata ,那麼會将 Delta Lake 的配置資訊加入到 Metadata 裡面。Delta Lake 表的配置資訊主要是在
類裡面定義的,比如我們可以在建立 Delta Lake 表的時候指定多久做一次 Checkpoint。org.apache.spark.sql.delta.sources.DeltaSQLConf
- 、由于我們可以通過
參數将表設定為僅允許追加,是以如果當 actions 裡面存在 RemoveFile,那麼我們需要判斷表是否允許删除。spark.databricks.delta.properties.defaults.appendOnly
我們回到
commit
函數裡面,在執行完
prepareCommit
之後得到了 finalActions 清單,這些資訊就是我們需要寫入到事務日志裡面的資料。緊接着會判斷這次事務變更是否需要删除之前的檔案,如果是,那麼 isBlindAppend 為 false,否則為 true。
當
commitInfo.enabled
參數設定為 true(預設),那麼還需要将 commitInfo 寫入到事務日志檔案裡面。CommitInfo 裡面包含了操作時間、操作的類型(WRITEUPDATE)、操作類型(Overwrite)等重要資訊。最後到了
doCommit
函數的調用,大家注意看第一個參數傳遞的是
snapshot.version + 1
,
snapshot.version
是事務日志中最新的版本,比如 _delta_lake 目錄下的檔案如下:
-rw-r--r-- 1 yangping.wyp wheel 811B 8 28 19:12 00000000000000000000.json
-rw-r--r-- 1 yangping.wyp wheel 514B 8 28 19:14 00000000000000000001.json
-rw-r--r-- 1 yangping.wyp wheel 711B 8 29 10:54 00000000000000000002.json
-rw-r--r-- 1 yangping.wyp wheel 865B 8 29 10:56 00000000000000000003.json
那麼
snapshot.version
的值就是3,是以這次更新操作的版本應該是4。我們來看下 doCommit 函數的實作:
private def doCommit(
attemptVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long = deltaLog.lockInterruptibly {
try {
logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions")
// 真正寫事務日志的操作
deltaLog.store.write(
deltaFile(deltaLog.logPath, attemptVersion),
actions.map(_.json).toIterator)
val commitTime = System.nanoTime()
// 由于發生了資料更新,是以更新記憶體中事務日志的最新快照,并做相關判斷
val postCommitSnapshot = deltaLog.update()
if (postCommitSnapshot.version < attemptVersion) {
throw new IllegalStateException(
s"The committed version is $attemptVersion " +
s"but the current version is ${postCommitSnapshot.version}.")
}
// 發送一些統計資訊
var numAbsolutePaths = 0
var pathHolder: Path = null
val distinctPartitions = new mutable.HashSet[Map[String, String]]
val adds = actions.collect {
case a: AddFile =>
pathHolder = new Path(new URI(a.path))
if (pathHolder.isAbsolute) numAbsolutePaths += 1
distinctPartitions += a.partitionValues
a
}
val stats = CommitStats(
startVersion = snapshot.version,
commitVersion = attemptVersion,
readVersion = postCommitSnapshot.version,
txnDurationMs = NANOSECONDS.toMillis(commitTime - txnStartNano),
commitDurationMs = NANOSECONDS.toMillis(commitTime - commitStartNano),
numAdd = adds.size,
numRemove = actions.collect { case r: RemoveFile => r }.size,
bytesNew = adds.filter(_.dataChange).map(_.size).sum,
numFilesTotal = postCommitSnapshot.numOfFiles,
sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
protocol = postCommitSnapshot.protocol,
info = Option(commitInfo).map(_.copy(readVersion = None, isolationLevel = None)).orNull,
newMetadata = newMetadata,
numAbsolutePaths,
numDistinctPartitionsInAdd = distinctPartitions.size,
isolationLevel = null)
recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats)
attemptVersion
} catch {
case e: java.nio.file.FileAlreadyExistsException =>
checkAndRetry(attemptVersion, actions, attemptNumber)
}
}
- 、這裡就是真正寫事務日志的操作,其中 store 是通過
參數指定的,目前支援 HDFS、S3、Azure 以及 Local 等存儲媒體。預設是 HDFS。具體的寫事務操作參見下面的介紹。spark.delta.logStore.class
- 、持久化事務日志之後,更新記憶體中的事務日志最新的快照,然後做相關的合法性校驗;
- 、發送一些統計資訊。這裡應該是 databricks 裡面含有的功能,開源版本這裡面其實并沒有做什麼操作。
下面我們開看看真正寫事務日志的實作,為了簡單起見,我們直接檢視 HDFSLogStore 類中對應的方法,主要涉及
writeInternal
,其實作如下:
private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
// 擷取 HDFS 的 FileContext 用于後面寫事務日志
val fc = getFileContext(path)
// 如果需要寫的事務日志已經存在那麼就需要抛出異常,後面再重試
if (!overwrite && fc.util.exists(path)) {
// This is needed for the tests to throw error with local file system
throw new FileAlreadyExistsException(path.toString)
}
// 事務日志先寫到臨時檔案
val tempPath = createTempPath(path)
var streamClosed = false // This flag is to avoid double close
var renameDone = false // This flag is to save the delete operation in most of cases.
val stream = fc.create(
tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
try {
// 将本次修改産生的 actions 寫入到臨時事務日志裡
actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
stream.close()
streamClosed = true
try {
val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
// 将臨時的事務日志移到正式的事務日志裡面,如果移動失敗則抛出異常,後面再重試
fc.rename(tempPath, path, renameOpt)
renameDone = true
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
throw new FileAlreadyExistsException(path.toString)
}
} finally {
if (!streamClosed) {
stream.close()
}
// 删除臨時事務日志
if (!renameDone) {
fc.delete(tempPath, false)
}
}
}
writeInternal 的實作邏輯很簡單,其實就是我們正常的寫檔案操作,具體如下:
- 、擷取 HDFS 的 FileContext 用于後面寫事務日志
- 、如果需要寫的事務日志已經存在那麼就需要抛出異常,後面再重試;比如上面我們寫事務日志之前磁盤中最新的事務日志檔案是 00000000000000000003.json,我們這次寫的事務日志檔案應該是 00000000000000000004.json,但是由于 Delta Lake 允許多個使用者寫資料,是以在我們擷取最新的事務日志版本到寫事務日志期間已經有使用者寫了一個新的事務日志 00000000000000000004.json,那麼我們這次寫肯定要失敗了。這時候會抛出 FileAlreadyExistsException 異常,以便後面重試。
- 、寫事務日志的時候是先寫到表 _delta_lake 目錄下的臨時檔案裡面,比如我們這次寫的事務日志檔案為 00000000000000000004.json,那麼會往類似于 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 檔案裡面寫資料的。
- 、将本次更新操作的事務記錄寫到臨時檔案裡;
- 、寫完事務日志之後我們需要将臨時事務日志移到最後正式的日志檔案裡面,比如将 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 移到 00000000000000000004.json。大家注意,在寫事務日志檔案的過程中同樣存在多個使用者修改表,是以 00000000000000000004.json 檔案很可能已經被别的修改占用了,這時候也需要抛出 FileAlreadyExistsException 異常,以便後面重試。
整個事務日志寫操作就完成了,我們再回到 doCommit 函數,注意由于 writeInternal 可能會抛出 FileAlreadyExistsException 異常,也就是 deltaLog.store.write(xxx) 調用可能會抛出異常,我們注意看到 doCommit 函數 catch 了這個異常,并在異常捕獲裡面調用
checkAndRetry(attemptVersion, actions, attemptNumber)
,這就是事務日志重寫過程, checkAndRetry 函數的實作如下:
protected def checkAndRetry(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long = recordDeltaOperation(
deltaLog,
"delta.commit.retry",
tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) {
// 讀取磁盤中持久化的事務日志,并更新記憶體中事務日志快照
deltaLog.update()
// 重試的版本是剛剛更新記憶體中事務日志快照的版本+1
val nextAttempt = deltaLog.snapshot.version + 1
// 做相關的合法性判斷
(checkVersion until nextAttempt).foreach { version =>
val winningCommitActions =
deltaLog.store.read(deltaFile(deltaLog.logPath, version)).map(Action.fromJson)
val metadataUpdates = winningCommitActions.collect { case a: Metadata => a }
val txns = winningCommitActions.collect { case a: SetTransaction => a }
val protocol = winningCommitActions.collect { case a: Protocol => a }
val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }.map(
ci => ci.copy(version = Some(version)))
val fileActions = winningCommitActions.collect { case f: FileAction => f }
// If the log protocol version was upgraded, make sure we are still okay.
// Fail the transaction if we're trying to upgrade protocol ourselves.
if (protocol.nonEmpty) {
protocol.foreach { p =>
deltaLog.protocolRead(p)
deltaLog.protocolWrite(p)
}
actions.foreach {
case Protocol(_, _) => throw new ProtocolChangedException(commitInfo)
case _ =>
}
}
// Fail if the metadata is different than what the txn read.
if (metadataUpdates.nonEmpty) {
throw new MetadataChangedException(commitInfo)
}
// Fail if the data is different than what the txn read.
if (dependsOnFiles && fileActions.nonEmpty) {
throw new ConcurrentWriteException(commitInfo)
}
// Fail if idempotent transactions have conflicted.
val txnOverlap = txns.map(_.appId).toSet intersect readTxn.toSet
if (txnOverlap.nonEmpty) {
throw new ConcurrentTransactionException(commitInfo)
}
}
logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.")
// 開始重試事務日志的寫操作
doCommit(nextAttempt, actions, attemptNumber + 1)
}
checkAndRetry
函數隻有在事務日志寫沖突的時候才會出現,主要目的是重寫目前的事務日志。
- 、因為上次更新事務日志發生沖突,是以我們需要再一次讀取磁盤中持久化的事務日志,并更新記憶體中事務日志快照;
- 、重試的版本是剛剛更新記憶體中事務日志快照的版本+1;
- 、做相關的合法性判斷;
- 、開始重試事務日志的寫操作。
當事務日志成功持久化到磁盤之後,這時候會執行 commit 操作的最後一步,執行
postCommit
函數,其實作如下:
protected def postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit = {
committed = true
if (commitVersion != 0 && commitVersion % deltaLog.checkpointInterval == 0) {
try {
deltaLog.checkpoint()
} catch {
case e: IllegalStateException =>
logWarning("Failed to checkpoint table state.", e)
}
}
}
postCommit 函數實作很簡單,就是判斷需不需要對事務日志做一次 checkpoint 操作,其中
deltaLog.checkpointInterval
就是通過
spark.databricks.delta.properties.defaults.checkpointInterval
參數設定的,預設每寫10次事務日志做一次 checkpoint。
checkpoint 的其實就是将記憶體中事務日志的最新快照持久化到磁盤裡面,如下所示:
-rw-r--r-- 1 yangping.wyp wheel 811B 8 28 19:12 00000000000000000000.json
-rw-r--r-- 1 yangping.wyp wheel 514B 8 28 19:14 00000000000000000001.json
-rw-r--r-- 1 yangping.wyp wheel 711B 8 29 10:54 00000000000000000002.json
-rw-r--r-- 1 yangping.wyp wheel 865B 8 29 10:56 00000000000000000003.json
-rw-r--r-- 1 yangping.wyp wheel 668B 8 29 14:36 00000000000000000004.json
-rw-r--r-- 1 yangping.wyp wheel 13K 8 29 14:36 00000000000000000005.checkpoint.parquet
-rw-r--r-- 1 yangping.wyp wheel 514B 8 29 14:36 00000000000000000005.json
-rw-r--r-- 1 yangping.wyp wheel 514B 8 29 14:36 00000000000000000006.json
-rw-r--r-- 1 yangping.wyp wheel 24B 8 29 14:36 _last_checkpoint
00000000000000000005.checkpoint.parquet 檔案就是對事務日志進行 checkpoint 的檔案,裡面彙總了 00000000000000000000.json - 00000000000000000005.json 之間的所有事務操作記錄。是以下一次需要建構事務日志的快照時,隻需要從 00000000000000000005.checkpoint.parquet 檔案、00000000000000000006.json 檔案構造,而無需再讀取 00000000000000000000.json - 00000000000000000005.json 之間的事務操作。
同時我們還看到做完 checkpoint 之後還會生成一個 _last_checkpoint 檔案,這個其實就是對
CheckpointMetaData
類的持久化操作。裡面記錄了最後一次 checkpoint 的版本,checkpoint 檔案裡面的 Action 條數,如下:
⇒ cat _last_checkpoint
{"version":5,"size":10}
注意,其實
CheckpointMetaData
類裡面還有個 parts 字段,這個代表 checkpoint 檔案有幾個分片。因為随着時間的推移,checkpoint 檔案也會變得很大,如果隻寫到一個 checkpoint 檔案裡面效率不夠好,這時候會對 checkpoint 檔案進行拆分,拆分成幾個檔案是記錄到 parts 裡面,但是目前開源版本的 Delta Lake 尚無這個功能,也不知道數磚後面會不會開源。
寫在最後
為了營造一個開放的Cassandra技術交流環境,社群建立了微信公衆号和釘釘群。為廣大使用者提供專業的技術分享及問答,定期開展專家技術直播,歡迎大家加入。另雲Cassandra免費火爆公測中,歡迎試用:
https://www.aliyun.com/product/cds