[SPARK] Managed Commit support for cold and hot snapshot update (#2755)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR adds support for cold and hot snapshot updates for managed commits. The cold path (building the initial snapshot when a `DeltaLog` is first created, via `getSnapshotAtInit`) and the hot path (refreshing an existing snapshot, via `updateInternal`) now share a common `getUpdatedSnapshot` routine. When the snapshot built from the initial listing points at a different commit owner, that routine re-creates the log segment through the new commit store so that not-yet-backfilled commits are picked up. A single extra pass is sufficient because the commit that changes the commit owner must itself be backfilled.
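
As a rough, self-contained sketch of that flow (every name here, such as `ManagedCommitUpdateSketch` and the simplified `CommitStore`, `LogSegment`, and `Snapshot` case classes, is a hypothetical stand-in rather than the actual Delta class or API):

```scala
// Hypothetical stand-ins for the real Snapshot / LogSegment / CommitStore types.
object ManagedCommitUpdateSketch {
  case class CommitStore(name: String)
  case class LogSegment(version: Long, ownerOpt: Option[CommitStore])
  case class Snapshot(version: Long, ownerOpt: Option[CommitStore])

  // Stand-in for createLogSegment: lists _delta_log, optionally through a commit store.
  // Listing through the owner also surfaces commits that are not yet backfilled.
  def createLogSegment(through: Option[CommitStore]): Option[LogSegment] = {
    val owner = Some(CommitStore("owner-A"))
    val version = if (through == owner) 12L else 10L // 10 = backfilled only, 12 = latest
    Some(LogSegment(version, owner))
  }

  def createSnapshot(segment: LogSegment): Snapshot = Snapshot(segment.version, segment.ownerOpt)

  // Mirrors the shape of getUpdatedSnapshot: build a snapshot from the initial segment and,
  // if it names a different commit owner, list exactly once more through that owner.
  def getUpdatedSnapshot(initialSegment: Option[LogSegment], usedStore: Option[CommitStore]): Snapshot = {
    var snapshot = initialSegment.map(createSnapshot).getOrElse(Snapshot(-1L, None))
    if (snapshot.version >= 0 && snapshot.ownerOpt != usedStore) {
      createLogSegment(snapshot.ownerOpt).foreach(seg => snapshot = createSnapshot(seg))
    }
    snapshot
  }

  def main(args: Array[String]): Unit = {
    // Cold update: no previous snapshot, first listing uses the filesystem only.
    val cold = getUpdatedSnapshot(createLogSegment(through = None), usedStore = None)
    println(s"cold update -> version ${cold.version}") // 12: re-listed through owner-A
    // Hot update: the previous snapshot already used owner-A, so one listing suffices.
    val hot = getUpdatedSnapshot(createLogSegment(through = cold.ownerOpt), usedStore = cold.ownerOpt)
    println(s"hot update  -> version ${hot.version}")  // 12
  }
}
```

In the actual change, `getSnapshotAtInit` (cold) and `updateInternal` (hot) both delegate to `getUpdatedSnapshot`, and `installSnapshot` then publishes the result into `currentSnapshot`.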

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

No
prakharjain09 authored Mar 20, 2024
1 parent a11b92d commit 4619af7
Showing 7 changed files with 493 additions and 95 deletions.
173 changes: 102 additions & 71 deletions spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -59,15 +59,15 @@ trait SnapshotManagement { self: DeltaLog =>

@volatile private[delta] var asyncUpdateTask: Future[Unit] = _

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
protected val snapshotLock = new ReentrantLock()

/**
* Cached fileStatus for the latest CRC file seen in the deltaLog.
*/
@volatile protected var lastSeenChecksumFileStatusOpt: Option[FileStatus] = None
@volatile protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
protected val snapshotLock = new ReentrantLock()

/**
* Run `body` inside `snapshotLock` lock using `lockInterruptibly` so that the thread
* can be interrupted when waiting for the lock.
@@ -81,20 +81,6 @@ trait SnapshotManagement { self: DeltaLog =>
}
}

/**
* Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
* initialization, or None if the directory was empty/missing.
*
* @param startingCheckpoint A checkpoint that we can start our listing from
*/
protected def getLogSegmentFrom(
startingCheckpoint: Option[LastCheckpointInfo]): Option[LogSegment] = {
getLogSegmentForVersion(
versionToLoad = None,
lastCheckpointInfo = startingCheckpoint
)
}

/** Get an iterator of files in the _delta_log directory starting with the startVersion. */
private[delta] def listFrom(startVersion: Long): Iterator[FileStatus] = {
store.listFrom(listingPrefix(logPath, startVersion), newDeltaHadoopConf())
@@ -231,11 +217,11 @@ trait SnapshotManagement { self: DeltaLog =>
* @return Some LogSegment to build a Snapshot if files do exist after the given
* startCheckpoint. None, if the directory was missing or empty.
*/
protected def getLogSegmentForVersion(
protected def createLogSegment(
versionToLoad: Option[Long] = None,
oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider] = None,
lastCheckpointInfo: Option[LastCheckpointInfo] = None,
commitStoreOpt: Option[CommitStore] = None): Option[LogSegment] = {
commitStoreOpt: Option[CommitStore] = None,
lastCheckpointInfo: Option[LastCheckpointInfo] = None): Option[LogSegment] = {
// List based on the last known checkpoint version.
// if that is -1, list from version 0L
val lastCheckpointVersion = getCheckpointVersion(lastCheckpointInfo, oldCheckpointProviderOpt)
@@ -253,6 +239,12 @@ trait SnapshotManagement { self: DeltaLog =>
)
}

private def createLogSegment(previousSnapshot: Snapshot): Option[LogSegment] = {
createLogSegment(
oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
commitStoreOpt = previousSnapshot.commitStoreOpt)
}

/**
* Returns the last known checkpoint version based on [[LastCheckpointInfo]] or
* [[CheckpointProvider]].
@@ -324,7 +316,7 @@ trait SnapshotManagement { self: DeltaLog =>
// deleting files. Either way, we can't safely continue.
//
// For now, we preserve existing behavior by returning Array.empty, which will trigger a
// recursive call to [[getLogSegmentForVersion]] below.
// recursive call to [[createLogSegment]] below.
Array.empty[FileStatus]
}

@@ -335,7 +327,7 @@ trait SnapshotManagement { self: DeltaLog =>
} else if (newFiles.isEmpty) {
// The directory may be deleted and recreated and we may have stale state in our DeltaLog
// singleton, so try listing from the first version
return getLogSegmentForVersion(versionToLoad = versionToLoad)
return createLogSegment(versionToLoad = versionToLoad)
}
val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile)
@@ -498,30 +490,20 @@ trait SnapshotManagement { self: DeltaLog =>
* file as a hint on where to start listing the transaction log directory. If the _delta_log
* directory doesn't exist, this method will return an `InitialSnapshot`.
*/
protected def getSnapshotAtInit: CapturedSnapshot = {
protected def getSnapshotAtInit: CapturedSnapshot = withSnapshotLockInterruptibly {
recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
val currentTimestamp = clock.getTimeMillis()
val snapshotInitWallclockTime = clock.getTimeMillis()
val lastCheckpointOpt = readLastCheckpointFile()
createSnapshotAtInitInternal(
initSegment = getLogSegmentFrom(lastCheckpointOpt),
timestamp = currentTimestamp
)
}
}

protected def createSnapshotAtInitInternal(
initSegment: Option[LogSegment],
timestamp: Long): CapturedSnapshot = {
val snapshot = initSegment.map { segment =>
val snapshot = createSnapshot(
initSegment = segment,
checksumOpt = None)
snapshot
}.getOrElse {
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
val initialSegmentForNewSnapshot = createLogSegment(
versionToLoad = None,
lastCheckpointInfo = lastCheckpointOpt)
val snapshot = getUpdatedSnapshot(
oldSnapshotOpt = None,
initialSegmentForNewSnapshot = initialSegmentForNewSnapshot,
initialCommitStore = None,
isAsync = false)
CapturedSnapshot(snapshot, snapshotInitWallclockTime)
}
CapturedSnapshot(snapshot, timestamp)
}

/**
@@ -696,7 +678,7 @@ trait SnapshotManagement { self: DeltaLog =>
* Instead, just do a general update to the latest available version. The racing commits
* can then use the version check short-circuit to avoid constructing a new snapshot.
*/
getLogSegmentForVersion(
createLogSegment(
oldCheckpointProviderOpt = Some(oldCheckpointProvider),
commitStoreOpt = commitStoreOpt
).getOrElse {
@@ -907,48 +889,95 @@ trait SnapshotManagement { self: DeltaLog =>
*/
protected def updateInternal(isAsync: Boolean): Snapshot =
recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
val updateTimestamp = clock.getTimeMillis()
val updateStartTimeMs = clock.getTimeMillis()
val previousSnapshot = currentSnapshot.snapshot
val segmentOpt = getLogSegmentForVersion(
oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
commitStoreOpt = previousSnapshot.commitStoreOpt)
installLogSegmentInternal(previousSnapshot, segmentOpt, updateTimestamp, isAsync)
val segmentOpt = createLogSegment(previousSnapshot)
val newSnapshot = getUpdatedSnapshot(
oldSnapshotOpt = Some(previousSnapshot),
initialSegmentForNewSnapshot = segmentOpt,
initialCommitStore = previousSnapshot.commitStoreOpt,
isAsync = isAsync)
installSnapshot(newSnapshot, updateStartTimeMs)
}

/**
* Updates and installs a new snapshot in the `currentSnapshot`.
* This method takes care of recursively creating new snapshots if the commit store has changed.
* @param oldSnapshotOpt The previous snapshot, if any.
* @param initialSegmentForNewSnapshot the log segment constructed for the new snapshot
* @param initialCommitStore the Commit Store used for constructing the
* `initialSegmentForNewSnapshot`
* @param isAsync Whether the update is async.
* @return The new snapshot.
*/
protected def getUpdatedSnapshot(
oldSnapshotOpt: Option[Snapshot],
initialSegmentForNewSnapshot: Option[LogSegment],
initialCommitStore: Option[CommitStore],
isAsync: Boolean): Snapshot = {
var commitStoreUsed = initialCommitStore
var newSnapshot = getSnapshotForLogSegmentInternal(
oldSnapshotOpt,
initialSegmentForNewSnapshot,
isAsync
)
// If the commit store has changed, we need to invoke updateSnapshot again so that we
// can get the latest commits from the new commit store. We only need to do this once,
// because the Delta spec mandates that the commit which changes the commit owner must
// itself be backfilled.
if (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != commitStoreUsed) {
commitStoreUsed = newSnapshot.commitStoreOpt
val segmentOpt = createLogSegment(newSnapshot)
newSnapshot = getSnapshotForLogSegmentInternal(Some(newSnapshot), segmentOpt, isAsync)
}
newSnapshot
}

/** Install the provided segmentOpt as the currentSnapshot on the cluster */
protected def installLogSegmentInternal(
previousSnapshot: Snapshot,
/** Creates a Snapshot for the given `segmentOpt` */
protected def getSnapshotForLogSegmentInternal(
previousSnapshotOpt: Option[Snapshot],
segmentOpt: Option[LogSegment],
updateTimestamp: Long,
isAsync: Boolean): Snapshot = {
segmentOpt.map { segment =>
if (segment == previousSnapshot.logSegment) {
// If no changes were detected, just refresh the timestamp
val timestampToUse = math.max(updateTimestamp, currentSnapshot.updateTimestamp)
currentSnapshot = currentSnapshot.copy(updateTimestamp = timestampToUse)
if (previousSnapshotOpt.exists(_.logSegment == segment)) {
previousSnapshotOpt.get
} else {
val newSnapshot = createSnapshot(
initSegment = segment,
checksumOpt = None)
logMetadataTableIdChange(previousSnapshot, newSnapshot)
previousSnapshotOpt.foreach(logMetadataTableIdChange(_, newSnapshot))
logInfo(s"Updated snapshot to $newSnapshot")
replaceSnapshot(newSnapshot, updateTimestamp)
newSnapshot
}
}.getOrElse {
logInfo(s"No delta log found for the Delta table at $logPath")
replaceSnapshot(new InitialSnapshot(logPath, this), updateTimestamp)
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
}
currentSnapshot.snapshot
}

/** Replace the given snapshot with the provided one. */
protected def replaceSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Unit = {
/** Installs the given `newSnapshot` as the `currentSnapshot` */
protected def installSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Snapshot = {
if (!snapshotLock.isHeldByCurrentThread) {
if (Utils.isTesting) {
throw new RuntimeException("DeltaLog snapshot replaced without taking lock")
}
recordDeltaEvent(this, "delta.update.unsafeReplace")
}
val oldSnapshot = currentSnapshot.snapshot
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
oldSnapshot.uncache()
if (currentSnapshot == null) {
// cold snapshot initialization
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
return newSnapshot
}
val CapturedSnapshot(oldSnapshot, oldTimestamp) = currentSnapshot
if (oldSnapshot eq newSnapshot) {
// Same snapshot as before, so just refresh the timestamp
val timestampToUse = math.max(updateTimestamp, oldTimestamp)
currentSnapshot = CapturedSnapshot(newSnapshot, timestampToUse)
} else {
// Install the new snapshot and uncache the old one
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
oldSnapshot.uncache()
}
newSnapshot
}

/** Log a change in the metadata's table id whenever we install a newer version of a snapshot */
@@ -1022,8 +1051,7 @@ trait SnapshotManagement { self: DeltaLog =>
committedVersion)
logMetadataTableIdChange(previousSnapshot, newSnapshot)
logInfo(s"Updated snapshot to $newSnapshot")
replaceSnapshot(newSnapshot, updateTimestamp)
currentSnapshot.snapshot
installSnapshot(newSnapshot, updateTimestamp)
}
}

@@ -1045,7 +1073,7 @@ trait SnapshotManagement { self: DeltaLog =>
// fallback to the other overload.
return getSnapshotAt(version)
}
val segment = getLogSegmentForVersion(
val segment = createLogSegment(
versionToLoad = Some(version),
oldCheckpointProviderOpt = Some(lastCheckpointProvider)
).getOrElse {
@@ -1073,7 +1101,7 @@ trait SnapshotManagement { self: DeltaLog =>
.collect { case ci if ci.version <= version => ci }
.orElse(findLastCompleteCheckpointBefore(version))
.map(manuallyLoadCheckpoint)
getLogSegmentForVersion(
createLogSegment(
versionToLoad = Some(version),
lastCheckpointInfo = lastCheckpointInfoHint
).map { segment =>
@@ -1085,6 +1113,9 @@ trait SnapshotManagement { self: DeltaLog =>
throw DeltaErrors.emptyDirectoryException(logPath.toString)
}
}

// Visible for testing
private[delta] def getCapturedSnapshot(): CapturedSnapshot = currentSnapshot
}

object SnapshotManagement {
@@ -43,7 +43,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
* Commit a given `commitFile` to the table represented by given `logPath` at the
* given `commitVersion`
*/
protected def commitImpl(
private[delta] def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
@@ -27,12 +27,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}

class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {

private[managedcommit] class PerTableData {
private[managedcommit] class PerTableData(var maxCommitVersion: Long = -1) {
// Map from version to Commit data
val commitsMap: mutable.SortedMap[Long, Commit] = mutable.SortedMap.empty
// We maintain maxCommitVersion explicitly since commitsMap might be empty
// if all commits for a table have been backfilled.
var maxCommitVersion: Long = -1
val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
}

@@ -71,7 +70,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {
* @throws CommitFailedException if the commit version is not the expected next version,
* indicating a version conflict.
*/
protected def commitImpl(
private[delta] def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
@@ -126,6 +125,15 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {
versionsToRemove.foreach(tableData.commitsMap.remove)
}
}

/**
 * Registers a new table with the commit store, seeding its in-memory state with the
 * given `maxCommitVersion`. Throws if the table has already been registered.
 */
def registerTable(
logPath: Path,
maxCommitVersion: Long): Unit = {
val newPerTableData = new PerTableData(maxCommitVersion)
if (perTableMap.putIfAbsent(logPath, newPerTableData) != null) {
throw new IllegalStateException(s"Table $logPath already exists in the commit store.")
}
}
}

/**
@@ -936,13 +936,6 @@ class CheckpointsSuite
assert(filterUsageRecords(usageRecords2, "delta.log.cleanup").size > 0)
}
}

protected def filterUsageRecords(
usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
usageRecords.filter { r =>
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
}
}
}

/**
@@ -25,6 +25,7 @@ import scala.collection.concurrent
import scala.reflect.ClassTag
import scala.util.matching.Regex

import com.databricks.spark.util.UsageRecord
import org.apache.spark.sql.delta.DeltaTestUtils.Plans
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
@@ -155,6 +156,13 @@ trait DeltaTestUtilsBase {
jobs.values.count(_ > 0)
}

/** Filter `usageRecords` by the `opType` tag or field. */
def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
usageRecords.filter { r =>
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
}
}

protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
// The expected plan for touched file computation is of the format below.
// The data column should be pruned from both leaves.