Skip to content

Commit aeba488

Browse files
warrenzhu25 authored and
Mridul Muralidharan committed
[SPARK-45057][CORE] Avoid acquiring read lock when keepReadLock is false
### What changes were proposed in this pull request? Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read lock and to avoid a potential deadlock. A deadlock happens when 2 tasks try to compute the same RDD with a replication level of 2 while running on only 2 executors. For details, refer to [SPARK-45057]: the task thread holds the write lock while waiting for replication to the remote executor, while the shuffle server thread that handles the block upload request is waiting on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24) ### Why are the changes needed? This saves an unnecessary read lock acquisition and avoids the deadlock issue mentioned above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test in BlockInfoManagerSuite ### Was this patch authored or co-authored using generative AI tooling? No Closes #43067 from warrenzhu25/deadlock. Authored-by: Warren Zhu <warren.zhu25@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 0d6fda5) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
1 parent 3a723a1 commit aeba488

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -415,13 +415,14 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
415415
* then just go ahead and acquire the write lock. Otherwise, if another thread is already
416416
* writing the block, then we wait for the write to finish before acquiring the read lock.
417417
*
418-
* @return true if the block did not already exist, false otherwise. If this returns false, then
419-
* a read lock on the existing block will be held. If this returns true, a write lock on
420-
* the new block will be held.
418+
* @return true if the block did not already exist, false otherwise.
419+
* If this returns true, a write lock on the new block will be held.
420+
* If this returns false then a read lock will be held iff keepReadLock == true.
421421
*/
422422
def lockNewBlockForWriting(
423423
blockId: BlockId,
424-
newBlockInfo: BlockInfo): Boolean = {
424+
newBlockInfo: BlockInfo,
425+
keepReadLock: Boolean = true): Boolean = {
425426
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
426427
// Get the lock that will be associated with the to-be written block and lock it for the entire
427428
// duration of this operation. This way we prevent race conditions when two threads try to write
@@ -449,6 +450,8 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
449450
val result = lockForWriting(blockId, blocking = false)
450451
assert(result.isDefined)
451452
return true
453+
} else if (!keepReadLock) {
454+
return false
452455
} else {
453456
// Block already exists. This could happen if another thread races with us to compute
454457
// the same block. In this case we try to acquire a read lock, if the locking succeeds

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,14 +1510,10 @@ private[spark] class BlockManager(
15101510

15111511
val putBlockInfo = {
15121512
val newInfo = new BlockInfo(level, classTag, tellMaster)
1513-
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
1513+
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) {
15141514
newInfo
15151515
} else {
15161516
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
1517-
if (!keepReadLock) {
1518-
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
1519-
releaseLock(blockId)
1520-
}
15211517
return None
15221518
}
15231519
}

core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,20 @@ class BlockInfoManagerSuite extends SparkFunSuite {
166166
assert(blockInfoManager.get("block").get.readerCount === 1)
167167
}
168168

169+
test("lockNewBlockForWriting should not block when keepReadLock is false") {
170+
withTaskId(0) {
171+
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
172+
}
173+
val lock1Future = Future {
174+
withTaskId(1) {
175+
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false)
176+
}
177+
}
178+
179+
assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds))
180+
assert(blockInfoManager.get("block").get.readerCount === 0)
181+
}
182+
169183
test("read locks are reentrant") {
170184
withTaskId(1) {
171185
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))

0 commit comments

Comments
 (0)