
[SPARK-40987][CORE] BlockManager#removeBlockInternal should ensure the lock is unlocked gracefully #38467


Closed
wants to merge 12 commits

Conversation

cxzl25
Contributor

@cxzl25 cxzl25 commented Nov 1, 2022

What changes were proposed in this pull request?

BlockManager#removeBlockInternal should ensure the lock is unlocked gracefully.
removeBlockInternal tries to call removeBlock in the finally block.
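For illustration, here is a minimal, self-contained sketch of that structure (simplified, with hypothetical helper closures; the real method is BlockManager#removeBlockInternal and operates on BlockInfoManager state):

```scala
import java.io.IOException

// Minimal sketch of the pattern described above (illustrative only, not the actual Spark code).
object RemoveBlockSketch {
  def removeBlockInternal(removeFromDisk: () => Unit, releaseLock: () => Unit): Unit = {
    var lockReleased = false
    try {
      removeFromDisk()   // may throw, e.g. an IOException while touching the block directory
      releaseLock()      // normal path: remove the block info and release the write lock
      lockReleased = true
    } finally {
      if (!lockReleased) {
        releaseLock()    // failure path: still release the lock before the exception propagates
      }
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      removeBlockInternal(() => throw new IOException("disk issue"), () => println("lock released"))
    } catch {
      case e: IOException => println(s"propagated: ${e.getMessage}")
    }
  }
}
```

The point is that the lock-releasing cleanup runs even when the disk-side removal throws, while the original exception still propagates to the caller.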

Why are the changes needed?

When the driver submits a job, DAGScheduler calls sc.broadcast(taskBinaryBytes).
TorrentBroadcast#writeBlocks may fail due to disk problems during blockManager#putBytes.
BlockManager#doPut calls BlockManager#removeBlockInternal to clean up the block.
BlockManager#removeBlockInternal calls DiskStore#remove to clean up blocks on disk.
DiskStore#remove tries to create the block's directory because it does not exist, and an exception is thrown at that point.
As a result, the block info and lock in BlockInfoManager#blockInfoWrappers are not removed.
The catch block in TorrentBroadcast#writeBlocks then calls blockManager.removeBroadcast to clean up the broadcast.
Because the block's lock in BlockInfoManager#blockInfoWrappers is never released, the dag-scheduler-event-loop thread of DAGScheduler waits forever.

22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast 
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
    at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 

Does this PR introduce any user-facing change?

No

How was this patch tested?

Throw an exception before Files.createDirectory to simulate disk problems.

DiskBlockManager#getFile

if (filename.contains("piece")) {
  // Injected fault: fail for broadcast piece blocks to simulate a broken disk.
  throw new java.io.IOException("disk issue")
}
Files.createDirectory(path)
./bin/spark-shell
spark.sql("select 1").collect()
22/11/24 19:29:58 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: disk issue.
22/11/24 19:29:58 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: disk issue
java.io.IOException: disk issue
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:109)
	at org.apache.spark.storage.DiskBlockManager.containsBlock(DiskBlockManager.scala:160)
	at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:153)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879)
	at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1998)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1484)
	at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:378)
	at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1419)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:170)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:164)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:164)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
	at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
	at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)

@LuciferYang
Contributor

LuciferYang commented Nov 1, 2022

cc @Ngone51 @mridulm FYI

@LuciferYang
Contributor

In my impression, I have seen a similar scenario: due to disk problems (such as the disk running out of space), the driver hangs and does not exit. Also pinging @yikf: have you seen a similar issue recently?

@AmplabJenkins

Can one of the admins verify this patch?

@mridulm
Contributor

mridulm commented Nov 2, 2022

If we are making this change, there are a few other places that are candidates for needCreate = false. Can we include those as well? (containsBlock, IndexShuffleBlockResolver.getChecksumFile)

Contributor

@mridulm mridulm left a comment


The change is mostly good.

Given the use of a default value, it is not easy to find all the cases where this might be relevant.

Let us remove the default value for needCreate; there are other usages where it becomes useful to pass needCreate = false. For example, we should investigate the uses in FallbackStorage, BlockManager.extendMessageWithBlockDetails, etc.

Let us audit all usages and clean them up.

This will result in a large number of changes to test suites, but that should be fine.
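As a rough illustration of the needCreate idea being discussed (a hypothetical sketch only: the parameter name comes from this review thread, but the actual signature and placement in the PR may differ):

```scala
import java.io.File
import java.nio.file.{Files, Paths}

// Hypothetical sketch: write paths pass needCreate = true so the block directory is
// created on demand; lookup/removal paths pass needCreate = false so a missing
// directory is never (re)created just to check for or delete a block.
object GetFileSketch {
  def getFile(rootDir: String, subDir: String, filename: String,
      needCreate: Boolean): File = {
    val dirPath = Paths.get(rootDir, subDir)
    if (needCreate && !Files.exists(dirPath)) {
      Files.createDirectories(dirPath)
    }
    new File(dirPath.toFile, filename)
  }

  def main(args: Array[String]): Unit = {
    // Removal-style lookup: resolves the path without creating anything on disk.
    val f = getFile("/tmp/blockmgr-sketch", "0c", "broadcast_0_piece0", needCreate = false)
    println(s"${f.getPath} exists=${f.exists()}")
  }
}
```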

@Ngone51
Member

Ngone51 commented Nov 15, 2022

BlockInfoManager#blockInfoWrappers block info and lock not removed.

Can't we catch the exception from BlockManager#removeBlockInternal and release the lock when the exception is caught?

@mridulm
Contributor

mridulm commented Nov 21, 2022

Agree with @Ngone51; there are two issues here.

a) When we have locked a block for read/write, we expect it to be unlocked and exceptions to be handled gracefully.
In this case, removeBlockInternal should ensure the lock is unlocked gracefully.
Instead of catching Exception, I would suggest moving removeBlock into a finally block and everything above it in removeBlockInternal into a try block.

A quick look indicated the other uses of lockForWriting should be fine, but that is perhaps something we should audit in the future, @Ngone51!

b) Ensure we do not recreate a directory when exiting (this is not limited to removeBlockInternal in this PR).

In addition to (a), I do believe we should do what is in this PR, @Ngone51.
Thoughts?

@Ngone51
Member

Ngone51 commented Nov 25, 2022

Thanks @mridulm, +1 for (a).

b) Ensure we do not recreate a directory when exiting (this is not limited to removeBlockInternal in this PR).

Agree too, but the original change in this PR seems messy to me. I think we should find a better way to do this.

@Ngone51
Member

Ngone51 commented Nov 25, 2022

@cxzl25 could you update the PR title/description to reflect the latest change, and create a separate JIRA ticket for (b)?

@cxzl25 cxzl25 changed the title [SPARK-40987][CORE] Avoid creating a directory when deleting a block, causing DAGScheduler to not work [SPARK-40987][CORE] BlockManager#removeBlockInternal should ensure the lock is unlocked gracefully Nov 25, 2022
@Ngone51
Member

Ngone51 commented Nov 29, 2022

@mridulm could you help merge the PR? I have some issues with merging the PR :(

@asfgit asfgit closed this in bbab0af Nov 30, 2022
asfgit pushed a commit that referenced this pull request Nov 30, 2022
asfgit pushed a commit that referenced this pull request Nov 30, 2022
@mridulm
Contributor

mridulm commented Nov 30, 2022

Merged to master, 3.3 and 3.2
Thanks for working on this @cxzl25 !
Thanks for the review @Ngone51 :-)

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023