Skip to content

Document Scheduler and Worker state machine #6948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,10 @@ class TaskState:

#: The current state of the task
state: TaskStateState = "released"
#: The previous state of the task. It is not None iff state in (cancelled, resumed).
#: The previous state of the task. It is not None iff :attr:`state` in
#: (cancelled, resumed).
previous: Literal["executing", "long-running", "flight", None] = None
#: The next state of the task. It is not None iff state == resumed.
#: The next state of the task. It is not None iff :attr:`state` == resumed.
next: Literal["fetch", "waiting", None] = None

#: Expected duration of the task
Expand Down Expand Up @@ -278,7 +279,9 @@ class TaskState:
nbytes: int | None = None
#: Arbitrary task annotations
annotations: dict | None = None
#: True if the task is in memory or erred; False otherwise
#: True if the :meth:`~WorkerBase.execute` or :meth:`~WorkerBase.gather_dep`
#: coroutine servicing this task completed; False otherwise. This flag changes
#: the behaviour of transitions out of the ``executing``, ``flight`` etc. states.
done: bool = False

_instances: ClassVar[weakref.WeakSet[TaskState]] = weakref.WeakSet()
Expand Down Expand Up @@ -563,7 +566,10 @@ class StealResponseMsg(SendMessageToScheduler):

@dataclass
class StateMachineEvent:
"""Base abstract class for all stimuli that can modify the worker state"""

__slots__ = ("stimulus_id", "handled")
#: Unique ID of the event
stimulus_id: str
#: timestamp of when the event was handled by the worker
# TODO Switch to @dataclass(slots=True), uncomment the line below, and remove the
Expand All @@ -572,6 +578,7 @@ class StateMachineEvent:
_classes: ClassVar[dict[str, type[StateMachineEvent]]] = {}

def __new__(cls, *args: Any, **kwargs: Any) -> StateMachineEvent:
"""Hack to initialize the ``handled`` attribute in Python <3.10"""
self = object.__new__(cls)
self.handled = None
return self
Expand Down Expand Up @@ -1127,11 +1134,11 @@ class WorkerState:
#: All and only tasks with ``TaskState.state == 'missing'``.
missing_dep_flight: set[TaskState]

#: Which tasks that are coming to us in current peer-to-peer connections.
#: This set includes exclusively:
#: - tasks with :attr:`state` == 'flight'
#: - tasks with :attr:`state` in ('cancelled', 'resumed') and
#: :attr:`previous` == 'flight`
#: Tasks that are coming to us in current peer-to-peer connections.
#:
#: This set includes exclusively tasks with :attr:`~TaskState.state` == 'flight' as
#: well as tasks with :attr:`~TaskState.state` in ('cancelled', 'resumed') and
#: :attr:`~TaskState.previous` == 'flight`.
#:
#: See also :meth:`in_flight_tasks_count`.
in_flight_tasks: set[TaskState]
Expand Down Expand Up @@ -1176,10 +1183,10 @@ class WorkerState:
available_resources: dict[str, float]

#: Set of tasks that are currently running.
#: This set includes exclusively:
#: - tasks with :attr:`state` == 'executing'
#: - tasks with :attr:`state` in ('cancelled', 'resumed') and
#: :attr:`previous` == 'executing`
#:
#: This set includes exclusively tasks with :attr:`~TaskState.state` == 'executing'
#: as well as tasks with :attr:`~TaskState.state` in ('cancelled', 'resumed') and
#: :attr:`~TaskState.previous` == 'executing`.
#:
#: See also :meth:`executing_count` and :attr:`long_running`.
executing: set[TaskState]
Expand All @@ -1188,11 +1195,11 @@ class WorkerState:
#: :func:`~distributed.secede`, so they no longer count towards the maximum number
#: of concurrent tasks (nthreads).
#: These tasks do not appear in the :attr:`executing` set.
#: This set includes exclusively:
#: - tasks with :attr:`state` == 'long-running'
#: - tasks with :attr:`state` in ('cancelled', 'resumed') and
#: :attr:`previous` == 'long-running`
#:
#: This set includes exclusively tasks with
#: :attr:`~TaskState.state` == 'long-running' as well as tasks with
#: :attr:`~TaskState.state` in ('cancelled', 'resumed') and
#: :attr:`~TaskState.previous` == 'long-running`.
long_running: set[TaskState]

#: A number of tasks that this worker has run in its lifetime; this includes failed
Expand Down
9 changes: 9 additions & 0 deletions docs/source/images/run_dot.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

set -o errexit

for in_fname in *.dot
do
out_fname=${in_fname%.dot}.svg
dot -Tsvg $in_fname > $out_fname
done
7 changes: 2 additions & 5 deletions docs/source/images/task-state.dot
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ digraph{
];
released1 [label=released];
released2 [label=released];
new -> released1;
released1 -> waiting;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, merge conflict with #6614 for this file

waiting -> processing;
waiting -> "no-worker";
"no-worker" -> waiting;
"no-worker" -> processing;
waiting -> "no-worker" [dir=both];
processing -> memory;
processing -> error;
error -> forgotten;
error -> released2;
memory -> released2;
released2 -> forgotten;
}
150 changes: 73 additions & 77 deletions docs/source/images/task-state.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 21 additions & 0 deletions docs/source/images/worker-cancel-state1.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
digraph{
graph [
bgcolor="#FFFFFFF00",
rankdir=LR,
];

executing1 [label="executing"];
executing2 [label="executing"];
cancelled [label="cancelled(executing)"];
resumed [label="resumed(fetch)"];

executing1 -> cancelled;
cancelled -> released;
cancelled -> executing2;
released -> forgotten;

cancelled -> resumed [dir=both];
resumed -> executing2;
resumed -> memory;
resumed -> fetch;
}
Loading