Skip to content

Commit

Permalink
cleanup if TYPE_CHECKING in worker_state_machine
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Jan 5, 2023
1 parent 1f49e88 commit 74a9765
Showing 1 changed file with 29 additions and 40 deletions.
69 changes: 29 additions & 40 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,32 @@
logger = logging.getLogger("distributed.worker.state_machine")

if TYPE_CHECKING:
# TODO import from typing (requires Python >=3.10)
from typing_extensions import TypeAlias
# TODO import from typing (TypeAlias requires Python >=3.10)
# TODO import from typing (NotRequired requires Python >=3.11)
from typing_extensions import NotRequired, TypeAlias

# Circular imports
from distributed.diagnostics.plugin import WorkerPlugin
from distributed.worker import Worker

# TODO move out of TYPE_CHECKING (requires Python >=3.10)
# Not to be confused with distributed.scheduler.TaskStateState
TaskStateState: TypeAlias = Literal[
"cancelled",
"constrained",
"error",
"executing",
"fetch",
"flight",
"forgotten",
"long-running",
"memory",
"missing",
"ready",
"released",
"rescheduled",
"resumed",
"waiting",
]
else:
TaskStateState = str
# Not to be confused with distributed.scheduler.TaskStateState
TaskStateState: TypeAlias = Literal[
"cancelled",
"constrained",
"error",
"executing",
"fetch",
"flight",
"forgotten",
"long-running",
"memory",
"missing",
"ready",
"released",
"rescheduled",
"resumed",
"waiting",
]

# TaskState.state subsets
PROCESSING: set[TaskStateState] = {
Expand Down Expand Up @@ -112,14 +110,11 @@ class SerializedTask(NamedTuple):
task: object = NO_VALUE


class _StartStopRequired(TypedDict):
class StartStop(TypedDict):
action: Literal["compute", "transfer", "disk-read", "disk-write", "deserialize"]
start: float
stop: float


class StartStop(_StartStopRequired, total=False):
source: str
source: NotRequired[str]


class InvalidTransition(Exception):
Expand Down Expand Up @@ -1029,18 +1024,12 @@ class SecedeEvent(StateMachineEvent):
compute_duration: float


if TYPE_CHECKING:
# TODO remove quotes (requires Python >=3.9)
# TODO get out of TYPE_CHECKING (requires Python >=3.10)
# {TaskState -> finish: TaskStateState | (finish: TaskStateState, transition *args)}
# Not to be confused with distributed.scheduler.Recs
Recs: TypeAlias = "dict[TaskState, TaskStateState | tuple]"
Instructions: TypeAlias = "list[Instruction]"
RecsInstrs: TypeAlias = "tuple[Recs, Instructions]"
else:
Recs = dict
Instructions = list
RecsInstrs = tuple
# TODO remove quotes (requires Python >=3.9)
# {TaskState -> finish: TaskStateState | (finish: TaskStateState, transition *args)}
# Not to be confused with distributed.scheduler.Recs
Recs: TypeAlias = "dict[TaskState, TaskStateState | tuple]"
Instructions: TypeAlias = "list[Instruction]"
RecsInstrs: TypeAlias = "tuple[Recs, Instructions]"


def merge_recs_instructions(*args: RecsInstrs) -> RecsInstrs:
Expand Down

0 comments on commit 74a9765

Please sign in to comment.