
[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport) #15186


Closed

Conversation

JoshRosen
Contributor

This patch is a branch-1.6 backport of #15037:

What changes were proposed in this pull request?

In Spark's RDD.getOrCompute we first try to read a local copy of a cached RDD block, then a remote copy, and only fall back to recomputing the block if no cached copy (local or remote) can be read. This logic works correctly in the case where no remote copies of the block exist, but if there are remote copies and reads of those copies fail (due to network issues or internal Spark bugs) then the BlockManager will throw a BlockFetchException that will fail the task (and which could possibly fail the whole job if the read failures keep occurring).
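To make that fallback order concrete, here is a minimal sketch of the read-local, then read-remote, then recompute logic. It is simplified and uses hypothetical stand-ins (`readLocal`, `readRemote`, `recompute`), not the actual `RDD.getOrCompute` / `BlockManager` code:

```scala
// Minimal sketch of the fallback order described above (hypothetical helpers,
// not the real Spark internals): local cache, then remote cache, then recompute.
object GetOrComputeSketch {
  // Stand-ins for BlockManager lookups; both may legitimately return None.
  def readLocal[T](blockId: String): Option[Iterator[T]] = None
  def readRemote[T](blockId: String): Option[Iterator[T]] = None
  // Stand-in for recomputing the partition from lineage.
  def recompute[T](blockId: String): Iterator[T] = Iterator.empty

  def getOrCompute[T](blockId: String): Iterator[T] =
    readLocal[T](blockId)                // 1. try a local cached copy
      .orElse(readRemote[T](blockId))    // 2. fall back to a remote cached copy
      .getOrElse(recompute[T](blockId))  // 3. only then recompute the block
}
```

The problem described above is that step 2 can throw instead of returning `None`, short-circuiting step 3.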

In the cases of TorrentBroadcast and task result fetching we really do want to fail the entire job in case no remote blocks can be fetched, but this logic is inappropriate for reads of cached RDD blocks because those can/should be recomputed in case cached blocks are unavailable.

Therefore, I think that the BlockManager.getRemoteBytes() method should never throw on remote fetch errors and, instead, should handle failures by returning None.
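The intended behavior could be sketched like this (a simplified illustration of the proposed semantics, not the actual patch; `fetchFrom` and `RemoteFetchSketch` are hypothetical names):

```scala
import java.nio.ByteBuffer
import scala.util.control.NonFatal

object RemoteFetchSketch {
  // Hypothetical stand-in for fetching a block from a single remote executor;
  // here it always fails, to exercise the fallback path.
  def fetchFrom(location: String, blockId: String): ByteBuffer =
    throw new java.io.IOException(s"simulated fetch failure from $location")

  // Try each known location; on failure move on to the next one and, if every
  // location fails (or none exist), report None instead of throwing.
  def getRemoteBytes(blockId: String, locations: Seq[String]): Option[ByteBuffer] = {
    var result: Option[ByteBuffer] = None
    val it = locations.iterator
    while (result.isEmpty && it.hasNext) {
      val loc = it.next()
      try {
        result = Some(fetchFrom(loc, blockId))
      } catch {
        case NonFatal(e) =>
          // Log and try the next location rather than failing the task.
          println(s"Failed to fetch $blockId from $loc: ${e.getMessage}")
      }
    }
    result // None means "no remote copy readable"; the caller decides what to do
  }
}
```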

How was this patch tested?

Block manager changes should be covered by modified tests in BlockManagerSuite: the old tests expected exceptions to be thrown on failed remote reads, while the modified tests now expect None to be returned from the getRemote* methods.

I also manually inspected all usages of BlockManager.getRemoteValues(), getRemoteBytes(), and get() to verify that they correctly pattern-match on the result and handle None. Note that these None branches are already exercised because the old getRemoteBytes returned None when no remote locations for the block could be found (which could occur if an executor died and its block manager de-registered with the master).
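For illustration, a caller-side pattern match might look like the following (reusing the hypothetical `RemoteFetchSketch` above; the block id and executor names are made up):

```scala
// None means the block could not be read remotely, so a cached-RDD caller
// would fall back to recomputing the partition instead of failing the task.
RemoteFetchSketch.getRemoteBytes("rdd_2_3", Seq("exec-1", "exec-2")) match {
  case Some(bytes) => println(s"read ${bytes.remaining()} cached bytes remotely")
  case None        => println("no remote copy readable; recompute the partition")
}
```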

@SparkQA

SparkQA commented Sep 21, 2016

Test build #65735 has finished for PR 15186 at commit af88ada.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@srinathshankar left a comment


Looks fine. The original had 2 modified tests; I'm guessing one of them doesn't apply, right?

@JoshRosen
Contributor Author

Yep, 1.6 has fewer BlockManager tests.

@srinathshankar
Contributor

Thanks for confirming, LGTM

@JoshRosen
Contributor Author

I'm going to merge this into branch-1.6. Thanks!

asfgit pushed a commit that referenced this pull request Sep 22, 2016
[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport)

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15186 from JoshRosen/SPARK-17485-branch-1.6-backport.
@JoshRosen closed this Sep 22, 2016
@JoshRosen deleted the SPARK-17485-branch-1.6-backport branch September 22, 2016 18:06
zzcclp pushed a commit to zzcclp/spark that referenced this pull request Sep 23, 2016
[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport)

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15186 from JoshRosen/SPARK-17485-branch-1.6-backport.

(cherry picked from commit 94524ce)