Commit af88ada

[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job
In Spark's `RDD.getOrCompute` we first try to read a local copy of a cached RDD block, then a remote copy, and only fall back to recomputing the block if no cached copy (local or remote) can be read. This logic works correctly in the case where no remote copies of the block exist, but if there _are_ remote copies and reads of those copies fail (due to network issues or internal Spark bugs) then the BlockManager will throw a `BlockFetchException` that will fail the task (and which could possibly fail the whole job if the read failures keep occurring). In the cases of TorrentBroadcast and task result fetching we really do want to fail the entire job in case no remote blocks can be fetched, but this logic is inappropriate for reads of cached RDD blocks because those can/should be recomputed in case cached blocks are unavailable. Therefore, I think that the `BlockManager.getRemoteBytes()` method should never throw on remote fetch errors and, instead, should handle failures by returning `None`.

Block manager changes should be covered by modified tests in `BlockManagerSuite`: the old tests expected exceptions to be thrown on failed remote reads, while the modified tests now expect `None` to be returned from the `getRemote*` method. I also manually inspected all usages of `BlockManager.getRemoteValues()`, `getRemoteBytes()`, and `get()` to verify that they correctly pattern-match on the result and handle `None`. Note that these `None` branches are already exercised because the old `getRemoteBytes` returned `None` when no remote locations for the block could be found (which could occur if an executor died and its block manager de-registered with the master).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15037 from JoshRosen/SPARK-17485.
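As a rough, hypothetical sketch of the caller-side pattern described above (the names below are illustrative placeholders, not the real RDD or BlockManager APIs), consumers of cached blocks pattern-match on the `Option` result and recompute when the read returns `None`:

// Hypothetical sketch of the fallback pattern described in the commit message;
// readCachedBlock stands in for BlockManager.get(), and the names are not the
// real Spark APIs.
def getOrComputeSketch[T](
    readCachedBlock: () => Option[Iterator[T]],
    compute: () => Iterator[T]): Iterator[T] = {
  readCachedBlock() match {
    case Some(cached) =>
      // A cached copy (local or remote) was read successfully.
      cached
    case None =>
      // No cached copy could be read -- including, after this change, the case
      // where every remote fetch failed -- so recompute instead of failing the task.
      compute()
  }
}

For example, `getOrComputeSketch(() => None, () => Iterator(1, 2, 3))` falls back to recomputation and yields the iterator rather than throwing.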
1 parent ce0a222 commit af88ada

3 files changed: 5 additions (+5), 29 deletions (-29)

core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala

Lines changed: 0 additions & 24 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 1 deletion
@@ -602,8 +602,9 @@ private[spark] class BlockManager(
           numFetchFailures += 1
           if (numFetchFailures == locations.size) {
             // An exception is thrown while fetching this block from all locations
-            throw new BlockFetchException(s"Failed to fetch block from" +
+            logWarning(s"Failed to fetch block from" +
               s" ${locations.size} locations. Most recent failure cause:", e)
+            return None
           } else {
             // This location failed, so we retry fetch from a different one by returning null here
             logWarning(s"Failed to fetch remote block $blockId " +

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 3 additions & 4 deletions
@@ -509,10 +509,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
         assert(list1Get.isDefined, "list1Get expected to be fetched")
         store3.stop()
         store3 = null
-        // exception throw because there is no locations
-        intercept[BlockFetchException] {
-          list1Get = store.getRemoteBytes("list1")
-        }
+        // Fetch should fail because there are no locations, but no exception should be thrown
+        list1Get = store.getRemoteBytes("list1")
+        assert(list1Get.isEmpty, "list1Get expected to fail")
       } finally {
         origTimeoutOpt match {
           case Some(t) => conf.set("spark.network.timeout", t)
