Commit 686f59c

riyaverm-db authored and HeartSaVioR committed
[SPARK-48586][SS][3.5] Remove lock acquisition in doMaintenance() by making a deep copy of file mappings in RocksDBFileManager in load()
Backports #46942 to 3.5.

### What changes were proposed in this pull request?

When changelog checkpointing is enabled, the lock of the **RocksDB** state store is acquired when uploading the snapshot inside maintenance tasks, which causes lock contention between query processing tasks and the state maintenance thread. This PR fixes the lock contention issue introduced by #45724. The changes include:

1. Removing the lock acquisition in `doMaintenance()`.
2. Adding a `copyFileMapping()` method to **RocksDBFileManager**, and using it in `load()` to deep copy the file manager state, specifically the file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`.
3. Capturing a reference to the file mappings in `commit()`.

### Why are the changes needed?

We want to eliminate lock contention to decrease the latency of streaming queries, so lock acquisition inside maintenance tasks should be avoided. Removing the lock, however, can introduce race conditions between task and maintenance threads. By making a deep copy of `versionToRocksDBFiles` and `localFilesToDfsFiles` in **RocksDBFileManager**, we ensure that the file manager state is not updated by the task thread while a background maintenance task is uploading a snapshot.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47130 from riyaverm-db/remove-lock-contention-between-maintenance-and-task-3.5.

Lead-authored-by: Riya Verma <riya.verma@databricks.com>
Co-authored-by: Riya Verma <170376104+riyaverm-db@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
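For context before the diffs: the fix replaces lock-based coordination with a copy-on-write handoff of the file mappings. A minimal sketch of that pattern follows, using simplified stand-in types (`ImmutableFile`, `FileMappings`, `FileManager` here are illustrative, not the real Spark classes):

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for RocksDBImmutableFile.
case class ImmutableFile(dfsName: String, sizeBytes: Long)

// Simplified stand-in for RocksDBFileMappings: two concurrent maps
// bundled behind a single swappable reference.
case class FileMappings(
    versionToFiles: ConcurrentHashMap[Long, Seq[ImmutableFile]],
    localToDfs: ConcurrentHashMap[String, ImmutableFile])

class FileManager {
  // @volatile so the maintenance thread always reads the latest reference.
  @volatile private var mappings = FileMappings(
    new ConcurrentHashMap[Long, Seq[ImmutableFile]],
    new ConcurrentHashMap[String, ImmutableFile])

  // Task thread, in load(): swap in fresh copies of the maps, so any
  // reference captured earlier by commit() is no longer shared state.
  def copyFileMapping(): Unit = {
    val v = new ConcurrentHashMap[Long, Seq[ImmutableFile]]
    val l = new ConcurrentHashMap[String, ImmutableFile]
    v.putAll(mappings.versionToFiles)
    l.putAll(mappings.localToDfs)
    mappings = FileMappings(v, l)
  }

  // Task thread, in commit(): capture the current reference so the
  // maintenance thread can upload against it without a store-level lock.
  def captureFileMapReference(): FileMappings = mappings
}
```

Because `load()` swaps in fresh copies instead of mutating the maps in place, a reference captured by `commit()` stays stable for the duration of the upload, which is what lets `doMaintenance()` drop the `acquire()`/`release()` pair.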
1 parent ade9dbf · commit 686f59c

File tree

3 files changed: +183 −53 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 36 additions & 22 deletions
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
+import scala.collection.mutable.ListBuffer
 import scala.ref.WeakReference
 import scala.util.Try
 
@@ -57,14 +58,19 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {
 
-  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+  case class RocksDBSnapshot(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings) {
     def close(): Unit = {
       silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
     }
   }
 
   @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
   @volatile private var lastSnapshotVersion = 0L
+  private val oldSnapshots = new ListBuffer[RocksDBSnapshot]
 
   RocksDBLoader.loadLibrary()
 
@@ -148,6 +154,9 @@ class RocksDB(
     try {
       if (loadedVersion != version) {
         closeDB()
+        // deep copy is needed to avoid race condition
+        // between maintenance and task threads
+        fileManager.copyFileMapping()
         val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
@@ -156,7 +165,6 @@ class RocksDB(
         if (lastSnapshotVersion > latestSnapshotVersion) {
           // discard any newer snapshots
           lastSnapshotVersion = 0L
-          latestSnapshot = None
         }
         openDB()
 
@@ -368,10 +376,17 @@ class RocksDB(
       // inside the uploadSnapshot() called below.
      // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
      // during state store maintenance.
-      latestSnapshot.foreach(_.close())
-      latestSnapshot = Some(
-        RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-      lastSnapshotVersion = newVersion
+      synchronized {
+        if (latestSnapshot.isDefined) {
+          oldSnapshots += latestSnapshot.get
+        }
+        latestSnapshot = Some(
+          RocksDBSnapshot(checkpointDir,
+            newVersion,
+            numKeysOnWritingVersion,
+            fileManager.captureFileMapReference()))
+        lastSnapshotVersion = newVersion
+      }
     }
   }
 
@@ -421,22 +436,34 @@ class RocksDB(
   }
 
   private def uploadSnapshot(): Unit = {
+    var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
     val localCheckpoint = synchronized {
       val checkpoint = latestSnapshot
       latestSnapshot = None
+
+      // Convert mutable list buffer to immutable to prevent
+      // race condition with commit where old snapshot is added
+      oldSnapshotsImmutable = oldSnapshots.toList
+      oldSnapshots.clear()
+
       checkpoint
     }
     localCheckpoint match {
-      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+      case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
        try {
          val uploadTime = timeTakenMs {
-            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
          }
          logInfo(s"$loggingId: Upload snapshot of version $version," +
            s" time taken: $uploadTime ms")
        } finally {
          localCheckpoint.foreach(_.close())
+
+          // Clean up old latestSnapshots
+          for (snapshot <- oldSnapshotsImmutable) {
+            snapshot.close()
+          }
        }
      case _ =>
    }
@@ -457,20 +484,7 @@ class RocksDB(
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      // There is race to update latestSnapshot between load(), commit()
-      // and uploadSnapshot().
-      // The load method will reset latestSnapshot to discard any snapshots taken
-      // from newer versions (when a old version is reloaded).
-      // commit() method deletes the existing snapshot while creating a new snapshot.
-      // In order to ensure that the snapshot being uploaded would not be modified
-      // concurrently, we need to synchronize the snapshot access between task thread
-      // and maintenance thread.
-      acquire()
-      try {
-        uploadSnapshot()
-      } finally {
-        release()
-      }
+      uploadSnapshot()
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
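To make the snapshot lifecycle in the diff above easier to follow, here is a rough, self-contained sketch of the `commit()`/`uploadSnapshot()` handoff, with a hypothetical `Snapshot` standing in for `RocksDBSnapshot` and the upload reduced to a print:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for RocksDBSnapshot.
final case class Snapshot(version: Long) {
  def close(): Unit = println(s"closed local checkpoint of snapshot $version")
}

object SnapshotHandoff {
  @volatile private var latestSnapshot: Option[Snapshot] = None
  private val oldSnapshots = new ListBuffer[Snapshot]

  // Task thread: instead of closing the superseded snapshot inline,
  // park it so the maintenance thread can close it later.
  def commit(newVersion: Long): Unit = synchronized {
    latestSnapshot.foreach(oldSnapshots += _)
    latestSnapshot = Some(Snapshot(newVersion))
  }

  // Maintenance thread: take ownership of the latest snapshot and the
  // backlog in one short critical section, then upload outside the lock.
  def uploadSnapshot(): Unit = {
    var backlog: List[Snapshot] = Nil
    val toUpload = synchronized {
      val s = latestSnapshot
      latestSnapshot = None
      backlog = oldSnapshots.toList // immutable copy; commit() may append again
      oldSnapshots.clear()
      s
    }
    try {
      toUpload.foreach(s => println(s"uploading snapshot ${s.version}"))
    } finally {
      toUpload.foreach(_.close())
      backlog.foreach(_.close()) // clean up superseded snapshots
    }
  }
}
```

Draining `oldSnapshots` into an immutable list inside the critical section matters because `commit()` may append to the buffer again the moment the lock is released; the maintenance thread then closes only the snapshots it took ownership of.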

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 61 additions & 27 deletions
@@ -131,16 +131,6 @@ class RocksDBFileManager(
 
   import RocksDBImmutableFile._
 
-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
-  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
-  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
-  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
-  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
-  // across SST files and RocksDB manifest.
-  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -154,6 +144,30 @@ class RocksDBFileManager(
 
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
+  @volatile private var fileMappings = RocksDBFileMappings(
+    new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    new ConcurrentHashMap[String, RocksDBImmutableFile]
+  )
+
+  /**
+   * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to avoid
+   * current task thread from overwriting the file mapping whenever background maintenance
+   * thread attempts to upload a snapshot
+   */
+  def copyFileMapping() : Unit = {
+    val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+    val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+    newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+    newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+    fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+  }
+
+  def captureFileMapReference(): RocksDBFileMappings = {
+    fileMappings
+  }
+
   def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
     val rootDir = new Path(dfsRootDir)
     val changelogFile = dfsChangelogFile(version)
@@ -185,10 +199,14 @@ class RocksDBFileManager(
   def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
 
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
-  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+  def saveCheckpointToDfs(
+      checkpointDir: File,
+      version: Long,
+      numKeys: Long,
+      capturedFileMappings: RocksDBFileMappings): Unit = {
     logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
-    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
@@ -219,10 +237,10 @@ class RocksDBFileManager(
     // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
     // in larger versions can't be reused even if they have the same size and name because
     // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
-      localFilesToDfsFiles.clear()
+      fileMappings.localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -235,7 +253,7 @@ class RocksDBFileManager(
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
       logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
       metadataFile.delete()
       metadata
     }
@@ -389,9 +407,9 @@ class RocksDBFileManager(
     // Resolve RocksDB files for all the versions and find the max version each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedSnapshotVersions.foreach { version =>
-      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+      val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
-        versionToRocksDBFiles.put(version, newResolvedFiles)
+        fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
        newResolvedFiles
      }
      files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -436,7 +454,7 @@ class RocksDBFileManager(
        val versionFile = dfsBatchZipFile(version)
        try {
          fm.delete(versionFile)
-          versionToRocksDBFiles.remove(version)
+          fileMappings.versionToRocksDBFiles.remove(version)
          logDebug(s"Deleted version $version")
        } catch {
          case e: Exception =>
@@ -455,7 +473,8 @@ class RocksDBFileManager(
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
-      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+      localFiles: Seq[File],
+      capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
@@ -465,7 +484,8 @@ class RocksDBFileManager(
     var filesReused = 0L
 
     val immutableFiles = localFiles.map { localFile =>
-      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      val existingDfsFile =
+        capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
      if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
        val dfsFile = existingDfsFile.get
        filesReused += 1
@@ -487,14 +507,14 @@ class RocksDBFileManager(
        bytesCopied += localFileSize
 
        val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+        capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)
 
        immutableDfsFile
      }
    }
    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
      s" DFS for version $version. $filesReused files reused without copying.")
-    versionToRocksDBFiles.put(version, immutableFiles)
+    capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)
 
    // Cleanup locally deleted files from the localFilesToDfsFiles map
    // Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -534,7 +554,7 @@ class RocksDBFileManager(
      .foreach { existingFile =>
        val existingFileSize = existingFile.length()
        val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
-        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
        val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
          requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
            existingFile.length() == requiredFile.get.sizeBytes
@@ -544,7 +564,7 @@ class RocksDBFileManager(
 
        if (!isSameFile) {
          existingFile.delete()
-          localFilesToDfsFiles.remove(existingFile.getName)
+          fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
          logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" +
            s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
        } else {
@@ -574,7 +594,7 @@ class RocksDBFileManager(
        }
        filesCopied += 1
        bytesCopied += localFileSize
-        localFilesToDfsFiles.put(localFileName, file)
+        fileMappings.localFilesToDfsFiles.put(localFileName, file)
        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
      } else {
        filesReused += 1
@@ -592,13 +612,13 @@ class RocksDBFileManager(
   private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
     // clean up deleted SST files from the localFilesToDfsFiles Map
     val currentLocalFiles = localFiles.map(_.getName).toSet
-    val mappingsToClean = localFilesToDfsFiles.asScala
+    val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
      .keys
      .filterNot(currentLocalFiles.contains)
 
    mappingsToClean.foreach { f =>
      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
-      localFilesToDfsFiles.remove(f)
+      fileMappings.localFilesToDfsFiles.remove(f)
    }
  }
 
@@ -705,6 +725,20 @@ class RocksDBFileManager(
   }
 }
 
+/**
+ * Track file mappings in RocksDB across local and remote directories
+ * @param versionToRocksDBFiles Mapping of RocksDB files used across versions for maintenance
+ * @param localFilesToDfsFiles Mapping of the exact Dfs file used to create a local SST file
+ * The reason localFilesToDfsFiles is a separate map because versionToRocksDBFiles can contain
+ * multiple similar SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst
+ * in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID
+ * compatibility across SST files and RocksDB manifest.
+ */
+
+case class RocksDBFileMappings(
+    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+    localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])
+
 /**
  * Metrics regarding RocksDB file sync between local and DFS.
  */
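Finally, a tiny self-contained demo of the isolation property the deep copy buys (plain `ConcurrentHashMap`s stand in for the real `RocksDBFileMappings`): a reference captured before the copy keeps seeing the old mapping even after the task thread updates the new one.

```scala
import java.util.concurrent.ConcurrentHashMap

object DeepCopyDemo extends App {
  // Current file mappings held behind a swappable reference,
  // as RocksDBFileManager does with `fileMappings`.
  @volatile var current = new ConcurrentHashMap[String, String]()
  current.put("1.sst", "1-uuid1.sst")

  // commit(): capture the reference for the snapshot upload.
  val captured = current

  // load(): swap in a deep copy for the next version's updates.
  val copy = new ConcurrentHashMap[String, String]()
  copy.putAll(current)
  current = copy

  // Task thread mutates the new copy only.
  current.put("1.sst", "1-uuid2.sst")

  // The upload still sees the mapping as of capture time.
  assert(captured.get("1.sst") == "1-uuid1.sst")
  assert(current.get("1.sst") == "1-uuid2.sst")
}
```

The trade-off is one extra map copy per `load()`, in exchange for snapshot upload and query processing no longer sharing a lock.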
