
Commit 47fd2ed

Fix race condition between maintenance thread and load/commit for snapshot files
1 parent ff38378 commit 47fd2ed

File tree: 3 files changed, +82 -24 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Lines changed: 43 additions & 23 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
@@ -49,6 +50,7 @@ case object RollbackStore extends RocksDBOpType("rollback_store")
 case object CloseStore extends RocksDBOpType("close_store")
 case object ReportStoreMetrics extends RocksDBOpType("report_store_metrics")
 case object StoreTaskCompletionListener extends RocksDBOpType("store_task_completion_listener")
+case object StoreMaintenance extends RocksDBOpType("store_maintenance")
 
 /**
  * Class representing a RocksDB instance that checkpoints version of data to DFS.
@@ -184,19 +186,23 @@ class RocksDB(
     loadedVersion = latestSnapshotVersion
 
     // reset last snapshot version
-    lastSnapshotVersion = 0L
+    if (lastSnapshotVersion > latestSnapshotVersion) {
+      // discard any newer snapshots
+      lastSnapshotVersion = 0L
+      latestSnapshot = None
+    }
     openDB()
 
     numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
-      // we don't track the total number of rows - discard the number being track
-      -1L
-    } else if (metadata.numKeys < 0) {
-      // we track the total number of rows, but the snapshot doesn't have tracking number
-      // need to count keys now
-      countKeys()
-    } else {
-      metadata.numKeys
-    }
+        // we don't track the total number of rows - discard the number being track
+        -1L
+      } else if (metadata.numKeys < 0) {
+        // we track the total number of rows, but the snapshot doesn't have tracking number
+        // need to count keys now
+        countKeys()
+      } else {
+        metadata.numKeys
+      }
     if (loadedVersion != version) replayChangelog(version)
     // After changelog replay the numKeysOnWritingVersion will be updated to
     // the correct number of keys in the loaded version.
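Context for the new guard: when a query time travels back and reloads an older version, a snapshot already taken at a newer version no longer corresponds to any state the store will recommit, so load() must drop it before the maintenance thread gets a chance to upload it. A minimal, self-contained sketch of the guard's effect, using hypothetical simplified state in place of the real class internals:

// Sketch only: models the reload guard with hypothetical state,
// not the real RocksDB class.
object ReloadGuardSketch {
  private var lastSnapshotVersion: Long = 3L        // snapshot from version 3 pending upload
  private var latestSnapshot: Option[String] = Some("snapshot-3")

  // Mirrors the new logic in load(): when an older version is reloaded,
  // drop any pending snapshot taken at a newer version.
  def load(latestSnapshotVersion: Long): Unit = {
    if (lastSnapshotVersion > latestSnapshotVersion) {
      lastSnapshotVersion = 0L
      latestSnapshot = None
    }
  }

  def main(args: Array[String]): Unit = {
    load(latestSnapshotVersion = 1L)                // time travel back to version 1
    assert(latestSnapshot.isEmpty)
    println("stale snapshot from version 3 was discarded")
  }
}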
@@ -571,16 +577,14 @@ class RocksDB(
       // background operations.
       val cp = Checkpoint.create(db)
       cp.createCheckpoint(checkpointDir.toString)
-      synchronized {
-        // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
-        // inside the uploadSnapshot() called below.
-        // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
-        // during state store maintenance.
-        latestSnapshot.foreach(_.close())
-        latestSnapshot = Some(
-          RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-        lastSnapshotVersion = newVersion
-      }
+      // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
+      // inside the uploadSnapshot() called below.
+      // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
+      // during state store maintenance.
+      latestSnapshot.foreach(_.close())
+      latestSnapshot = Some(
+        RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+      lastSnapshotVersion = newVersion
     }
   }
 

@@ -668,7 +672,20 @@ class RocksDB(
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      uploadSnapshot()
+      // There is race to update latestSnapshot between load(), commit()
+      // and uploadSnapshot().
+      // The load method will reset latestSnapshot to discard any snapshots taken
+      // from newer versions (when a old version is reloaded).
+      // commit() method deletes the existing snapshot while creating a new snapshot.
+      // In order to ensure that the snapshot being uploaded would not be modified
+      // concurrently, we need to synchronize the snapshot access between task thread
+      // and maintenance thread.
+      acquire(StoreMaintenance)
+      try {
+        uploadSnapshot()
+      } finally {
+        release(StoreMaintenance)
+      }
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
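The shape of the fix is the standard acquire/try/finally discipline: the maintenance thread now takes the same lock that task threads hold across load() and commit(), so the snapshot it uploads cannot be closed or replaced mid-upload. A minimal sketch of the pattern, with a plain ReentrantLock and simplified names standing in for the class's instrumented acquire()/release() helpers:

import java.util.concurrent.locks.ReentrantLock

// Sketch only: a simplified stand-in for the task/maintenance synchronization.
class SnapshotHolderSketch {
  private val acquireLock = new ReentrantLock()
  private var latestSnapshot: Option[String] = None

  // Task thread: commit() replaces the pending snapshot under the lock.
  def commit(version: Long): Unit = {
    acquireLock.lock()
    try latestSnapshot = Some(s"snapshot-$version")
    finally acquireLock.unlock()
  }

  // Maintenance thread: upload while holding the same lock, so commit()
  // and load() cannot mutate latestSnapshot mid-upload.
  def doMaintenance(): Unit = {
    acquireLock.lock()
    try {
      latestSnapshot.foreach(s => println(s"uploading $s"))
      latestSnapshot = None
    } finally acquireLock.unlock()
  }
}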
@@ -788,8 +805,11 @@ class RocksDB(
    */
   private def acquire(opType: RocksDBOpType): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
-    val waitStartTime = System.currentTimeMillis
-    def timeWaitedMs = System.currentTimeMillis - waitStartTime
+    val waitStartTime = System.nanoTime()
+    def timeWaitedMs = {
+      val elapsedNanos = System.nanoTime() - waitStartTime
+      TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
+    }
     def isAcquiredByDifferentThread = acquiredThreadInfo != null &&
       acquiredThreadInfo.threadRef.get.isDefined &&
       newAcquiredThreadInfo.threadRef.get.get.getId != acquiredThreadInfo.threadRef.get.get.getId
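The timing change in acquire() is incidental to the race fix but a real correctness improvement: System.currentTimeMillis reads the wall clock, which can jump under NTP adjustment and make a computed wait time negative or inflated, while System.nanoTime is monotonic and intended for measuring elapsed time. A small sketch of the pattern, with illustrative values:

import java.util.concurrent.TimeUnit

// Sketch of the monotonic-clock pattern adopted above.
object ElapsedTimeSketch {
  def main(args: Array[String]): Unit = {
    val waitStartTime = System.nanoTime()   // monotonic start point
    Thread.sleep(25)                        // stand-in for waiting on the lock
    val elapsedNanos = System.nanoTime() - waitStartTime
    val timeWaitedMs = TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
    println(s"waited roughly $timeWaitedMs ms")
  }
}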

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
Lines changed: 2 additions & 1 deletion
@@ -554,6 +554,7 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     localImmutableFiles
       .foreach { existingFile =>
+        val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
         val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
@@ -566,7 +567,7 @@
         if (!isSameFile) {
           existingFile.delete()
           localFilesToDfsFiles.remove(existingFile.getName)
-          logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" +
+          logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" +
             s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
         } else {
           logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile")

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
Lines changed: 37 additions & 0 deletions
@@ -383,6 +383,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
       db.load(version, readOnly = true)
       assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
     }
+
+    // recommit 60 to ensure that acquireLock is released for maintenance
+    for (version <- 60 to 60) {
+      db.load(version - 1)
+      db.put(version.toString, version.toString)
+      db.remove((version - 1).toString)
+      db.commit()
+    }
     // Check that snapshots and changelogs get purged correctly.
     db.doMaintenance()
     assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
@@ -1919,6 +1927,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("time travel 4 -" +
+    " validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+
+      // load previous version, and recreate the snapshot
+      db.load(1)
+      db.put("3", "3")
+
+      // do maintenance - upload any latest snapshots so far
+      // would fail to acquire lock and no snapshots would be uploaded
+      db.doMaintenance()
+      db.commit()
+      // upload newly created snapshot 2.zip
+      db.doMaintenance()
+    }
+
+    // reload version 2 - should succeed
+    withDB(remoteDir, version = 2, conf = conf) { db =>
+    }
+  }
+
   test("validate Rocks DB SST files do not have a VersionIdMismatch" +
     " when metadata file is not overwritten - scenario 1") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +
