Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 197 additions & 165 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mock = "^4.0.3"
anyio = "^3.6.1"
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
types-mock = "^4.0.15"
taskiq_dependencies = "~1.0.0"

[tool.poetry.extras]
zmq = ["pyzmq"]
Expand Down
3 changes: 2 additions & 1 deletion taskiq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Distributed task manager."""
from taskiq_dependencies import Depends as TaskiqDepends

from taskiq.abc.broker import AsyncBroker, AsyncTaskiqDecoratedTask
from taskiq.abc.formatter import TaskiqFormatter
from taskiq.abc.middleware import TaskiqMiddleware
Expand All @@ -8,7 +10,6 @@
from taskiq.brokers.shared_broker import async_shared_broker
from taskiq.brokers.zmq_broker import ZeroMQBroker
from taskiq.context import Context
from taskiq.dependencies import TaskiqDepends
from taskiq.events import TaskiqEvents
from taskiq.exceptions import TaskiqError
from taskiq.funcs import gather
Expand Down
6 changes: 3 additions & 3 deletions taskiq/abc/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
class ScheduleSource(ABC):
"""Abstract class for source of scheduled tasks."""

async def startup(self) -> None:
async def startup(self) -> None: # noqa: B027
"""Action to execute during startup."""

async def shutdown(self) -> None:
async def shutdown(self) -> None: # noqa: B027
"""Actions to execute during shutdown."""

@abstractmethod
async def get_schedules(self) -> List["ScheduledTask"]:
"""Get list of taskiq schedules."""

async def add_schedule(self, schedule: "ScheduledTask") -> None:
async def add_schedule(self, schedule: "ScheduledTask") -> None: # noqa: B027
"""
Add a new schedule.

Expand Down
3 changes: 2 additions & 1 deletion taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from collections import OrderedDict
from typing import Any, Callable, Coroutine, Optional, TypeVar, get_type_hints

from taskiq_dependencies import DependencyGraph

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
from taskiq.cli.worker.args import WorkerArgs
from taskiq.cli.worker.receiver import Receiver
from taskiq.dependencies import DependencyGraph
from taskiq.events import TaskiqEvents
from taskiq.exceptions import TaskiqError
from taskiq.message import BrokerMessage
Expand Down
5 changes: 3 additions & 2 deletions taskiq/cli/worker/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
from time import time
from typing import Any, Callable, Dict, get_type_hints

from taskiq_dependencies import DependencyGraph

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.cli.worker.args import WorkerArgs
from taskiq.cli.worker.log_collector import log_collector
from taskiq.cli.worker.params_parser import parse_params
from taskiq.context import Context
from taskiq.dependencies import DependencyGraph
from taskiq.message import BrokerMessage, TaskiqMessage
from taskiq.result import TaskiqResult
from taskiq.state import TaskiqState
Expand Down Expand Up @@ -164,7 +165,7 @@ async def run_task( # noqa: C901, WPS210
dep_ctx = None
if dependency_graph:
# Create a context for dependency resolving.
dep_ctx = dependency_graph.ctx(
dep_ctx = dependency_graph.async_ctx(
{
Context: Context(message, self.broker),
TaskiqState: self.broker.state,
Expand Down
Loading