
Commit 40ad829

riyaverm-db authored and HeartSaVioR committed
[SPARK-48586][SS] Remove lock acquisition in doMaintenance() by making a deep copy of file mappings in RocksDBFileManager in load()
### What changes were proposed in this pull request?

When changelog checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and the state maintenance thread. This PR fixes the lock contention issue introduced by #45724. The changes include:

1. Removing lock acquisition in `doMaintenance()`.
2. Adding a `copyFileMapping()` method to **RocksDBFileManager**, and using this method in `load()` to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`.
3. Capturing a reference to the file mappings in `commit()`.

### Why are the changes needed?

We want to eliminate lock contention to decrease the latency of streaming queries, so lock acquisition inside maintenance tasks should be avoided. Removing the lock, however, can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we ensure that the file manager state is not updated by the task thread while a background maintenance task attempts to upload a snapshot.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46942 from riyaverm-db/remove-lock-contention-between-maintenance-and-task.

Authored-by: Riya Verma <riya.verma@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent b5a55e4 commit 40ad829
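To make the copy-on-write scheme concrete, here is a minimal, self-contained sketch of the idea. The names below (`ImmutableFile`, `FileMappings`, `FileManagerSketch`) are illustrative stand-ins, not the actual Spark classes: the task thread swaps in a fresh copy of the mappings on `load()`, while each snapshot captures a reference to whichever generation was current at `commit()` time.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-ins for RocksDBImmutableFile and RocksDBFileMappings.
case class ImmutableFile(dfsName: String, sizeBytes: Long)

case class FileMappings(
    versionToFiles: ConcurrentHashMap[Long, Seq[ImmutableFile]],
    localToDfs: ConcurrentHashMap[String, ImmutableFile])

class FileManagerSketch {
  // Volatile so the maintenance thread always sees the latest generation.
  @volatile private var mappings = FileMappings(
    new ConcurrentHashMap[Long, Seq[ImmutableFile]],
    new ConcurrentHashMap[String, ImmutableFile])

  // Called from load(): replace both maps with copies, so subsequent
  // task-thread mutations cannot touch a generation that a snapshot
  // captured earlier for background upload.
  def copyFileMapping(): Unit = {
    val newVersionToFiles =
      new ConcurrentHashMap[Long, Seq[ImmutableFile]](mappings.versionToFiles)
    val newLocalToDfs =
      new ConcurrentHashMap[String, ImmutableFile](mappings.localToDfs)
    mappings = FileMappings(newVersionToFiles, newLocalToDfs)
  }

  // Called from commit(): the snapshot remembers exactly this generation,
  // and the later background upload writes through this reference only.
  def captureFileMapReference(): FileMappings = mappings
}
```

Because the upload path writes only through the captured reference and `load()` always swaps in a fresh copy, the two threads never mutate the same map, which is what lets `doMaintenance()` drop the lock.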

File tree

3 files changed

+183 −54 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 37 additions & 23 deletions
```diff
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.jdk.CollectionConverters._
 import scala.ref.WeakReference
 import scala.util.Try
@@ -74,14 +74,19 @@ class RocksDB(
     loggingId: String = "",
     useColumnFamilies: Boolean = false) extends Logging {
 
-  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+  case class RocksDBSnapshot(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings) {
     def close(): Unit = {
       silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
     }
   }
 
   @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
   @volatile private var lastSnapshotVersion = 0L
+  private val oldSnapshots = new ListBuffer[RocksDBSnapshot]
 
   RocksDBLoader.loadLibrary()
 
@@ -181,6 +186,9 @@ class RocksDB(
     try {
       if (loadedVersion != version) {
         closeDB()
+        // deep copy is needed to avoid race condition
+        // between maintenance and task threads
+        fileManager.copyFileMapping()
         val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
@@ -189,7 +197,6 @@ class RocksDB(
         if (lastSnapshotVersion > latestSnapshotVersion) {
           // discard any newer snapshots
           lastSnapshotVersion = 0L
-          latestSnapshot = None
         }
         openDB()
 
@@ -588,10 +595,17 @@ class RocksDB(
       // inside the uploadSnapshot() called below.
       // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
      // during state store maintenance.
-      latestSnapshot.foreach(_.close())
-      latestSnapshot = Some(
-        RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-      lastSnapshotVersion = newVersion
+      synchronized {
+        if (latestSnapshot.isDefined) {
+          oldSnapshots += latestSnapshot.get
+        }
+        latestSnapshot = Some(
+          RocksDBSnapshot(checkpointDir,
+            newVersion,
+            numKeysOnWritingVersion,
+            fileManager.captureFileMapReference()))
+        lastSnapshotVersion = newVersion
+      }
     }
   }
 
@@ -643,23 +657,36 @@ class RocksDB(
   }
 
   private def uploadSnapshot(): Unit = {
+    var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
     val localCheckpoint = synchronized {
       val checkpoint = latestSnapshot
       latestSnapshot = None
+
+      // Convert mutable list buffer to immutable to prevent
+      // race condition with commit where old snapshot is added
+      oldSnapshotsImmutable = oldSnapshots.toList
+      oldSnapshots.clear()
+
       checkpoint
     }
     localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
        try {
          val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
          }
          logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " +
            log"${MDC(LogKeys.VERSION_NUM, version)}," +
            log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
        } finally {
          localCheckpoint.foreach(_.close())
+
+          // Clean up old latestSnapshots
+          for (snapshot <- oldSnapshotsImmutable) {
+            snapshot.close()
+          }
+
        }
      case _ =>
    }
@@ -681,20 +708,7 @@ class RocksDB(
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      // There is race to update latestSnapshot between load(), commit()
-      // and uploadSnapshot().
-      // The load method will reset latestSnapshot to discard any snapshots taken
-      // from newer versions (when a old version is reloaded).
-      // commit() method deletes the existing snapshot while creating a new snapshot.
-      // In order to ensure that the snapshot being uploaded would not be modified
-      // concurrently, we need to synchronize the snapshot access between task thread
-      // and maintenance thread.
-      acquire(StoreMaintenance)
-      try {
-        uploadSnapshot()
-      } finally {
-        release(StoreMaintenance)
-      }
+      uploadSnapshot()
     }
     val cleanupTime = timeTakenMs {
      fileManager.deleteOldVersions(conf.minVersionsToRetain)
```
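The commit/upload handoff in the diff above can be summarized in isolation. The sketch below uses assumed types (`Snapshot` and `SnapshotHandoff` stand in for `RocksDBSnapshot` and the relevant parts of `RocksDB`) to show the pattern: `commit()` parks the superseded snapshot instead of closing it, and the maintenance thread claims both the latest and the parked snapshots inside a single `synchronized` block before doing any slow work outside the lock.

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for RocksDBSnapshot (illustration only).
final case class Snapshot(version: Long) {
  def close(): Unit = println(s"closed local checkpoint for version $version")
}

class SnapshotHandoff {
  private var latest: Option[Snapshot] = None
  private val oldSnapshots = new ListBuffer[Snapshot]

  // Task thread, as in commit(): do not close the previous snapshot here;
  // the maintenance thread may still be uploading it. Park it instead.
  def publish(s: Snapshot): Unit = synchronized {
    latest.foreach(oldSnapshots += _)
    latest = Some(s)
  }

  // Maintenance thread, as in uploadSnapshot(): take ownership of the latest
  // snapshot plus an immutable copy of the superseded ones under one lock,
  // then upload and close them outside the critical section.
  def drainForUpload(): (Option[Snapshot], List[Snapshot]) = synchronized {
    val toUpload = latest
    latest = None
    val stale = oldSnapshots.toList
    oldSnapshots.clear()
    (toUpload, stale)
  }
}
```

Keeping only the pointer swap under the lock is what removes the contention: the expensive DFS upload runs after `drainForUpload()` returns, with no lock held.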

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 60 additions & 27 deletions
```diff
@@ -133,16 +133,6 @@ class RocksDBFileManager(
 
   import RocksDBImmutableFile._
 
-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
-  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
-  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
-  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
-  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
-  // across SST files and RocksDB manifest.
-  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -157,6 +147,29 @@ class RocksDBFileManager(
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
   @volatile private var rootDirChecked: Boolean = false
+  @volatile private var fileMappings = RocksDBFileMappings(
+    new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    new ConcurrentHashMap[String, RocksDBImmutableFile]
+  )
+
+  /**
+   * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to avoid
+   * current task thread from overwriting the file mapping whenever background maintenance
+   * thread attempts to upload a snapshot
+   */
+  def copyFileMapping() : Unit = {
+    val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+    val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+    newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+    newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+  }
+
+  def captureFileMapReference(): RocksDBFileMappings = {
+    fileMappings
+  }
 
   def getChangeLogWriter(
       version: Long,
@@ -204,11 +217,15 @@ class RocksDBFileManager(
   def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
 
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
-  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+  def saveCheckpointToDfs(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings): Unit = {
     logFilesInDir(checkpointDir, log"Saving checkpoint files " +
       log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
-    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
@@ -243,10 +260,10 @@ class RocksDBFileManager(
     // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
     // in larger versions can't be reused even if they have the same size and name because
     // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
-      localFilesToDfsFiles.clear()
+      fileMappings.localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -260,7 +277,7 @@ class RocksDBFileManager(
       logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
        log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
      loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
      metadataFile.delete()
      metadata
    }
@@ -417,9 +434,9 @@ class RocksDBFileManager(
     // Resolve RocksDB files for all the versions and find the max version each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedSnapshotVersions.foreach { version =>
-      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+      val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
-        versionToRocksDBFiles.put(version, newResolvedFiles)
+        fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
        newResolvedFiles
      }
      files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -466,7 +483,7 @@ class RocksDBFileManager(
       val versionFile = dfsBatchZipFile(version)
       try {
         fm.delete(versionFile)
-        versionToRocksDBFiles.remove(version)
+        fileMappings.versionToRocksDBFiles.remove(version)
         logDebug(s"Deleted version $version")
       } catch {
         case e: Exception =>
@@ -487,7 +504,8 @@ class RocksDBFileManager(
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
-      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+      localFiles: Seq[File],
+      capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, version)}")
@@ -497,7 +515,8 @@ class RocksDBFileManager(
     var filesReused = 0L
 
     val immutableFiles = localFiles.map { localFile =>
-      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      val existingDfsFile =
+        capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
       if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
         val dfsFile = existingDfsFile.get
         filesReused += 1
@@ -521,7 +540,7 @@ class RocksDBFileManager(
         bytesCopied += localFileSize
 
         val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+        capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)
 
         immutableDfsFile
       }
@@ -530,7 +549,7 @@ class RocksDBFileManager(
       log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" +
       log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. " +
       log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without copying.")
-    versionToRocksDBFiles.put(version, immutableFiles)
+    capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)
 
     // Cleanup locally deleted files from the localFilesToDfsFiles map
     // Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -570,7 +589,7 @@ class RocksDBFileManager(
       .foreach { existingFile =>
         val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
-        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
           requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
             existingFile.length() == requiredFile.get.sizeBytes
@@ -580,7 +599,7 @@ class RocksDBFileManager(
 
         if (!isSameFile) {
           existingFile.delete()
-          localFilesToDfsFiles.remove(existingFile.getName)
+          fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
           logInfo(log"Deleted local file ${MDC(LogKeys.FILE_NAME, existingFile)} " +
             log"with size ${MDC(LogKeys.NUM_BYTES, existingFileSize)} mapped" +
            log" to previous dfsFile ${MDC(LogKeys.DFS_FILE, prevDfsFile.getOrElse("null"))}")
@@ -612,7 +631,7 @@ class RocksDBFileManager(
         }
         filesCopied += 1
         bytesCopied += localFileSize
-        localFilesToDfsFiles.put(localFileName, file)
+        fileMappings.localFilesToDfsFiles.put(localFileName, file)
         logInfo(log"Copied ${MDC(LogKeys.DFS_FILE, dfsFile)} to " +
           log"${MDC(LogKeys.FILE_NAME, localFile)} - " +
           log"${MDC(LogKeys.NUM_BYTES, localFileSize)} bytes")
@@ -633,13 +652,13 @@ class RocksDBFileManager(
   private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
     // clean up deleted SST files from the localFilesToDfsFiles Map
     val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
+    val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
       .keys
       .filterNot(currentLocalFiles.contains)
 
     mappingsToClean.foreach { f =>
       logInfo(log"cleaning ${MDC(LogKeys.FILE_NAME, f)} from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
+      fileMappings.localFilesToDfsFiles.remove(f)
     }
   }
 
@@ -749,6 +768,20 @@ class RocksDBFileManager(
   }
 }
 
+/**
+ * Track file mappings in RocksDB across local and remote directories
+ * @param versionToRocksDBFiles Mapping of RocksDB files used across versions for maintenance
+ * @param localFilesToDfsFiles Mapping of the exact Dfs file used to create a local SST file
+ * The reason localFilesToDfsFiles is a separate map because versionToRocksDBFiles can contain
+ * multiple similar SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst
+ * in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID
+ * compatibility across SST files and RocksDB manifest.
+ */
+
+case class RocksDBFileMappings(
+    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])
+
 /**
  * Metrics regarding RocksDB file sync between local and DFS.
  */
```
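One detail worth noting about `copyFileMapping()` above: `putAll` produces a copy that is one level deep, which suffices here because the map values (`RocksDBImmutableFile` entries) are immutable metadata. A toy demonstration of the semantics the fix relies on, with plain strings standing in for file metadata:

```scala
import java.util.concurrent.ConcurrentHashMap

object CopySemanticsDemo {
  def main(args: Array[String]): Unit = {
    // The generation captured by a snapshot at commit() time.
    val captured = new ConcurrentHashMap[String, String]()
    captured.put("001.sst", "001-uuid1.sst")

    // load() swaps in a copy; the task thread then re-maps a local file.
    val copy = new ConcurrentHashMap[String, String]()
    copy.putAll(captured)
    copy.put("001.sst", "001-uuid2.sst")

    // The maintenance thread's captured view is unaffected.
    assert(captured.get("001.sst") == "001-uuid1.sst")
    assert(copy.get("001.sst") == "001-uuid2.sst")
    println("captured mapping unchanged by task-thread update")
  }
}
```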
