
Automatically restart P2P shuffles that were aborted due to leaving workers #7353

Closed
fjetter opened this issue Nov 25, 2022 · 8 comments · Fixed by #7970

Comments

@fjetter
Member

fjetter commented Nov 25, 2022

With #7326 we guarantee that a shuffle consistently fails if a participating worker dies (input-only workers will also trigger this since we currently do not provide exactly-once guarantees, see #7324). This is a hard requirement since input tasks split their data by output partition and push these shards to the designated output workers.
If one of the output workers dies, we lose the data that was already pushed to it. The tasks assigned to the dead worker at that time are not representative of the lost data, so it is not sufficient to reschedule only those tasks; we need to reschedule all of them.

Whenever a shuffle is rescheduled, we will generate new metadata on the scheduler side with a newly calculated output worker mapping. We will increase the attempt or generation counter on this metadata to distinguish this run from the earlier one. All transfer tasks need to be rescheduled and will need to execute at least once using the new metadata.
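
A minimal sketch of what such a restart could look like, assuming a hypothetical ShuffleState dataclass that bundles the output-worker mapping with the attempt counter (names are illustrative, not the actual extension API):

```python
# Minimal sketch, not the actual extension code: bump the attempt counter and
# recompute the output-worker mapping over the surviving workers.
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ShuffleState:
    id: str
    attempt: int                    # generation counter
    output_workers: dict[int, str]  # output partition -> worker address


def restart_shuffle(state: ShuffleState, remaining_workers: list[str]) -> ShuffleState:
    new_mapping = {
        partition: remaining_workers[partition % len(remaining_workers)]
        for partition in state.output_workers
    }
    # The incremented attempt lets workers tell this run apart from the aborted one.
    return replace(state, attempt=state.attempt + 1, output_workers=new_mapping)
```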

Rescheduling is not an atomic operation, and we have to assume that old-generation tasks are still running, or are about to run, while we reset the metadata on the scheduler, broadcast it to all workers, and transition the tasks.
Neither the scheduler nor the workers have a notion of 'attempts' or 'generations'. Therefore, any finished task from an earlier generation, whether it failed (any task type) or succeeded (transfer and barrier tasks), will corrupt an ongoing shuffle unless we can detect old-generation task-finished responses and handle them accordingly.

In summary, we need to:

  1. Ensure that every task of a given generation is executed at least once
  2. Suppress any finished tasks from an earlier generation

As long as we do not guarantee deduplication on the receiving end (#7324), we further need to ensure that shards of an earlier generation are not accepted by Shuffle instances of a newer generation. Otherwise, if the output-worker mappings of the old and new generation assign some output partitions to the same worker, old-generation transfer tasks may already have sent some shards to the new-generation receivers. Once such a task finishes and is rescheduled, this would effectively duplicate data.
Unlike #7324, weaker guarantees suffice to avoid this: it is sufficient to reject transfers that do not belong to the same generation.

  3. Ensure transfers of an earlier generation do not contribute to the new generation, i.e. new-generation receivers must reject old-generation transfers

1.) is trivially achieved by releasing all transfer tasks and the barrier task.
Note that not all unpack tasks will necessarily be rescheduled automatically. If the unpack tasks were already released beforehand and their dependents are still in memory, they will not be recomputed. This affects the internal logic that counts how many output tasks were already processed to determine whether a shuffle is "done".
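
To illustrate the bookkeeping concern, a hypothetical tracker (names made up) that determines "done" via the set of outstanding partitions rather than a plain counter, so outputs that will never re-run can be accounted for up front:

```python
# Hypothetical output-partition tracker, not the actual extension code.
class OutputTracker:
    def __init__(self, all_outputs: set[int], already_materialized: set[int]):
        # Partitions whose unpack task was released while its dependents are
        # still in memory will never report again, so subtract them up front.
        self.remaining = set(all_outputs) - already_materialized

    def mark_finished(self, partition: int) -> None:
        self.remaining.discard(partition)

    def done(self) -> bool:
        return not self.remaining
```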

2.) The only way to entirely ignore old-generation results is to extend the scheduler transition engine with this concept. Every Scheduler->Worker compute-task message would sign the request with a unique identifier (a simple counter, the stimulus id, etc.), and the scheduler remembers the most recent identifier per task. The worker stores this identifier in a TaskState attribute and returns it in its task-finished message. The scheduler handlers can then deduplicate by ignoring stale responses.
This concept has already been implemented successfully for work stealing.
This would be the first modification of the actual scheduler, as opposed to the extension that enables P2P. I believe this additional guarantee would not be harmful and may even avoid some fringe races that are currently dealt with by sophisticated transitions.
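
A rough sketch of this signing/deduplication idea, written outside the real Scheduler and Worker classes with illustrative names only:

```python
# Illustrative sketch only: the scheduler signs each compute-task request and
# ignores task-finished responses carrying a stale identifier.
import itertools

_run_ids = itertools.count()
latest_run_id: dict[str, int] = {}  # task key -> most recently issued run id


def send_compute_task(key: str) -> dict:
    run_id = next(_run_ids)
    latest_run_id[key] = run_id
    # The worker would store run_id on its TaskState and echo it back verbatim.
    return {"op": "compute-task", "key": key, "run_id": run_id}


def handle_task_finished(key: str, run_id: int) -> bool:
    # Responses from an earlier generation of the task are dropped silently
    # instead of triggering a transition.
    return latest_run_id.get(key) == run_id
```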

Note: A mere transition hook is not sufficient since a task failure would already generate a client message indicating a failure. The transition hooks do not allow us to intercept messages to clients (or workers), and I don't think that would be a suitable API.

3.) If the replicated shuffle metadata includes an attempt/generation counter that is also included in every shard submission, the receiving end will ignore all pushes that do not match its own counter. If the sender is of the old generation, the sender task can simply err or finish (pending 1.)). If the sender is of the new generation, it needs to retry until the receiving end is updated.
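
A sketch of the receiving side of this check, assuming the shard submission carries the sender's attempt counter (class and method names are hypothetical):

```python
# Hypothetical receiver-side check; class and exception names are made up.
class StaleShuffleAttempt(Exception):
    pass


class ShuffleRun:
    def __init__(self, shuffle_id: str, attempt: int):
        self.shuffle_id = shuffle_id
        self.attempt = attempt
        self.shards: list[bytes] = []

    def receive(self, sender_attempt: int, shard: bytes) -> None:
        if sender_attempt < self.attempt:
            # Old-generation sender: reject; its transfer task will be
            # rescheduled anyway (point 1).
            raise StaleShuffleAttempt("shard from an earlier shuffle attempt")
        if sender_attempt > self.attempt:
            # New-generation sender reached a receiver that has not been
            # updated yet; the sender should retry until the metadata arrives.
            raise StaleShuffleAttempt("receiver not yet updated, retry")
        self.shards.append(shard)
```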

cc @hendrikmakait @mrocklin

@fjetter
Member Author

fjetter commented Dec 6, 2022

FWIW I believe 3.) would also simplify implementation of #7324

@fjetter
Copy link
Member Author

fjetter commented Dec 6, 2022

A proposal for 2): #7372 (possibly incomplete)

@fjetter
Copy link
Member Author

fjetter commented Dec 6, 2022

For the sake of completeness, I believe 2.) and 3.) would also be achieved if the remove_worker hook in the scheduler extension could somehow wait for all workers to confirm that their threadpools are idle, i.e. that all transfer tasks were released.
In practice this is difficult, and I am a bit concerned about "blocking" in remove_worker on non-trivial logic, particularly logic that may be called multiple times in short succession (e.g. 5 workers failing simultaneously because a host failed).
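
Purely for illustration, the rejected alternative could look roughly like this; confirm() stands in for a per-worker acknowledgement RPC that does not exist in this form:

```python
# Hypothetical sketch of the rejected alternative; nothing here is real API.
import asyncio


async def confirm(worker_address: str, shuffle_id: str) -> None:
    """Placeholder for a per-worker 'all transfer tasks released' round-trip."""
    await asyncio.sleep(0)


async def remove_worker_blocking(
    surviving_workers: list[str], aborted_shuffles: list[str]
) -> None:
    # Blocking here on a round-trip per worker is the concern above, especially
    # when several workers disappear at once.
    for shuffle_id in aborted_shuffles:
        await asyncio.gather(
            *(confirm(addr, shuffle_id) for addr in surviving_workers)
        )
```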

@hendrikmakait
Member

Regarding 1.):

When implementing, we need to be aware of the lag between Scheduler.remove_worker transitioning tasks previously running on the leaving worker and the execution of the ShuffleSchedulerExtension.remove_worker hook.

Scheduler.remove_worker transitions tasks to released or erred before we reach the plugin hook. Rescheduling tasks without updating the metadata may cause them to fail. If the tasks transition to erred before we release them in the plugin hook, we will send client messages. (A possible solution may be #5403.)

FWIW I believe 3.) would also simplify implementation of #7324

I don't think I agree, but it would demote the problem to a fringe edge case that should not occur in the wild given the current implementation of the system.

@fjetter
Member Author

fjetter commented Dec 8, 2022

Scheduler.remove_worker transitions tasks to released or erred before we reach the plugin hook. Rescheduling tasks without updating the metadata may cause them to fail. If the tasks transition to erred before we release them in the plugin hook, we will send client messages. (A possible solution may be #5403.)

IIUC you are talking about the case where a failure is reported before the scheduler is aware of the failing worker.

I don't think I agree, but it would demote the problem to a fringe edge case that should not occur in the wild given the current implementation of the system.

We could deduplicate by (SourceID, attempt). Right now, we'd need to deduplicate by (SourceID, TargetID), but all TargetIDs are currently mixed together in the same bytestring.
Off topic, of course.

@hendrikmakait
Member

IIUC you are talking about the case where a failure is reported before the scheduler is aware of the failing worker.

The case I am talking about is when the scheduler starts taking action in Scheduler.remove_worker, i.e., transitioning all tasks in

```python
ts: TaskState
for ts in list(ws.processing):
    k = ts.key
    recommendations[k] = "released"
    if not safe:
        ts.suspicious += 1
        ts.prefix.suspicious += 1
        if ts.suspicious > self.allowed_failures:
            del recommendations[k]
            e = pickle.dumps(
                KilledWorker(
                    task=k,
                    last_worker=ws.clean(),
                    allowed_failures=self.allowed_failures,
                ),
            )
            r = self.transition(
                k,
                "erred",
                exception=e,
                cause=k,
                stimulus_id=stimulus_id,
                worker=address,
            )
            recommendations.update(r)
            logger.info(
                "Task %s marked as failed because %d workers died"
                " while trying to run it",
                ts.key,
                self.allowed_failures,
            )

for ts in list(ws.has_what):
    self.remove_replica(ts, ws)
    if not ts.who_has:
        if ts.run_spec:
            recommendations[ts.key] = "released"
        else:  # pure data
            recommendations[ts.key] = "forgotten"

self.transitions(recommendations, stimulus_id=stimulus_id)
```

but the ShuffleSchedulerExtension has not yet been called in

```python
for plugin in list(self.plugins.values()):
    try:
        result = plugin.remove_worker(scheduler=self, worker=address)
        if inspect.isawaitable(result):
            await result
    except Exception as e:
        logger.exception(e)
```

A similar case holds for relying on transition hooks as you already describe for 2.):

Note: A mere transition hook is not sufficient since a task failure would already generate a client message indicating a failure. The transition hooks do not allow us to intercept messages to clients (or workers), and I don't think that would be a suitable API.

We could deduplicate by (SourceID, attempt). Right now, we'd need to deduplicate by (SourceID, TargetID), but all TargetIDs are currently mixed together in the same bytestring.

Fair point.

@fjetter
Member Author

fjetter commented Mar 1, 2023

@hendrikmakait I believe this issue can be closed, can't it? Is there something missing?

@hendrikmakait
Member

We still lack the entire mechanism for automatic restarts. For now, we merely fail reliably.
