-
Notifications
You must be signed in to change notification settings - Fork 28.5k
[WIP][SPARK-20629][CORE] Copy shuffle data when nodes are being shutdown #28331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
holdenk
wants to merge
32
commits into
apache:master
from
holdenk:SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-flat
Closed
Changes from all commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
4126c1b
Add support for migrating shuffle files
holdenk 8ee8949
Style fixes
holdenk afb1b1a
Re-enable the rest of the K8s tests.
holdenk 4071ae2
Python style fix
holdenk ff620ba
Don't join the block migration thread, we'll block on that
holdenk adb03db
We only need two executors for this test.
holdenk be2a5e7
Try and update the tests some more, switch migration to not make a ne…
holdenk 783114b
Code cleanups (swap some maps for foreach where we didn't need the re…
holdenk dbe2418
Merge branch 'master' into SPARK-20629-copy-shuffle-data-when-nodes-a…
holdenk a240f98
Add missing follow up commit from merge
holdenk ef8fcc5
Use NOOP_REDUCE_ID
holdenk 838a346
Config shorter interval for testing.
holdenk e85c8ef
Wait for the desired number of executors to be present.
holdenk 2da0f2d
Make a MigratableResolver interface so custom shuffle implementations…
holdenk 9d31746
Add in the trait I refactored the code to use (forgot the git add :p)
holdenk 38ff8be
Use block updates to make sure our desired blocks are being moved & a…
holdenk 13ec43a
Don't hardcode the blockId because it is not constant.
holdenk a92025c
Test both migrations at the same time & reduce some of the sleeps
holdenk fe265d7
Tag new APIs
holdenk 70c3871
Increase the number of execs and decrease the thread sleep time while…
holdenk 069dd3b
Update core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlock…
holdenk 6340f9b
Fix the migration to store ShuffleDataBlockId, check that data and in…
holdenk 4cb0458
Merge branch 'master' into SPARK-20629-copy-shuffle-data-when-nodes-a…
holdenk 4cfeb8e
We don't need the to set operations, also sleepyRdd isn't always slee…
holdenk e81aa5a
Use the remoteBlockSize param in the tests instead of conditioning on…
holdenk 841d443
Add a part of the test where we kill the original exec and recount. N…
holdenk ba20ec0
Add a part of the test where we kill the original exec and recount. N…
holdenk 17a6a3f
Fix the map output update logic which was getting tramped on (also th…
holdenk 155aeb2
Saw a test failure of the executors not coming up in time, bumping th…
holdenk 7f93df6
Bump the timeout, and also don't wait for the full set if we don't ne…
holdenk 7e32341
Return faster with shuffle blocks since we don't need the rest of the…
holdenk a3aa8eb
Small cleanups
holdenk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
47 changes: 47 additions & 0 deletions
47
core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle | ||
|
||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.network.buffer.ManagedBuffer | ||
import org.apache.spark.network.client.StreamCallbackWithID | ||
import org.apache.spark.serializer.SerializerManager | ||
import org.apache.spark.storage.BlockId | ||
|
||
/** | ||
* :: Experimental :: | ||
* An experimental trait to allow Spark to migrate shuffle blocks. | ||
*/ | ||
@Experimental | ||
trait MigratableResolver { | ||
/** | ||
* Get the shuffle ids that are stored locally. Used for block migrations. | ||
*/ | ||
def getStoredShuffles(): Set[(Int, Long)] | ||
|
||
/** | ||
* Write a provided shuffle block as a stream. Used for block migrations. | ||
*/ | ||
def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): | ||
StreamCallbackWithID | ||
|
||
/** | ||
* Get the blocks for migration for a particular shuffle and map. | ||
*/ | ||
def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)] | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the file exists, does it mean there is index/data file with same shuffle id and map id? When it could happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this should never happen, I'm not sure though let me do some thinking on that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this mirrors the logic inside of writeIndexFileAndCommit, the matching check there was introduced in SPARK-17547
which I believe is for the situation where an exception occurred during a previous write and the filesystem is in a dirty state. So I think we should keep it to be safe.