Automatically restart P2P shuffles that were aborted due to leaving workers #7353
Comments
FWIW I believe 3.) would also simplify the implementation of #7324
A proposal for 2): #7372 (possibly incomplete)
For the sake of completeness, I believe 2.) and 3.) would also be achieved if the remove_worker hook in the scheduler extension could somehow wait for all workers to confirm that the threadpool is idle and all transfer tasks were released.
Regarding 1.): When implementing, we need to be aware of the lag between
I don't think I agree, but it would demote the problem to a fringe edge case that should not occur in the wild given the current implementation of the system.
IIUC you are talking about the case where a failure is reported before the scheduler is aware of the failing worker.
We could do deduplication by
The case I am talking about is when the scheduler starts taking action in distributed/scheduler.py (lines 4804 to 4844 at 53284cd) but the SchedulerShuffleExtension has not yet been called in distributed/scheduler.py (lines 4846 to 4852 at 53284cd).
A similar case holds for relying on transition hooks as you already describe for 2.):
Fair point.
@hendrikmakait I believe this issue can be closed, can't it? Is there something missing?
We still lack the entire mechanism for automatic restarts. For now, we merely fail reliably.
With #7326 we guarantee that a shuffle consistently fails if a participating worker dies (input-only workers will also trigger this since we currently do not provide exactly-once guarantees, see #7324). This is currently a hard requirement since input tasks split their data by output partition and push these shards to the designated output workers.
If one of the output workers dies, we lose the data that was already pushed to it. The tasks assigned to the dead worker at that time are not representative of the lost data, so it is not sufficient to reschedule only the lost tasks; we need to reschedule all of them.
Whenever a shuffle is rescheduled, we will generate new metadata on the scheduler side with a newly calculated output-worker mapping. We will increase an attempt or generation counter on this metadata to distinguish this run from the earlier one. All transfer tasks need to be rescheduled and will need to execute at least once using the new metadata. Rescheduling is not an atomic operation, and we need to assume that old-generation tasks are still running, or are about to run, while we are resetting the metadata on the scheduler, broadcasting it to all workers, and transitioning the tasks.
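The generation-tagged metadata described above could be sketched roughly as follows. This is a hypothetical illustration, not the actual distributed implementation; the ShuffleMetadata shape and restart_shuffle helper are assumptions for the sake of the example:

```python
from dataclasses import dataclass


@dataclass
class ShuffleMetadata:
    """Hypothetical scheduler-side shuffle metadata with a generation counter."""
    shuffle_id: str
    output_workers: dict  # output partition -> worker address
    generation: int = 0


def restart_shuffle(old: ShuffleMetadata, new_mapping: dict) -> ShuffleMetadata:
    # A fresh output-worker mapping plus an incremented generation counter
    # distinguishes this attempt from still-running tasks of the old one.
    return ShuffleMetadata(old.shuffle_id, new_mapping, generation=old.generation + 1)
```

Any message carrying this metadata would then include the generation, so that stale participants can be told apart from the current run.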
Neither the scheduler nor the workers have a notion of 'attempts' or 'generations'. Therefore, any finished task from an earlier generation, whether failure (all tasks) or success (transfer + barrier), will corrupt an ongoing shuffle unless we can detect old-generation task-finished responses and handle them accordingly.
Summarized, we need to 1.) reschedule all transfer tasks and the barrier task, 2.) ignore results of old-generation tasks, and 3.) reject shard transfers across generations.
If we do not guarantee deduplication on the receiver end (#7324), we further need to ensure that shards of an earlier generation are not accepted by Shuffle instances of a newer generation. Otherwise, if the output-worker mappings of the old and new generation map some output partitions to the same worker, old-generation transfer tasks may already send some shards to the new-generation receivers. Once such a task is finished and rescheduled, this would effectively cause data duplication. Apart from #7324, weaker guarantees suffice to avoid this: it is enough to reject transfers that do not belong to the same generation.
1.) is trivially achieved by simply releasing all transfer tasks and the barrier task.
Note that not necessarily all unpack tasks will automatically be rescheduled. If the unpack tasks were already released beforehand and their dependents are still in memory, they will not be recomputed. This will affect some internal logic that counts how many output tasks were already processed to determine whether a shuffle is "done".
2.) The only possibility to entirely ignore old-generation results is to expand the scheduler transition engine with this concept. Every Scheduler->Worker compute-task message will sign the request with a unique identifier (this could be a simple counter, the stimulus id, etc.), and the scheduler remembers the most recent identifier per task. The worker will store this unique identifier in a TaskState attribute and will return it again in its task-finished message. The scheduler handlers can then properly deduplicate the requests by ignoring stale responses.
This concept has already been successfully implemented in work stealing.
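A minimal sketch of this signing-and-deduplication idea, under the assumption that a per-task run id is echoed back by the worker (the class and method names here are illustrative, not the distributed API):

```python
import itertools


class SchedulerSketch:
    """Hypothetical sketch: deduplicate task-finished messages by signing each
    compute-task request with a unique run id and dropping stale echoes."""

    _counter = itertools.count()

    def __init__(self):
        self.latest_run_id = {}  # task key -> most recently issued run id

    def send_compute_task(self, key):
        # Sign the compute-task request; the worker stores this id on its
        # TaskState and echoes it back in the task-finished message.
        run_id = next(self._counter)
        self.latest_run_id[key] = run_id
        return run_id

    def handle_task_finished(self, key, run_id):
        if run_id != self.latest_run_id.get(key):
            return False  # stale (old-generation) response: ignore it
        return True  # current response: proceed with the normal transition
```

The same pattern works for task-erred messages, which is what makes failures of old-generation tasks harmless to a restarted shuffle.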
This would be the first modification of the actual scheduler, outside of an extension to enable P2P. I believe this additional guarantee would not be harmful and may even avoid some fringe races that are currently dealt with by sophisticated transitions.
Note: A mere transition hook is not sufficient since a task failure would already generate a client message indicating a failure. The transition hooks do not allow us to intercept messages to clients (or workers), and I don't think this would be a stable API.
3.) If the replicated shuffle metadata includes an attempt/generation counter that is also included in the shard submissions, the receiving end will ignore all pushes that do not match its own. If the sender is old-generation, the sender task can simply err/finish (pending 1.)). If the sender is new-generation, it needs to retry until the receiving end is updated.
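The receiver-side check in 3.) could be sketched like this; the ShuffleReceiver class is a hypothetical stand-in for a per-worker Shuffle instance, not the real one:

```python
class ShuffleReceiver:
    """Hypothetical per-worker shuffle instance that rejects shard pushes
    from a different generation of the same shuffle."""

    def __init__(self, shuffle_id, generation):
        self.shuffle_id = shuffle_id
        self.generation = generation
        self.shards = []

    def receive(self, shard, sender_generation):
        if sender_generation != self.generation:
            # Old-generation senders should err/finish; new-generation
            # senders retry until this receiver has been updated.
            return False
        self.shards.append(shard)
        return True
```

Returning a rejection (rather than silently dropping) lets a new-generation sender distinguish "receiver not yet updated, retry" from a successful push.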
cc @hendrikmakait @mrocklin