Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/examples/introduction/aio_pika_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async def main() -> None:
print(f"Returned value: {result.return_value}")
else:
print("Error found while executing task.")
await broker.shutdown()


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions docs/examples/introduction/full_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async def main() -> None:
print(f"Returned value: {result.return_value}")
else:
print("Error found while executing task.")
await broker.shutdown()


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions docs/examples/introduction/inmemory_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async def add_one(value: int) -> int:


async def main() -> None:
await broker.startup()
# Send the task to the broker.
task = await add_one.kiq(1)
# Wait for the result.
Expand All @@ -21,6 +22,7 @@ async def main() -> None:
print(f"Returned value: {result.return_value}")
else:
print("Error found while executing task.")
await broker.shutdown()


if __name__ == "__main__":
Expand Down
66 changes: 66 additions & 0 deletions docs/examples/state/events_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio
from typing import Optional

from redis.asyncio import ConnectionPool, Redis # type: ignore
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

from taskiq import Context, TaskiqEvents, TaskiqState
from taskiq.context import default_context

# To run this example, please install:
# * taskiq
# * taskiq-redis
# * taskiq-aio-pika

broker = AioPikaBroker(
"amqp://localhost",
result_backend=RedisAsyncResultBackend(
"redis://localhost/0",
),
)


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
# Here we store connection pool on startup for later use.
state.redis = ConnectionPool.from_url("redis://localhost/1")


@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def shutdown(state: TaskiqState) -> None:
# Here we close our pool on shutdown event.
await state.redis.disconnect()


@broker.task
async def get_val(key: str, context: Context = default_context) -> Optional[str]:
# Now we can use our pool.
redis = Redis(connection_pool=context.state.redis, decode_responses=True)
return await redis.get(key)


@broker.task
async def set_val(key: str, value: str, context: Context = default_context) -> None:
# Now we can use our pool to set value.
await Redis(connection_pool=context.state.redis).set(key, value)


async def main() -> None:
await broker.startup()

set_task = await set_val.kiq("key", "value")
set_result = await set_task.wait_result(with_logs=True)
if set_result.is_err:
print(set_result.log)
raise ValueError("Cannot set value in redis. See logs.")

get_task = await get_val.kiq("key")
get_res = await get_task.wait_result()
print(f"Got redis value: {get_res.return_value}")

await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
8 changes: 7 additions & 1 deletion docs/guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,16 @@ from taskiq import InMemoryBroker
broker = InMemoryBroker()
```

And that's it. Now let's add some tasks and the main function. You can add tasks in separate modules. You can find more information about that further.
And that's it. Now let's add some tasks and the main function. You can add tasks in separate modules. You can find more information about that further. Also, we call the `startup` method at the beginning of the `main` function.

@[code python](../examples/introduction/inmemory_run.py)

::: tip Cool tip!

Calling the `startup` method is not required, but we strongly recommend you do so.

:::

If you run this code, you will get this in your terminal:

```bash:no-line-numbers
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
order: 7
order: 8
---

# Scheduling tasks
Expand Down
32 changes: 32 additions & 0 deletions docs/guide/state-and-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
order: 7
---

# State and events

The `TaskiqState` is a global variable where you can keep the variables you want to use later.
For example, you want to open a database connection pool at a broker's startup.

This can be acieved by adding event handlers.

You can use one of these events:
* `WORKER_STARTUP`
* `CLIENT_STARTUP`
* `WORKER_SHUTDOWN`
* `CLIENT_SHUTDOWN`

Worker events are called when you start listening to the broker messages using taskiq.
Client events are called when you call the `startup` method of your broker from your code.

This is an example of code using event handlers:

@[code python](../examples/state/events_example.py)

::: tip Cool tip!

If you want to add handlers programmatically, you can use the `broker.add_event_handler` function.

:::

