diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 2848f4cd41b..da4fc206cf7 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -43,34 +43,32 @@ logger = logging.getLogger("distributed.worker.state_machine") if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) - from typing_extensions import TypeAlias + # TODO import from typing (TypeAlias requires Python >=3.10) + # TODO import from typing (NotRequired requires Python >=3.11) + from typing_extensions import NotRequired, TypeAlias # Circular imports from distributed.diagnostics.plugin import WorkerPlugin from distributed.worker import Worker - # TODO move out of TYPE_CHECKING (requires Python >=3.10) - # Not to be confused with distributed.scheduler.TaskStateState - TaskStateState: TypeAlias = Literal[ - "cancelled", - "constrained", - "error", - "executing", - "fetch", - "flight", - "forgotten", - "long-running", - "memory", - "missing", - "ready", - "released", - "rescheduled", - "resumed", - "waiting", - ] -else: - TaskStateState = str +# Not to be confused with distributed.scheduler.TaskStateState +TaskStateState: TypeAlias = Literal[ + "cancelled", + "constrained", + "error", + "executing", + "fetch", + "flight", + "forgotten", + "long-running", + "memory", + "missing", + "ready", + "released", + "rescheduled", + "resumed", + "waiting", +] # TaskState.state subsets PROCESSING: set[TaskStateState] = { @@ -112,14 +110,11 @@ class SerializedTask(NamedTuple): task: object = NO_VALUE -class _StartStopRequired(TypedDict): +class StartStop(TypedDict): action: Literal["compute", "transfer", "disk-read", "disk-write", "deserialize"] start: float stop: float - - -class StartStop(_StartStopRequired, total=False): - source: str + source: NotRequired[str] class InvalidTransition(Exception): @@ -1029,18 +1024,12 @@ class SecedeEvent(StateMachineEvent): compute_duration: float -if TYPE_CHECKING: - # TODO remove quotes (requires Python >=3.9) - # TODO get out of TYPE_CHECKING (requires Python >=3.10) - # {TaskState -> finish: TaskStateState | (finish: TaskStateState, transition *args)} - # Not to be confused with distributed.scheduler.Recs - Recs: TypeAlias = "dict[TaskState, TaskStateState | tuple]" - Instructions: TypeAlias = "list[Instruction]" - RecsInstrs: TypeAlias = "tuple[Recs, Instructions]" -else: - Recs = dict - Instructions = list - RecsInstrs = tuple +# TODO remove quotes (requires Python >=3.9) +# {TaskState -> finish: TaskStateState | (finish: TaskStateState, transition *args)} +# Not to be confused with distributed.scheduler.Recs +Recs: TypeAlias = "dict[TaskState, TaskStateState | tuple]" +Instructions: TypeAlias = "list[Instruction]" +RecsInstrs: TypeAlias = "tuple[Recs, Instructions]" def merge_recs_instructions(*args: RecsInstrs) -> RecsInstrs: