Skip to content

[SPARK-47568][SS] Fix race condition between maintenance thread and load/commit for snapshot files. #45724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state

import java.io.File
import java.util.Locale
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.{mutable, Map}
Expand Down Expand Up @@ -49,6 +50,7 @@ case object RollbackStore extends RocksDBOpType("rollback_store")
// Marker values identifying which operation is holding the RocksDB instance
// lock; passed to acquire()/release() so the lock holder can be reported
// when another thread contends for the instance.
case object CloseStore extends RocksDBOpType("close_store")
case object ReportStoreMetrics extends RocksDBOpType("report_store_metrics")
case object StoreTaskCompletionListener extends RocksDBOpType("store_task_completion_listener")
// Held by the maintenance thread while uploading snapshots (see doMaintenance()).
case object StoreMaintenance extends RocksDBOpType("store_maintenance")

/**
* Class representing a RocksDB instance that checkpoints version of data to DFS.
Expand Down Expand Up @@ -184,19 +186,23 @@ class RocksDB(
loadedVersion = latestSnapshotVersion

// reset last snapshot version
lastSnapshotVersion = 0L
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being tracked
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
// we don't track the total number of rows - discard the number being tracked
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
if (loadedVersion != version) replayChangelog(version)
// After changelog replay the numKeysOnWritingVersion will be updated to
// the correct number of keys in the loaded version.
Expand Down Expand Up @@ -571,16 +577,14 @@ class RocksDB(
// background operations.
val cp = Checkpoint.create(db)
cp.createCheckpoint(checkpointDir.toString)
synchronized {
// if changelog checkpointing is disabled, the snapshot is uploaded synchronously
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
}
// if changelog checkpointing is disabled, the snapshot is uploaded synchronously
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
}
}

Expand Down Expand Up @@ -668,7 +672,20 @@ class RocksDB(

def doMaintenance(): Unit = {
if (enableChangelogCheckpointing) {
uploadSnapshot()
// There is a race to update latestSnapshot between load(), commit()
// and uploadSnapshot().
// The load method will reset latestSnapshot to discard any snapshots taken
// from newer versions (when an old version is reloaded).
// commit() method deletes the existing snapshot while creating a new snapshot.
// In order to ensure that the snapshot being uploaded would not be modified
// concurrently, we need to synchronize the snapshot access between task thread
// and maintenance thread.
acquire(StoreMaintenance)
try {
uploadSnapshot()
} finally {
release(StoreMaintenance)
}
}
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
Expand Down Expand Up @@ -788,8 +805,11 @@ class RocksDB(
*/
private def acquire(opType: RocksDBOpType): Unit = acquireLock.synchronized {
val newAcquiredThreadInfo = AcquiredThreadInfo()
val waitStartTime = System.currentTimeMillis
def timeWaitedMs = System.currentTimeMillis - waitStartTime
val waitStartTime = System.nanoTime()
def timeWaitedMs = {
val elapsedNanos = System.nanoTime() - waitStartTime
TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
}
def isAcquiredByDifferentThread = acquiredThreadInfo != null &&
acquiredThreadInfo.threadRef.get.isDefined &&
newAcquiredThreadInfo.threadRef.get.get.getId != acquiredThreadInfo.threadRef.get.get.getId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ class RocksDBFileManager(
// Delete unnecessary local immutable files
localImmutableFiles
.foreach { existingFile =>
val existingFileSize = existingFile.length()
val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
Expand All @@ -566,7 +567,7 @@ class RocksDBFileManager(
if (!isSameFile) {
existingFile.delete()
localFilesToDfsFiles.remove(existingFile.getName)
logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" +
logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" +
s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
} else {
logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
db.load(version, readOnly = true)
assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
}

// recommit 60 to ensure that acquireLock is released for maintenance
for (version <- 60 to 60) {
db.load(version - 1)
db.put(version.toString, version.toString)
db.remove((version - 1).toString)
db.commit()
}
// Check that snapshots and changelogs get purged correctly.
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
Expand Down Expand Up @@ -1919,6 +1927,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

// Regression test for SPARK-47568: a maintenance-thread snapshot upload must not
// race with load()/commit() recreating the snapshot for an older version.
testWithChangelogCheckpointingEnabled("time travel 4 -" +
  " validate successful RocksDB load") {
  val remoteDir = Utils.createTempDir().toString
  // minDeltasForSnapshot = 2 so the commits below produce a snapshot to upload
  val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
  new File(remoteDir).delete() // to make sure that the directory gets created
  withDB(remoteDir, conf = conf) { db =>
    for (version <- 0 to 1) {
      db.load(version)
      db.put(version.toString, version.toString)
      db.commit()
    }

    // load previous version, and recreate the snapshot
    db.load(1)
    db.put("3", "3")

    // do maintenance - upload any latest snapshots so far
    // would fail to acquire the lock and no snapshots would be uploaded
    db.doMaintenance()
    db.commit()

    // upload newly created snapshot 2.zip
    db.doMaintenance()
  }

  // reload version 2 - should succeed
  withDB(remoteDir, version = 2, conf = conf) { db =>
  }
}

test("validate Rocks DB SST files do not have a VersionIdMismatch" +
" when metadata file is not overwritten - scenario 1") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
Expand Down