Commit bbab0af

cxzl25 authored and Mridul committed
[SPARK-40987][CORE] BlockManager#removeBlockInternal should ensure the lock is unlocked gracefully
### What changes were proposed in this pull request?

`BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully.
`removeBlockInternal` now calls `removeBlock` in a `finally` block if the normal removal path did not reach it.

### Why are the changes needed?

When the driver submits a job, `DAGScheduler` calls `sc.broadcast(taskBinaryBytes)`.

`TorrentBroadcast#writeBlocks` may fail due to disk problems during `blockManager#putBytes`.

`BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up the block.

`BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up blocks on disk.

`DiskStore#remove` will try to create the directory because the directory does not exist, and an exception will be thrown at this time.

As a result, the block info and its lock in `BlockInfoManager#blockInfoWrappers` are not removed.

The catch block in `TorrentBroadcast#writeBlocks` will call `blockManager.removeBroadcast` to clean up the broadcast.
Because the block lock in `BlockInfoManager#blockInfoWrappers` is not released, the `dag-scheduler-event-loop` thread of `DAGScheduler` will wait forever.

```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
```

```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
    at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Throw an exception before `Files.createDirectory` in `DiskBlockManager#getFile` to simulate disk problems.

```scala
if (filename.contains("piece")) {
  throw new java.io.IOException("disk issue")
}
Files.createDirectory(path)
```

```
./bin/spark-shell
```

```scala
spark.sql("select 1").collect()
```

```
22/11/24 19:29:58 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: disk issue.
22/11/24 19:29:58 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: disk issue
java.io.IOException: disk issue
    at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:109)
    at org.apache.spark.storage.DiskBlockManager.containsBlock(DiskBlockManager.scala:160)
    at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:153)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879)
    at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1998)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1484)
    at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:378)
    at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1419)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:170)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:164)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
```

Closes #38467 from cxzl25/SPARK-40987.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Mridul <mridul<at>gmail.com>
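
The hang described in this commit message can be illustrated outside Spark with a minimal sketch. Everything here is hypothetical: `BlockLock` and `removeUnsafe` are illustrative names, not Spark APIs, and a `java.util.concurrent.Semaphore` stands in for the per-block write lock on the assumption that, as the thread dump suggests, the same thread cannot simply re-enter a lock it already holds. A timeout is used so the sketch terminates instead of parking forever.

```scala
import java.io.IOException
import java.util.concurrent.{Semaphore, TimeUnit}

object LockLeakSketch {
  // Hypothetical stand-in for a per-block write lock (not a Spark class).
  // A single-permit Semaphore is non-reentrant, so a second acquire by the
  // same thread blocks until someone calls release().
  final class BlockLock {
    private val permit = new Semaphore(1)
    def lockForWriting(timeoutMs: Long): Boolean =
      permit.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)
    def release(): Unit = permit.release()
  }

  // Cleanup without try/finally: if removeFromDisk throws,
  // release() is never reached and the lock stays held.
  def removeUnsafe(lock: BlockLock, removeFromDisk: () => Unit): Unit = {
    removeFromDisk()
    lock.release()
  }

  def main(args: Array[String]): Unit = {
    val lock = new BlockLock
    // The put path takes the write lock first.
    assert(lock.lockForWriting(10))
    // Cleanup fails part-way through, simulating the disk problem.
    try removeUnsafe(lock, () => throw new IOException("disk issue"))
    catch { case e: IOException => println(s"cleanup failed: ${e.getMessage}") }
    // A later removal attempt on the same block cannot get the lock.
    println(s"re-acquired: ${lock.lockForWriting(100)}") // prints "re-acquired: false"
  }
}
```

Without the timeout, the second acquire would wait indefinitely, which corresponds to the `WAITING (parking)` state of `dag-scheduler-event-loop` in the thread dump.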
1 parent f529d0e commit bbab0af

1 file changed: +26 -17 lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 26 additions & 17 deletions
@@ -1991,23 +1991,32 @@ private[spark] class BlockManager(
    * lock on the block.
    */
   private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
-    val blockStatus = if (tellMaster) {
-      val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
-      Some(getCurrentBlockStatus(blockId, blockInfo))
-    } else None
-
-    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-    val removedFromMemory = memoryStore.remove(blockId)
-    val removedFromDisk = diskStore.remove(blockId)
-    if (!removedFromMemory && !removedFromDisk) {
-      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
-    }
-
-    blockInfoManager.removeBlock(blockId)
-    if (tellMaster) {
-      // Only update storage level from the captured block status before deleting, so that
-      // memory size and disk size are being kept for calculating delta.
-      reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+    var hasRemoveBlock = false
+    try {
+      val blockStatus = if (tellMaster) {
+        val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+        Some(getCurrentBlockStatus(blockId, blockInfo))
+      } else None
+
+      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+      val removedFromMemory = memoryStore.remove(blockId)
+      val removedFromDisk = diskStore.remove(blockId)
+      if (!removedFromMemory && !removedFromDisk) {
+        logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+      }
+
+      blockInfoManager.removeBlock(blockId)
+      hasRemoveBlock = true
+      if (tellMaster) {
+        // Only update storage level from the captured block status before deleting, so that
+        // memory size and disk size are being kept for calculating delta.
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+      }
+    } finally {
+      if (!hasRemoveBlock) {
+        logWarning(s"Block $blockId was not removed normally.")
+        blockInfoManager.removeBlock(blockId)
+      }
     }
   }
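
The flag-guarded `finally` used in this diff can be shown in isolation. The following is a minimal sketch under stated assumptions, not the Spark implementation: `InfoRegistry` is a hypothetical stand-in for `BlockInfoManager`, `removeInternal` and its by-name parameter `removeFromStores` are illustrative names, and the only behavior modeled is "remove the block's lock entry exactly once, even if store cleanup throws".

```scala
import java.io.IOException
import scala.collection.mutable

object GuardedRemoveSketch {
  // Hypothetical stand-in for BlockInfoManager: tracks which block ids are locked.
  final class InfoRegistry {
    private val held = mutable.Set.empty[String]
    def lock(id: String): Unit = held += id
    def removeBlock(id: String): Unit = held -= id
    def isHeld(id: String): Boolean = held.contains(id)
  }

  // Mirrors the shape of the patched method: a flag records whether the
  // normal path already removed the entry, and finally covers the failure path.
  def removeInternal(registry: InfoRegistry, id: String)(removeFromStores: => Unit): Unit = {
    var removed = false
    try {
      removeFromStores           // may throw, e.g. on a disk problem
      registry.removeBlock(id)
      removed = true
    } finally {
      // Runs on both success and failure; only acts if the normal path did not.
      if (!removed) registry.removeBlock(id)
    }
  }

  def main(args: Array[String]): Unit = {
    val registry = new InfoRegistry
    registry.lock("broadcast_0_piece0")
    try removeInternal(registry, "broadcast_0_piece0") { throw new IOException("disk issue") }
    catch { case e: IOException => println(s"cleanup failed: ${e.getMessage}") }
    println(s"still held: ${registry.isHeld("broadcast_0_piece0")}") // prints "still held: false"
  }
}
```

The original exception still propagates to the caller; the `finally` block only guarantees that the entry is gone. Using a flag rather than an unconditional call in `finally` avoids invoking `removeBlock` a second time when a later step, such as status reporting in the diff, fails after the entry has already been removed.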
