Automatically restart P2P shuffles when output worker leaves #7970

Merged Jul 24, 2023 (88 commits)
Commits (all by hendrikmakait):
6658a65  Adjust tests (Jul 5, 2023)
4e7b425  Restart shuffle (Jul 5, 2023)
1cfa0d3  Remove erred_shuffles (Jul 5, 2023)
fe79aa3  Fix failures on unpack (Jul 6, 2023)
853d953  transitions (Jul 6, 2023)
e058234  Revert "transitions" (Jul 6, 2023)
8510c06  Adjust tests (Jul 6, 2023)
6138fc6  Recommendations (Jul 6, 2023)
0e7b051  stimulus_id in remove_worker and transition (Jul 6, 2023)
8e92b85  Use stimulus_id as transaction (Jul 6, 2023)
3be333e  Tests (Jul 6, 2023)
8412d2c  to kwarg (Jul 6, 2023)
ba67405  kwarg (Jul 6, 2023)
9d804dc  warning (Jul 6, 2023)
0fd66cd  forgiveness (Jul 6, 2023)
4f456e9  forgiveness (Jul 6, 2023)
8f02f8d  forgiveness (Jul 6, 2023)
29405c6  Minor (Jul 7, 2023)
476f132  Move warning (Jul 7, 2023)
d34917e  Merge branch 'main' into restart-p2p-shuffle (Jul 7, 2023)
69f38f8  Minor (Jul 7, 2023)
337dc72  Merge branch 'main' into restart-p2p-shuffle (Jul 7, 2023)
2a18033  Closing (Jul 7, 2023)
85c7ab2  Improved error messages (Jul 10, 2023)
263a75b  Adjust tests (Jul 10, 2023)
266732e  Fix (Jul 10, 2023)
8fc28eb  Improved (Jul 10, 2023)
fef0a94  Fix (Jul 10, 2023)
2d2923f  Minor (Jul 10, 2023)
9936857  Merge branch 'stimuli-in-plugin-hooks' into restart-p2p-shuffle (Jul 10, 2023)
fd00098  Minor (Jul 10, 2023)
1b7d055  Merge branch 'improved-p2p-errors' into restart-p2p-shuffle (Jul 10, 2023)
88f11a4  Apply suggestions from code review (Jul 10, 2023)
a70d1ec  Suggestions from code review (Jul 10, 2023)
976ddb3  Merge branch 'improved-p2p-errors' into restart-p2p-shuffle (Jul 10, 2023)
559075b  Merge branch 'main' into restart-p2p-shuffle (Jul 10, 2023)
c70b4cb  Rename and use teardown (Jul 10, 2023)
9790360  Renaming (Jul 10, 2023)
25a5cc3  Test shutdown (Jul 10, 2023)
593fa02  Check for teardown (Jul 10, 2023)
f4b5f1c  Fix tests (Jul 10, 2023)
e0d0174  Fix tests (Jul 10, 2023)
ff711e2  Merge branch 'shuffle-shutdown' into restart-p2p-shuffle (Jul 10, 2023)
31929ab  Fix (Jul 10, 2023)
f83bc40  Fix check (Jul 10, 2023)
6059f91  Merge branch 'shuffle-shutdown' into restart-p2p-shuffle (Jul 10, 2023)
dca742e  Default timeout on tests (Jul 11, 2023)
47eacfc  Ensure that worker is running before letting it participate (Jul 11, 2023)
f544611  Notify in more places (Jul 11, 2023)
a357e38  Avoid problems with queueing (Jul 12, 2023)
34062b1  Cleanup (Jul 13, 2023)
bbfa1b9  Code review (Jul 13, 2023)
10126ff  kwargs in remove_worker (Jul 13, 2023)
1482153  Merge branch 'main' into stimuli-in-plugin-hooks (Jul 13, 2023)
36143fe  Merge branch 'stimuli-in-plugin-hooks' into restart-p2p-shuffle (Jul 13, 2023)
ed69caf  Fix cleanup (Jul 13, 2023)
bd74bf2  Fix test (Jul 13, 2023)
74edf2e  Scheduler state cleanup (Jul 13, 2023)
391042e  Revert change (Jul 13, 2023)
788f8a4  Improve get_or_create (Jul 13, 2023)
c51e39e  Prohibit non-processing tasks from creating a shuffle state (Jul 13, 2023)
cf2ac3c  Minor (Jul 13, 2023)
6b999b0  Remove superfluous test (Jul 13, 2023)
396f1c2  Cleanup (Jul 13, 2023)
5070674  Additional tests (Jul 13, 2023)
938eab1  no cover (Jul 14, 2023)
6cfe848  docstrings (Jul 17, 2023)
6c1be53  Update distributed/shuffle/_scheduler_plugin.py (Jul 18, 2023)
f904377  Dataclass init (Jul 18, 2023)
160d055  Update distributed/shuffle/_scheduler_plugin.py (Jul 18, 2023)
71778e6  Review feedback (Jul 18, 2023)
00df2e8  Merge branch 'main' into restart-p2p-shuffle (Jul 19, 2023)
f1ccd63  Remove notify_all (Jul 19, 2023)
3ac439c  Remove __eq__ (Jul 20, 2023)
d72596b  walrussin' (Jul 20, 2023)
f14c71b  Condition (Jul 20, 2023)
90eb9ea  Docstring (Jul 20, 2023)
4b4ab50  cond (Jul 20, 2023)
dd7ac3e  Simplify restart (Jul 20, 2023)
9459019  raise if invariant broken (Jul 20, 2023)
3213ef5  Additional test (Jul 20, 2023)
e4be94f  Fix race (Jul 21, 2023)
c1ef0be  Fix test (Jul 21, 2023)
434d5a3  No reproducer (Jul 21, 2023)
1f0ea84  doc and ignore (Jul 21, 2023)
9e7c2e8  Merge branch 'main' into restart-p2p-shuffle (Jul 24, 2023)
36a2c94  Merge branch 'main' into restart-p2p-shuffle (Jul 24, 2023)
5764f45  [skip-caching] (Jul 24, 2023)
5 changes: 5 additions & 0 deletions distributed/shuffle/_exceptions.py
@@ -0,0 +1,5 @@
from __future__ import annotations


class ShuffleClosedError(RuntimeError):
    pass
157 changes: 109 additions & 48 deletions distributed/shuffle/_scheduler_plugin.py
@@ -6,7 +6,7 @@
import logging
from collections import defaultdict
from collections.abc import Callable, Iterable, Sequence
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import partial
from itertools import product
from typing import TYPE_CHECKING, Any, ClassVar
@@ -34,14 +34,15 @@
logger = logging.getLogger(__name__)


@dataclass
@dataclass(eq=False)
class ShuffleState(abc.ABC):
_run_id_iterator: ClassVar[itertools.count] = itertools.count(1)

id: ShuffleId
run_id: int
output_workers: set[str]
participating_workers: set[str]
_archived_by: str | None = field(default=None, init=False)

@abc.abstractmethod
def to_msg(self) -> dict[str, Any]:
@@ -50,8 +51,11 @@ def to_msg(self) -> dict[str, Any]:
def __str__(self) -> str:
return f"{self.__class__.__name__}<{self.id}[{self.run_id}]>"

def __hash__(self) -> int:
return hash(self.run_id)

@dataclass

@dataclass(eq=False)
class DataFrameShuffleState(ShuffleState):
type: ClassVar[ShuffleType] = ShuffleType.DATAFRAME
worker_for: dict[int, str]
@@ -68,7 +72,7 @@ def to_msg(self) -> dict[str, Any]:
}


@dataclass
@dataclass(eq=False)
class ArrayRechunkState(ShuffleState):
type: ClassVar[ShuffleType] = ShuffleType.ARRAY_RECHUNK
worker_for: dict[NDIndex, str]
@@ -90,19 +94,18 @@ def to_msg(self) -> dict[str, Any]:
class ShuffleSchedulerPlugin(SchedulerPlugin):
"""
Shuffle plugin for the scheduler

This coordinates the individual worker plugins to ensure correctness
and collects heartbeat messages for the dashboard.

See Also
--------
ShuffleWorkerPlugin
"""

scheduler: Scheduler
states: dict[ShuffleId, ShuffleState]
active_shuffles: dict[ShuffleId, ShuffleState]
heartbeats: defaultdict[ShuffleId, dict]
erred_shuffles: dict[ShuffleId, Exception]
_shuffles: defaultdict[ShuffleId, set[ShuffleState]]
_archived_by_stimulus: defaultdict[str, set[ShuffleState]]
Comment on lines +105 to +108
Contributor: It seems that active_shuffles, _shuffles, and _archived_by_stimulus are always required to be in sync with one another. Is there a way to build a data structure that enforces the relevant invariants, rather than having to maintain all these separate mappings and remember to update them all correctly?

Member Author: I'm certain there is, but I would like to keep the refactoring separate from actual logic changes. Let's keep this for a follow-up PR? (Would you be interested in taking that on?)

Contributor: Please open an issue and assign it to me, and I will give it a go, thanks.

Member Author: Will do once this is merged 👍

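As a rough illustration of the suggested follow-up (a hypothetical sketch, not code from this PR), the three mappings could live behind one small container whose methods maintain the invariants, so call sites cannot update one mapping and forget another:

    from collections import defaultdict

    class ShuffleStateStore:
        """Hypothetical container keeping the three mappings in sync."""

        def __init__(self) -> None:
            self.active: dict[ShuffleId, ShuffleState] = {}
            self._shuffles: defaultdict[ShuffleId, set[ShuffleState]] = defaultdict(set)
            self._archived_by_stimulus: defaultdict[str, set[ShuffleState]] = defaultdict(set)

        def add(self, state: ShuffleState) -> None:
            # A run becomes active and is remembered for its lifetime.
            self.active[state.id] = state
            self._shuffles[state.id].add(state)

        def archive(self, id: ShuffleId, stimulus_id: str) -> None:
            # Deactivate a run and record which stimulus archived it.
            state = self.active.pop(id)
            if not state._archived_by:
                state._archived_by = stimulus_id
                self._archived_by_stimulus[stimulus_id].add(state)

        def forget(self, id: ShuffleId) -> None:
            # Drop all runs of a shuffle, pruning empty archive buckets.
            for state in self._shuffles.pop(id, set()):
                if state._archived_by:
                    archived = self._archived_by_stimulus[state._archived_by]
                    archived.discard(state)
                    if not archived:
                        del self._archived_by_stimulus[state._archived_by]

A clear() method on such a store would then reset all three mappings at once, so methods like restart() could not miss one of them.
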
def __init__(self, scheduler: Scheduler):
self.scheduler = scheduler
@@ -115,9 +118,10 @@ def __init__(self, scheduler: Scheduler):
}
)
self.heartbeats = defaultdict(lambda: defaultdict(dict))
self.states = {}
self.erred_shuffles = {}
self.active_shuffles = {}
self.scheduler.add_plugin(self, name="shuffle")
self._shuffles = defaultdict(set)
self._archived_by_stimulus = defaultdict(set)

async def start(self, scheduler: Scheduler) -> None:
worker_plugin = ShuffleWorkerPlugin()
@@ -126,18 +130,19 @@ async def start(self, scheduler: Scheduler) -> None:
)

def shuffle_ids(self) -> set[ShuffleId]:
return set(self.states)
return set(self.active_shuffles)

async def barrier(self, id: ShuffleId, run_id: int) -> None:
shuffle = self.states[id]
shuffle = self.active_shuffles[id]
assert shuffle.run_id == run_id, f"{run_id=} does not match {shuffle}"
msg = {"op": "shuffle_inputs_done", "shuffle_id": id, "run_id": run_id}
await self.scheduler.broadcast(
msg=msg, workers=list(shuffle.participating_workers)
msg=msg,
workers=list(shuffle.participating_workers),
)

def restrict_task(self, id: ShuffleId, run_id: int, key: str, worker: str) -> dict:
shuffle = self.states[id]
shuffle = self.active_shuffles[id]
if shuffle.run_id > run_id:
return {
"status": "error",
@@ -158,15 +163,19 @@ def heartbeat(self, ws: WorkerState, data: dict) -> None:
self.heartbeats[shuffle_id][ws.address].update(d)

def get(self, id: ShuffleId, worker: str) -> dict[str, Any]:
if exception := self.erred_shuffles.get(id):
return {"status": "error", "message": str(exception)}
state = self.states[id]
if worker not in self.scheduler.workers:
# This should never happen
raise RuntimeError(
f"Scheduler is unaware of this worker {worker!r}"
) # pragma: nocover
state = self.active_shuffles[id]
state.participating_workers.add(worker)
return state.to_msg()

def get_or_create(
self,
id: ShuffleId,
key: str,
type: str,
worker: str,
spec: dict[str, Any],
@@ -178,6 +187,7 @@
# known by its name. If the name has been mangled, we cannot guarantee
# that the shuffle works as intended and should fail instead.
self._raise_if_barrier_unknown(id)
self._raise_if_task_not_processing(key)

state: ShuffleState
if type == ShuffleType.DATAFRAME:
@@ -186,7 +196,8 @@
state = self._create_array_rechunk_state(id, spec)
else: # pragma: no cover
raise TypeError(type)
self.states[id] = state
self.active_shuffles[id] = state
self._shuffles[id].add(state)
state.participating_workers.add(worker)
return state.to_msg()

@@ -201,6 +212,11 @@ def _raise_if_barrier_unknown(self, id: ShuffleId) -> None:
"into this by leaving a comment at distributed#7816."
)

def _raise_if_task_not_processing(self, key: str) -> None:
task = self.scheduler.tasks[key]
if task.state != "processing":
raise RuntimeError(f"Expected {task} to be processing, is {task.state}.")

def _create_dataframe_shuffle_state(
self, id: ShuffleId, spec: dict[str, Any]
) -> DataFrameShuffleState:
@@ -309,34 +325,67 @@ def _unset_restriction(self, ts: TaskState) -> None:
original_restrictions = ts.annotations.pop("shuffle_original_restrictions")
self.scheduler.set_restrictions({ts.key: original_restrictions})

def _restart_recommendations(self, id: ShuffleId) -> Recs:
barrier_task = self.scheduler.tasks[barrier_key(id)]
recs: Recs = {}

for dt in barrier_task.dependents:
if dt.state == "erred":
return {}
recs.update({dt.key: "released"})

if barrier_task.state == "erred":
# This should never happen, a dependent of the barrier should already
# be `erred`
raise RuntimeError(
f"Expected dependents of {barrier_task=} to be 'erred' if "
"the barrier is."
) # pragma: no cover
recs.update({barrier_task.key: "released"})

for dt in barrier_task.dependencies:
if dt.state == "erred":
# This should never happen, a dependent of the barrier should already
# be `erred`
raise RuntimeError(
f"Expected barrier and its dependents to be "
f"'erred' if the barrier's dependency {dt} is."
) # pragma: no cover
recs.update({dt.key: "released"})
return recs

def _restart_shuffle(
self, id: ShuffleId, scheduler: Scheduler, *, stimulus_id: str
) -> None:
recs = self._restart_recommendations(id)
self.scheduler.transitions(recs, stimulus_id=stimulus_id)
self.scheduler.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)

def remove_worker(
self, scheduler: Scheduler, worker: str, *, stimulus_id: str, **kwargs: Any
) -> None:
from time import time

stimulus_id = f"shuffle-failed-worker-left-{time()}"
"""Restart all active shuffles when a participating worker leaves the cluster.

.. note::
Due to the order of operations in :meth:`~Scheduler.remove_worker`, the
shuffle may have already been archived by
:meth:`~ShuffleSchedulerPlugin.transition`. In this case, the
``stimulus_id`` is used as a transaction identifier and all archived shuffles
with a matching `stimulus_id` are restarted.
"""

recs: Recs = {}
for shuffle_id, shuffle in self.states.items():
# If processing the transactions causes a task to get released, this
# removes the shuffle from self.active_shuffles. Therefore, we must iterate
# over a copy.
for shuffle_id, shuffle in self.active_shuffles.copy().items():
Contributor: Then we iterate over all active shuffles, remove and restart? Why do we not unconditionally restart the archived shuffles after this loop over active shuffles?

Member Author: I'm not 100% sure I'm following, but what I think you're saying is a very good point.

if worker not in shuffle.participating_workers:
continue
exception = RuntimeError(f"Worker {worker} left during active {shuffle}")
self.erred_shuffles[shuffle_id] = exception
self._fail_on_workers(shuffle, str(exception))
self._clean_on_scheduler(shuffle_id, stimulus_id)

barrier_task = self.scheduler.tasks[barrier_key(shuffle_id)]
if barrier_task.state == "memory":
for dt in barrier_task.dependents:
if worker not in dt.worker_restrictions:
continue
self._unset_restriction(dt)
recs.update({dt.key: "waiting"})
# TODO: Do we need to handle other states?

# If processing the transactions causes a task to get released, this
# removes the shuffle from self.states. Therefore, we must process them
# outside of the loop.
self.scheduler.transitions(recs, stimulus_id=stimulus_id)
for shuffle in self._archived_by_stimulus.get(stimulus_id, set()):
self._restart_shuffle(shuffle.id, scheduler, stimulus_id=stimulus_id)

def transition(
self,
@@ -347,17 +396,25 @@
stimulus_id: str,
**kwargs: Any,
) -> None:
"""Clean up scheduler and worker state once a shuffle becomes inactive."""
if finish not in ("released", "forgotten"):
return
if not key.startswith("shuffle-barrier-"):
return
shuffle_id = id_from_key(key)
try:
shuffle = self.states[shuffle_id]
except KeyError:
return
self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
self._clean_on_scheduler(shuffle_id)

if shuffle := self.active_shuffles.get(shuffle_id):
self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)

if finish == "forgotten":
shuffles = self._shuffles.pop(shuffle_id, set())
for shuffle in shuffles:
if shuffle._archived_by:
archived = self._archived_by_stimulus[shuffle._archived_by]
archived.remove(shuffle)
if not archived:
del self._archived_by_stimulus[shuffle._archived_by]
Comment on lines +416 to +417
Contributor: Why remove this (now empty) set from the dict? (That is, why is self._archived_by_stimulus[shuffle._archived_by].remove(shuffle) not sufficient?)

Member Author: On a long-running cluster, we would keep adding new entries to the dictionary and unnecessarily increase its size.

Contributor: So is _archived_by not a finite set of values (strings)?

Member Author: Individual stimulus_ids should be unique (think of them as a poor man's version of a span ID in tracing). They're usually created by combining a stimulus name (from a finite set of names) with a timestamp. For example:

    await self.remove_worker(
        worker, stimulus_id=f"handle-worker-cleanup-{time()}"
    )

Contributor: Ah, ok.
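
To make the growth concern concrete, here is a minimal standalone sketch (illustrative only, not code from this PR) of why pruning matters when keys are never reused:

    from collections import defaultdict

    archived_by_stimulus: defaultdict[str, set[str]] = defaultdict(set)

    # Every worker-removal event mints a fresh, timestamped stimulus_id,
    # so keys accumulate if they are never deleted:
    for i in range(3):
        archived_by_stimulus[f"shuffle-failed-worker-left-{i}"].add("shuffle-a")

    # Removing only the members leaves empty sets behind ...
    for stimulus_id in list(archived_by_stimulus):
        archived_by_stimulus[stimulus_id].remove("shuffle-a")
    assert len(archived_by_stimulus) == 3  # the dict keeps growing

    # ... so the plugin also deletes the key once its set is empty:
    for stimulus_id in list(archived_by_stimulus):
        if not archived_by_stimulus[stimulus_id]:
            del archived_by_stimulus[stimulus_id]
    assert not archived_by_stimulus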


def _fail_on_workers(self, shuffle: ShuffleState, message: str) -> None:
worker_msgs = {
@@ -373,9 +430,12 @@
}
self.scheduler.send_all({}, worker_msgs)

def _clean_on_scheduler(self, id: ShuffleId) -> None:
del self.states[id]
self.erred_shuffles.pop(id, None)
def _clean_on_scheduler(self, id: ShuffleId, stimulus_id: str | None) -> None:
shuffle = self.active_shuffles.pop(id)
if not shuffle._archived_by and stimulus_id:
shuffle._archived_by = stimulus_id
self._archived_by_stimulus[stimulus_id].add(shuffle)

with contextlib.suppress(KeyError):
del self.heartbeats[id]

@@ -384,9 +444,10 @@ def _clean_on_scheduler(self, id: ShuffleId) -> None:
self._unset_restriction(dt)

def restart(self, scheduler: Scheduler) -> None:
self.states.clear()
self.active_shuffles.clear()
self.heartbeats.clear()
self.erred_shuffles.clear()
Contributor: This object now has more state than previously sitting around inside it; is it deliberate that (say) _archived_by_stimulus is not cleaned out here?

Member Author: Good catch, addressed.

self._shuffles.clear()
self._archived_by_stimulus.clear()


def get_worker_for_range_sharding(
5 changes: 5 additions & 0 deletions distributed/shuffle/_shuffle.py
@@ -11,6 +11,7 @@

from distributed.exceptions import Reschedule
from distributed.shuffle._arrow import check_dtype_support, check_minimal_arrow_version
from distributed.shuffle._exceptions import ShuffleClosedError

logger = logging.getLogger("distributed.shuffle")
if TYPE_CHECKING:
@@ -69,6 +70,8 @@ def shuffle_transfer(
column=column,
parts_out=parts_out,
)
except ShuffleClosedError:
raise Reschedule()
except Exception as e:
raise RuntimeError(f"shuffle_transfer failed during shuffle {id}") from e

@@ -82,6 +85,8 @@
)
except Reschedule as e:
raise e
except ShuffleClosedError:
raise Reschedule()
Comment on lines +88 to +89
Member Author: This is the easiest way of dealing with race conditions between a worker closing down (and thus raising the ShuffleClosedError) and the scheduler recognizing the worker closing and therefore refusing the erred task. This can lead to the task ending up in a rescheduling loop until the scheduler has recognized the worker shutting down, but at that point the remove_worker logic should take over, fail the shuffle run on all workers, and remove the task from processing.

except Exception as e:
raise RuntimeError(f"shuffle_unpack failed during shuffle {id}") from e