As you can see in this example, this worker will initialize the Redis pool at the startup.
You can access the state from the context.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ authors = ["Pavel Kirilin <win10@list.ru>"]
maintainers = ["Pavel Kirilin <win10@list.ru>"]
readme = "README.md"
repository = "https://github.com/taskiq-python/taskiq"
homepage = "https://taskiq-python.github.io/"
documentation = "https://taskiq-python.github.io/"
license = "LICENSE"
classifiers = [
"Typing :: Typed",
Expand All @@ -21,7 +23,6 @@ classifiers = [
"Topic :: System :: Networking",
"Development Status :: 3 - Alpha",
]
homepage = "https://github.com/taskiq-python/taskiq"
keywords = ["taskiq", "tasks", "distributed", "async"]

[tool.poetry.dependencies]
Expand Down
4 changes: 4 additions & 0 deletions taskiq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@
from taskiq.brokers.shared_broker import async_shared_broker
from taskiq.brokers.zmq_broker import ZeroMQBroker
from taskiq.context import Context
from taskiq.events import TaskiqEvents
from taskiq.exceptions import TaskiqError
from taskiq.funcs import gather
from taskiq.message import BrokerMessage, TaskiqMessage
from taskiq.result import TaskiqResult
from taskiq.scheduler import ScheduledTask, TaskiqScheduler
from taskiq.state import TaskiqState
from taskiq.task import AsyncTaskiqTask

__all__ = [
"gather",
"Context",
"AsyncBroker",
"TaskiqError",
"TaskiqState",
"TaskiqResult",
"ZeroMQBroker",
"TaskiqEvents",
"TaskiqMessage",
"BrokerMessage",
"InMemoryBroker",
Expand Down
93 changes: 81 additions & 12 deletions taskiq/abc/broker.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import inspect
import os
import sys
from abc import ABC, abstractmethod
from collections import defaultdict
from functools import wraps
from logging import getLogger
from typing import ( # noqa: WPS235
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Coroutine,
DefaultDict,
Dict,
List,
Optional,
Expand All @@ -18,22 +20,27 @@
)
from uuid import uuid4

from typing_extensions import ParamSpec
from typing_extensions import ParamSpec, TypeAlias

from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.events import TaskiqEvents
from taskiq.formatters.json_formatter import JSONFormatter
from taskiq.message import BrokerMessage
from taskiq.result_backends.dummy import DummyResultBackend
from taskiq.state import TaskiqState
from taskiq.utils import maybe_awaitable

if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from taskiq.abc.formatter import TaskiqFormatter
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.abc.result_backend import AsyncResultBackend

_T = TypeVar("_T") # noqa: WPS111
_FuncParams = ParamSpec("_FuncParams")
_ReturnType = TypeVar("_ReturnType")

EventHandler: TypeAlias = Callable[[TaskiqState], Optional[Awaitable[None]]]

logger = getLogger("taskiq")


Expand All @@ -49,7 +56,7 @@ def default_id_generator() -> str:
return uuid4().hex


class AsyncBroker(ABC):
class AsyncBroker(ABC): # noqa: WPS230
"""
Async broker.

Expand All @@ -75,8 +82,16 @@ def __init__(
self.decorator_class = AsyncTaskiqDecoratedTask
self.formatter: "TaskiqFormatter" = JSONFormatter()
self.id_generator = task_id_generator

def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None:
# Every event has a list of handlers.
# Every handler is a function which takes state as a first argument.
# And handler can be either sync or async.
self.event_handlers: DefaultDict[ # noqa: WPS234
TaskiqEvents,
List[Callable[[TaskiqState], Optional[Awaitable[None]]]],
] = defaultdict(list)
self.state = TaskiqState()

def add_middlewares(self, *middlewares: "TaskiqMiddleware") -> None:
"""
Add a list of middlewares.

Expand All @@ -86,11 +101,23 @@ def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None:
:param middlewares: list of middlewares.
"""
for middleware in middlewares:
if not isinstance(middleware, TaskiqMiddleware):
logger.warning(
f"Middleware {middleware} is not an instance of TaskiqMiddleware. "
"Skipping...",
)
continue
middleware.set_broker(self)
self.middlewares.append(middleware)

async def startup(self) -> None:
"""Do something when starting broker."""
event = TaskiqEvents.CLIENT_STARTUP
if self.is_worker_process:
event = TaskiqEvents.WORKER_STARTUP

for handler in self.event_handlers[event]:
await maybe_awaitable(handler(self.state))

async def shutdown(self) -> None:
"""
Expand All @@ -99,11 +126,13 @@ async def shutdown(self) -> None:
This method is called,
when broker is closig.
"""
for middleware in self.middlewares:
middleware_shutdown = middleware.shutdown()
if inspect.isawaitable(middleware_shutdown):
await middleware_shutdown
await self.result_backend.shutdown()
event = TaskiqEvents.CLIENT_SHUTDOWN
if self.is_worker_process:
event = TaskiqEvents.WORKER_SHUTDOWN

# Call all shutdown events.
for handler in self.event_handlers[event]:
await maybe_awaitable(handler(self.state))

@abstractmethod
async def kick(
Expand Down Expand Up @@ -232,3 +261,43 @@ def inner(
inner_task_name=task_name,
inner_labels=labels or {},
)

def on_event(self, *events: TaskiqEvents) -> Callable[[EventHandler], EventHandler]:
"""
Adds event handler.

This function adds function to call when event occurs.

:param events: events to react to.
:return: a decorator function.
"""

def handler(function: EventHandler) -> EventHandler:
for event in events:
self.event_handlers[event].append(function)
return function

return handler

def add_event_handler(
self,
event: TaskiqEvents,
handler: EventHandler,
) -> None:
"""
Adds event handler.

this function is the same as on_event.

>>> broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, my_startup)

if similar to:

>>> @broker.on_event(TaskiqEvents.WORKER_STARTUP)
>>> async def my_startup(context: Context) -> None:
>>> ...

:param event: Event to react to.
:param handler: handler to call when event is started.
"""
self.event_handlers[event].append(handler)
5 changes: 1 addition & 4 deletions taskiq/abc/middleware.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Any, Coroutine, Union

if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING: # pragma: no cover # pragma: no cover
from taskiq.abc.broker import AsyncBroker
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult
Expand All @@ -20,9 +20,6 @@ def set_broker(self, broker: "AsyncBroker") -> None:
"""
self.broker = broker

def shutdown(self) -> Union[None, Coroutine[Any, Any, None]]:
"""This function is used to do some work on shutdown."""

def pre_send(
self,
message: "TaskiqMessage",
Expand Down
2 changes: 1 addition & 1 deletion taskiq/abc/schedule_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from taskiq.scheduler.scheduler import ScheduledTask


Expand Down
Loading