diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c5fc2f47..7b944c15 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python: ['3.9', '3.10', '3.11', '3.12'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python }} @@ -26,6 +26,21 @@ jobs: - run: make dev - run: make test + examples: + runs-on: ubuntu-latest + strategy: + matrix: + python: ['3.12'] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + cache: pip + - run: make dev + - run: make exampletest + format: runs-on: ubuntu-latest strategy: diff --git a/Makefile b/Makefile index 334ee978..c025bb0d 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,10 @@ typecheck: $(PYTHON) -m mypy src tests unittest: - $(PYTHON) -m pytest + $(PYTHON) -m pytest tests + +exampletest: + $(PYTHON) -m pytest examples coverage: typecheck coverage run -m unittest discover diff --git a/pyproject.toml b/pyproject.toml index 95ab6ed6..d2bef0cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "dispatch-py" description = "Develop reliable distributed systems on the Dispatch platform." readme = "README.md" dynamic = ["version"] -requires-python = ">= 3.9" +requires-python = ">= 3.8" dependencies = [ "grpcio >= 1.60.0", "protobuf >= 4.24.0", diff --git a/src/dispatch/coroutine.py b/src/dispatch/coroutine.py index 79701a73..ef54d465 100644 --- a/src/dispatch/coroutine.py +++ b/src/dispatch/coroutine.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from types import coroutine -from typing import Any, Awaitable +from typing import Any, Awaitable, List, Tuple from dispatch.experimental.durable import durable from dispatch.proto import Call @@ -16,14 +16,14 @@ def call(call: Call) -> Any: @coroutine @durable -def gather(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] +def gather(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc] """Alias for all.""" return all(*awaitables) @coroutine @durable -def all(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] +def all(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc] """Concurrently run a set of coroutines, blocking until all coroutines return or any coroutine raises an error. If any coroutine fails with an uncaught exception, the exception will be re-raised here.""" @@ -32,7 +32,7 @@ def all(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] @coroutine @durable -def any(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] +def any(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc] """Concurrently run a set of coroutines, blocking until any coroutine returns or all coroutines raises an error. If all coroutines fail with uncaught exceptions, the exception(s) will be re-raised here.""" @@ -41,7 +41,7 @@ def any(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] @coroutine @durable -def race(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] +def race(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc] """Concurrently run a set of coroutines, blocking until any coroutine returns or raises an error. If any coroutine fails with an uncaught exception, the exception will be re-raised here.""" @@ -50,17 +50,17 @@ def race(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc] @dataclass class AllDirective: - awaitables: tuple[Awaitable[Any], ...] + awaitables: Tuple[Awaitable[Any], ...] @dataclass class AnyDirective: - awaitables: tuple[Awaitable[Any], ...] + awaitables: Tuple[Awaitable[Any], ...] @dataclass class RaceDirective: - awaitables: tuple[Awaitable[Any], ...] + awaitables: Tuple[Awaitable[Any], ...] class AnyException(RuntimeError): @@ -69,7 +69,7 @@ class AnyException(RuntimeError): __slots__ = ("exceptions",) - def __init__(self, exceptions: list[Exception]): + def __init__(self, exceptions: List[Exception]): self.exceptions = exceptions def __str__(self): diff --git a/src/dispatch/experimental/durable/frame.c b/src/dispatch/experimental/durable/frame.c index b3bb4517..8dcfec96 100644 --- a/src/dispatch/experimental/durable/frame.c +++ b/src/dispatch/experimental/durable/frame.c @@ -6,11 +6,12 @@ #define PY_SSIZE_T_CLEAN #include -#if PY_MAJOR_VERSION != 3 || (PY_MINOR_VERSION < 9 || PY_MINOR_VERSION > 13) -# error Python 3.9-3.13 is required +#if PY_MAJOR_VERSION != 3 || (PY_MINOR_VERSION < 8 || PY_MINOR_VERSION > 13) +# error Python 3.8-3.13 is required #endif // This is a redefinition of the private PyTryBlock from <= 3.10. +// https://github.com/python/cpython/blob/3.8/Include/frameobject.h#L10 // https://github.com/python/cpython/blob/3.9/Include/cpython/frameobject.h#L11 // https://github.com/python/cpython/blob/3.10/Include/cpython/frameobject.h#L22 typedef struct { @@ -19,7 +20,8 @@ typedef struct { int b_level; } PyTryBlock; -// This is a redefinition of the private PyCoroWrapper from 3.9-3.13. +// This is a redefinition of the private PyCoroWrapper from 3.8-3.13. +// https://github.com/python/cpython/blob/3.8/Objects/genobject.c#L840 // https://github.com/python/cpython/blob/3.9/Objects/genobject.c#L830 // https://github.com/python/cpython/blob/3.10/Objects/genobject.c#L884 // https://github.com/python/cpython/blob/3.11/Objects/genobject.c#L1016 @@ -53,7 +55,9 @@ static int get_frame_iblock(Frame *frame); static void set_frame_iblock(Frame *frame, int iblock); static PyTryBlock *get_frame_blockstack(Frame *frame); -#if PY_MINOR_VERSION == 9 +#if PY_MINOR_VERSION == 8 +#include "frame308.h" +#elif PY_MINOR_VERSION == 9 #include "frame309.h" #elif PY_MINOR_VERSION == 10 #include "frame310.h" diff --git a/src/dispatch/experimental/durable/frame308.h b/src/dispatch/experimental/durable/frame308.h new file mode 100644 index 00000000..615449b6 --- /dev/null +++ b/src/dispatch/experimental/durable/frame308.h @@ -0,0 +1,145 @@ +// This is a redefinition of the private/opaque frame object. +// +// https://github.com/python/cpython/blob/3.8/Include/frameobject.h#L16 +// +// In Python <= 3.10, `struct _frame` is both the PyFrameObject and +// PyInterpreterFrame. From Python 3.11 onwards, the two were split with the +// PyFrameObject (struct _frame) pointing to struct _PyInterpreterFrame. +struct Frame { + PyObject_VAR_HEAD + struct Frame *f_back; // struct _frame + PyCodeObject *f_code; + PyObject *f_builtins; + PyObject *f_globals; + PyObject *f_locals; + PyObject **f_valuestack; + PyObject **f_stacktop; + PyObject *f_trace; + char f_trace_lines; + char f_trace_opcodes; + PyObject *f_gen; + int f_lasti; + int f_lineno; + int f_iblock; + char f_executing; + PyTryBlock f_blockstack[CO_MAXBLOCKS]; + PyObject *f_localsplus[1]; +}; + +// Python 3.9 and prior didn't have an explicit enum of frame states, +// but we can derive them based on the presence of a frame, and other +// information found on the frame, for compatibility with later versions. +typedef enum _framestate { + FRAME_CREATED = -2, + FRAME_EXECUTING = 0, + FRAME_CLEARED = 4 +} FrameState; + +/* +// This is the definition of PyGenObject for reference to developers +// working on the extension. +// +// Note that PyCoroObject and PyAsyncGenObject have the same layout as +// PyGenObject, however the struct fields have a cr_ and ag_ prefix +// (respectively) rather than a gi_ prefix. In Python <= 3.10, PyCoroObject +// and PyAsyncGenObject have extra fields compared to PyGenObject. In Python +// 3.11 onwards, the three objects are identical (except for field name +// prefixes). The extra fields in Python <= 3.10 are not applicable to the +// extension at this time. +// +// https://github.com/python/cpython/blob/3.8/Include/genobject.h#L17 +typedef struct { + PyObject_HEAD + PyFrameObject *gi_frame; + char gi_running; + PyObject *gi_code; + PyObject *gi_weakreflist; + PyObject *gi_name; + PyObject *gi_qualname; + _PyErr_StackItem gi_exc_state; +} PyGenObject; +*/ + +static Frame *get_frame(PyGenObject *gen_like) { + Frame *frame = (Frame *)(gen_like->gi_frame); + assert(frame); + return frame; +} + +static PyCodeObject *get_frame_code(Frame *frame) { + PyCodeObject *code = frame->f_code; + assert(code); + return code; +} + +static int get_frame_lasti(Frame *frame) { + return frame->f_lasti; +} + +static void set_frame_lasti(Frame *frame, int lasti) { + frame->f_lasti = lasti; +} + +static int get_frame_state(PyGenObject *gen_like) { + // Python 3.8 doesn't have frame states, but we can derive + // some for compatibility with later versions and to simplify + // the extension. + Frame *frame = (Frame *)(gen_like->gi_frame); + if (!frame) { + return FRAME_CLEARED; + } + return frame->f_executing ? FRAME_EXECUTING : FRAME_CREATED; +} + +static void set_frame_state(PyGenObject *gen_like, int fs) { + Frame *frame = get_frame(gen_like); + frame->f_executing = (fs == FRAME_EXECUTING); +} + +static int valid_frame_state(int fs) { + return fs == FRAME_CREATED || fs == FRAME_EXECUTING || fs == FRAME_CLEARED; +} + +static int get_frame_stacktop_limit(Frame *frame) { + PyCodeObject *code = get_frame_code(frame); + return code->co_stacksize + code->co_nlocals; +} + +static int get_frame_stacktop(Frame *frame) { + assert(frame->f_localsplus); + int stacktop = (int)(frame->f_stacktop - frame->f_localsplus); + assert(stacktop >= 0 && stacktop < get_frame_stacktop_limit(frame)); + return stacktop; +} + +static void set_frame_stacktop(Frame *frame, int stacktop) { + assert(stacktop >= 0 && stacktop < get_frame_stacktop_limit(frame)); + assert(frame->f_localsplus); + frame->f_stacktop = frame->f_localsplus + stacktop; +} + +static PyObject **get_frame_localsplus(Frame *frame) { + PyObject **localsplus = frame->f_localsplus; + assert(localsplus); + return localsplus; +} + +static int get_frame_iblock_limit(Frame *frame) { + return CO_MAXBLOCKS; +} + +static int get_frame_iblock(Frame *frame) { + return frame->f_iblock; +} + +static void set_frame_iblock(Frame *frame, int iblock) { + assert(iblock >= 0 && iblock < get_frame_iblock_limit(frame)); + frame->f_iblock = iblock; +} + +static PyTryBlock *get_frame_blockstack(Frame *frame) { + PyTryBlock *blockstack = frame->f_blockstack; + assert(blockstack); + return blockstack; +} + diff --git a/src/dispatch/experimental/durable/function.py b/src/dispatch/experimental/durable/function.py index 925b0ee5..2acfbff5 100644 --- a/src/dispatch/experimental/durable/function.py +++ b/src/dispatch/experimental/durable/function.py @@ -9,7 +9,18 @@ MethodType, TracebackType, ) -from typing import Any, Callable, Coroutine, Generator, Optional, TypeVar, Union, cast +from typing import ( + Any, + Callable, + Coroutine, + Dict, + Generator, + Optional, + Tuple, + TypeVar, + Union, + cast, +) from . import frame as ext from .registry import RegisteredFunction, lookup_function, register_function @@ -78,8 +89,8 @@ class Serializable: g: Union[GeneratorType, CoroutineType] registered_fn: RegisteredFunction wrapped_coroutine: Union["DurableCoroutine", None] - args: tuple[Any, ...] - kwargs: dict[str, Any] + args: Tuple[Any, ...] + kwargs: Dict[str, Any] def __init__( self, @@ -274,7 +285,7 @@ def cr_await(self) -> Any: return self.coroutine.cr_await @property - def cr_origin(self) -> Optional[tuple[tuple[str, int, str], ...]]: + def cr_origin(self) -> Optional[Tuple[Tuple[str, int, str], ...]]: return self.coroutine.cr_origin def __repr__(self) -> str: diff --git a/src/dispatch/experimental/durable/registry.py b/src/dispatch/experimental/durable/registry.py index da8f2c28..fa6f2228 100644 --- a/src/dispatch/experimental/durable/registry.py +++ b/src/dispatch/experimental/durable/registry.py @@ -1,6 +1,7 @@ import hashlib from dataclasses import dataclass from types import FunctionType +from typing import Dict @dataclass @@ -46,7 +47,7 @@ def __setstate__(self, state): self.hash = code_hash -_REGISTRY: dict[str, RegisteredFunction] = {} +_REGISTRY: Dict[str, RegisteredFunction] = {} def register_function(fn: FunctionType) -> RegisteredFunction: diff --git a/src/dispatch/function.py b/src/dispatch/function.py index 0caeb087..f401f950 100644 --- a/src/dispatch/function.py +++ b/src/dispatch/function.py @@ -12,6 +12,7 @@ Dict, Generic, Iterable, + List, Optional, TypeVar, overload, @@ -329,7 +330,7 @@ def batch(self) -> Batch: a set of calls to dispatch.""" return Batch(self) - def dispatch(self, calls: Iterable[Call]) -> list[DispatchID]: + def dispatch(self, calls: Iterable[Call]) -> List[DispatchID]: """Dispatch function calls. Args: @@ -369,7 +370,7 @@ class Batch: def __init__(self, client: Client): self.client = client - self.calls: list[Call] = [] + self.calls: List[Call] = [] def add(self, func: Function[P, T], *args: P.args, **kwargs: P.kwargs) -> Batch: """Add a call to the specified function to the batch.""" @@ -380,7 +381,7 @@ def add_call(self, call: Call) -> Batch: self.calls.append(call) return self - def dispatch(self) -> list[DispatchID]: + def dispatch(self) -> List[DispatchID]: """Dispatch dispatches the calls asynchronously. The batch is reset when the calls are dispatched successfully. diff --git a/src/dispatch/proto.py b/src/dispatch/proto.py index ef70e84c..1a9d5c5e 100644 --- a/src/dispatch/proto.py +++ b/src/dispatch/proto.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from traceback import format_exception from types import TracebackType -from typing import Any, Optional +from typing import Any, Dict, List, Optional, Tuple import google.protobuf.any_pb2 import google.protobuf.message @@ -79,7 +79,7 @@ def input(self) -> Any: self._assert_first_call() return self._input - def input_arguments(self) -> tuple[tuple[Any, ...], dict[str, Any]]: + def input_arguments(self) -> Tuple[Tuple[Any, ...], Dict[str, Any]]: """Returns positional and keyword arguments carried by the input.""" self._assert_first_call() if not isinstance(self._input, Arguments): @@ -92,7 +92,7 @@ def coroutine_state(self) -> Any: return self._coroutine_state @property - def call_results(self) -> list[CallResult]: + def call_results(self) -> List[CallResult]: self._assert_resume() return self._call_results @@ -124,7 +124,7 @@ def from_poll_results( cls, function: str, coroutine_state: Any, - call_results: list[CallResult], + call_results: List[CallResult], error: Optional[Error] = None, ): return Input( @@ -143,8 +143,8 @@ def from_poll_results( class Arguments: """A container for positional and keyword arguments.""" - args: tuple[Any, ...] - kwargs: dict[str, Any] + args: Tuple[Any, ...] + kwargs: Dict[str, Any] @dataclass @@ -201,7 +201,7 @@ def exit( def poll( cls, state: Any, - calls: Optional[list[Call]] = None, + calls: Optional[List[Call]] = None, min_results: int = 1, max_results: int = 10, max_wait_seconds: Optional[int] = None, diff --git a/src/dispatch/scheduler.py b/src/dispatch/scheduler.py index 42915450..bd69575a 100644 --- a/src/dispatch/scheduler.py +++ b/src/dispatch/scheduler.py @@ -2,7 +2,18 @@ import pickle import sys from dataclasses import dataclass, field -from typing import Any, Awaitable, Callable, Optional, Protocol, Union +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Protocol, + Set, + Tuple, + Union, +) from typing_extensions import TypeAlias @@ -84,9 +95,9 @@ def value(self) -> Any: class AllFuture: """A future result of a dispatch.coroutine.all() operation.""" - order: list[CoroutineID] = field(default_factory=list) - waiting: set[CoroutineID] = field(default_factory=set) - results: dict[CoroutineID, CoroutineResult] = field(default_factory=dict) + order: List[CoroutineID] = field(default_factory=list) + waiting: Set[CoroutineID] = field(default_factory=set) + results: Dict[CoroutineID, CoroutineResult] = field(default_factory=dict) first_error: Optional[Exception] = None def add_result(self, result: Union[CallResult, CoroutineResult]): @@ -115,7 +126,7 @@ def error(self) -> Optional[Exception]: assert self.ready() return self.first_error - def value(self) -> list[Any]: + def value(self) -> List[Any]: assert self.ready() assert len(self.waiting) == 0 assert self.first_error is None @@ -126,10 +137,10 @@ def value(self) -> list[Any]: class AnyFuture: """A future result of a dispatch.coroutine.any() operation.""" - order: list[CoroutineID] = field(default_factory=list) - waiting: set[CoroutineID] = field(default_factory=set) + order: List[CoroutineID] = field(default_factory=list) + waiting: Set[CoroutineID] = field(default_factory=set) first_result: Optional[CoroutineResult] = None - errors: dict[CoroutineID, Exception] = field(default_factory=dict) + errors: Dict[CoroutineID, Exception] = field(default_factory=dict) generic_error: Optional[Exception] = None def add_result(self, result: Union[CallResult, CoroutineResult]): @@ -183,7 +194,7 @@ def value(self) -> Any: class RaceFuture: """A future result of a dispatch.coroutine.race() operation.""" - waiting: set[CoroutineID] = field(default_factory=set) + waiting: Set[CoroutineID] = field(default_factory=set) first_result: Optional[CoroutineResult] = None first_error: Optional[Exception] = None @@ -248,12 +259,12 @@ class State: """State of the scheduler and the coroutines it's managing.""" version: str - suspended: dict[CoroutineID, Coroutine] - ready: list[Coroutine] + suspended: Dict[CoroutineID, Coroutine] + ready: List[Coroutine] next_coroutine_id: int next_call_id: int - prev_callers: list[Coroutine] + prev_callers: List[Coroutine] outstanding_calls: int @@ -416,7 +427,7 @@ def _run(self, input: Input) -> Output: len(state.ready) + len(state.suspended), ) - pending_calls: list[Call] = [] + pending_calls: List[Call] = [] while state.ready: coroutine = state.ready.pop(0) logger.debug("running %s", coroutine) @@ -542,8 +553,8 @@ def _run(self, input: Input) -> Output: def spawn_children( - state: State, coroutine: Coroutine, awaitables: tuple[Awaitable[Any], ...] -) -> list[Coroutine]: + state: State, coroutine: Coroutine, awaitables: Tuple[Awaitable[Any], ...] +) -> List[Coroutine]: children = [] for awaitable in awaitables: g = awaitable.__await__() diff --git a/src/dispatch/signature/__init__.py b/src/dispatch/signature/__init__.py index 5794927b..a36173b8 100644 --- a/src/dispatch/signature/__init__.py +++ b/src/dispatch/signature/__init__.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timedelta -from typing import Sequence, cast +from typing import Sequence, Set, cast import http_sfv from cryptography.hazmat.primitives.asymmetric.ed25519 import ( @@ -114,8 +114,8 @@ def verify_request(request: Request, key: Ed25519PublicKey, max_age: timedelta): verify_content_digest(request.headers["Content-Digest"], request.body) -def extract_covered_components(result: VerifyResult) -> set[str]: - covered_components: set[str] = set() +def extract_covered_components(result: VerifyResult) -> Set[str]: + covered_components: Set[str] = set() for key in result.covered_components.keys(): item = http_sfv.Item() item.parse(key.encode()) diff --git a/src/dispatch/status.py b/src/dispatch/status.py index 1a8f34d2..8ee431ca 100644 --- a/src/dispatch/status.py +++ b/src/dispatch/status.py @@ -1,5 +1,5 @@ import enum -from typing import Any, Callable, Type +from typing import Any, Callable, Dict, Type from dispatch.error import IncompatibleStateError from dispatch.sdk.v1 import status_pb2 as status_pb @@ -79,8 +79,8 @@ def __str__(self): Status.NOT_FOUND.__doc__ = "An operation was performed on a non-existent resource" Status.NOT_FOUND._proto = status_pb.STATUS_NOT_FOUND -_ERROR_TYPES: dict[Type[Exception], Callable[[Exception], Status]] = {} -_OUTPUT_TYPES: dict[Type[Any], Callable[[Any], Status]] = {} +_ERROR_TYPES: Dict[Type[Exception], Callable[[Exception], Status]] = {} +_OUTPUT_TYPES: Dict[Type[Any], Callable[[Any], Status]] = {} def status_for_error(error: Exception) -> Status: diff --git a/src/dispatch/test/server.py b/src/dispatch/test/server.py index b6d9c11a..a2d022b8 100644 --- a/src/dispatch/test/server.py +++ b/src/dispatch/test/server.py @@ -1,4 +1,5 @@ import concurrent.futures +import sys import grpc @@ -47,7 +48,10 @@ def stop(self): """Stop the server.""" self._server.stop(0) self._server.wait_for_termination() - self._thread_pool.shutdown(wait=True, cancel_futures=True) + if sys.version_info >= (3, 9): + self._thread_pool.shutdown(wait=True, cancel_futures=True) + else: + self._thread_pool.shutdown(wait=True) def __enter__(self): self.start() diff --git a/src/dispatch/test/service.py b/src/dispatch/test/service.py index d711a4ed..6129f4ef 100644 --- a/src/dispatch/test/service.py +++ b/src/dispatch/test/service.py @@ -5,7 +5,7 @@ import time from collections import OrderedDict from dataclasses import dataclass -from typing import Optional +from typing import Dict, List, Optional, Set, Tuple import grpc import httpx @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) -RoundTrip: TypeAlias = tuple[function_pb.RunRequest, function_pb.RunResponse] +RoundTrip: TypeAlias = Tuple[function_pb.RunRequest, function_pb.RunResponse] """A request to a Dispatch endpoint, and the response that was received.""" @@ -54,7 +54,7 @@ def __init__( self, endpoint_client: EndpointClient, api_key: Optional[str] = None, - retry_on_status: Optional[set[Status]] = None, + retry_on_status: Optional[Set[Status]] = None, collect_roundtrips: bool = False, ): """Initialize the Dispatch service. @@ -82,12 +82,12 @@ def __init__( self._next_dispatch_id = 1 - self.queue: list[tuple[DispatchID, function_pb.RunRequest, CallType]] = [] + self.queue: List[Tuple[DispatchID, function_pb.RunRequest, CallType]] = [] - self.pollers: dict[DispatchID, Poller] = {} - self.parents: dict[DispatchID, Poller] = {} + self.pollers: Dict[DispatchID, Poller] = {} + self.parents: Dict[DispatchID, Poller] = {} - self.roundtrips: Optional[OrderedDict[DispatchID, list[RoundTrip]]] = None + self.roundtrips: Optional[OrderedDict[DispatchID, List[RoundTrip]]] = None if collect_roundtrips: self.roundtrips = OrderedDict() @@ -354,5 +354,5 @@ class Poller: coroutine_state: bytes # TODO: support max_wait/min_results/max_results - waiting: dict[DispatchID, call_pb.Call] - results: dict[DispatchID, call_pb.CallResult] + waiting: Dict[DispatchID, call_pb.Call] + results: Dict[DispatchID, call_pb.CallResult] diff --git a/tests/dispatch/test_scheduler.py b/tests/dispatch/test_scheduler.py index c5189de2..2576da97 100644 --- a/tests/dispatch/test_scheduler.py +++ b/tests/dispatch/test_scheduler.py @@ -1,5 +1,5 @@ import unittest -from typing import Any, Callable, Optional +from typing import Any, Callable, List, Optional, Type from dispatch.coroutine import AnyException, any, call, gather, race from dispatch.experimental.durable import durable @@ -413,7 +413,7 @@ def resume( self, main: Callable, prev_output: Output, - call_results: list[CallResult], + call_results: List[CallResult], poll_error: Optional[Exception] = None, ): poll = self.assert_poll(prev_output) @@ -444,7 +444,7 @@ def assert_exit_result_value(self, output: Output, expect: Any): self.assertEqual(expect, any_unpickle(result.output)) def assert_exit_result_error( - self, output: Output, expect: type[Exception], message: Optional[str] = None + self, output: Output, expect: Type[Exception], message: Optional[str] = None ): result = self.assert_exit_result(output) self.assertFalse(result.HasField("output")) @@ -471,7 +471,7 @@ def assert_empty_poll(self, output: Output): self.assertEqual(len(poll.calls), 0) def assert_poll_call_functions( - self, output: Output, expect: list[str], min_results=None, max_results=None + self, output: Output, expect: List[str], min_results=None, max_results=None ): poll = self.assert_poll(output) # Note: we're not testing endpoint/input here. diff --git a/tests/examples b/tests/examples deleted file mode 120000 index a6573af9..00000000 --- a/tests/examples +++ /dev/null @@ -1 +0,0 @@ -../examples \ No newline at end of file