Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ ignore =
WPS229,
; Found function with too much cognitive complexity
WPS231,
; Found too deep nesting
WPS220,
; Found line with high Jones Complexity
WPS221,
; function name should be lowercase
N802,
; Do not perform function calls in argument defaults.
B008,

; all init files
__init__.py:
Expand All @@ -99,6 +107,10 @@ per-file-ignores =
WPS432,
; Missing parameter(s) in Docstring
DAR101,
; Found too short name
WPS111,
; Found complex default value
WPS404,

exclude =
./.git,
Expand Down
18 changes: 18 additions & 0 deletions docs/examples/state/async_generator_deps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio
from typing import AsyncGenerator

from taskiq import TaskiqDepends


async def dependency() -> AsyncGenerator[str, None]:
print("Startup")
await asyncio.sleep(0.1)

yield "value"

await asyncio.sleep(0.1)
print("Shutdown")


async def my_task(dep: str = TaskiqDepends(dependency)) -> None:
print(dep.upper())
17 changes: 17 additions & 0 deletions docs/examples/state/class_dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from taskiq import TaskiqDepends


async def db_connection() -> str:
return "let's pretend as this is a connection"


class MyDAO:
def __init__(self, db_conn: str = TaskiqDepends(db_connection)) -> None:
self.db_conn = db_conn

def get_users(self) -> str:
return self.db_conn.upper()


def my_task(dao: MyDAO = TaskiqDepends()) -> None:
print(dao.get_users())
26 changes: 26 additions & 0 deletions docs/examples/state/dependencies_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import random

from taskiq import TaskiqDepends


def common_dep() -> int:
# For example it returns 8
return random.randint(1, 10)


def dep1(cd: int = TaskiqDepends(common_dep)) -> int:
# This function will return 9
return cd + 1


def dep2(cd: int = TaskiqDepends(common_dep)) -> int:
# This function will return 10
return cd + 2


def my_task(
d1: int = TaskiqDepends(dep1),
d2: int = TaskiqDepends(dep2),
) -> int:
# This function will return 19
return d1 + d2
7 changes: 3 additions & 4 deletions docs/examples/state/events_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

from taskiq import Context, TaskiqEvents, TaskiqState
from taskiq.context import default_context
from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState

# To run this example, please install:
# * taskiq
Expand Down Expand Up @@ -34,14 +33,14 @@ async def shutdown(state: TaskiqState) -> None:


@broker.task
async def get_val(key: str, context: Context = default_context) -> Optional[str]:
async def get_val(key: str, context: Context = TaskiqDepends()) -> Optional[str]:
# Now we can use our pool.
redis = Redis(connection_pool=context.state.redis, decode_responses=True)
return await redis.get(key)


@broker.task
async def set_val(key: str, value: str, context: Context = default_context) -> None:
async def set_val(key: str, value: str, context: Context = TaskiqDepends()) -> None:
# Now we can use our pool to set value.
await Redis(connection_pool=context.state.redis).set(key, value)

Expand Down
15 changes: 15 additions & 0 deletions docs/examples/state/generator_deps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Generator

from taskiq import TaskiqDepends


def dependency() -> Generator[str, None, None]:
print("Startup")

yield "value"

print("Shutdown")


async def my_task(dep: str = TaskiqDepends(dependency)) -> None:
print(dep.upper())
22 changes: 22 additions & 0 deletions docs/examples/state/no_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import random

from taskiq import TaskiqDepends


def common_dep() -> int:
return random.randint(1, 10)


def dep1(cd: int = TaskiqDepends(common_dep)) -> int:
return cd + 1


def dep2(cd: int = TaskiqDepends(common_dep, use_cache=False)) -> int:
return cd + 2


def my_task(
d1: int = TaskiqDepends(dep1),
d2: int = TaskiqDepends(dep2),
) -> int:
return d1 + d2
6 changes: 6 additions & 0 deletions docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ it may execute one task N times, where N is the number of running scheduler inst

This command will import the scheduler you defined and start sending tasks to your broker.

::: tip Cool tip!

The scheduler doesn't execute tasks. It only sends them.

:::

You can check list of available schedule sources in the [Available schedule sources](../available-components/schedule-sources.md) section.


Expand Down
172 changes: 172 additions & 0 deletions docs/guide/state-and-deps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
---
order: 7
---

# State and Dependencies


## State

The `TaskiqState` is a global variable where you can keep the variables you want to use later.
For example, you want to open a database connection pool at a broker's startup.

This can be acieved by adding event handlers.

You can use one of these events:
* `WORKER_STARTUP`
* `CLIENT_STARTUP`
* `WORKER_SHUTDOWN`
* `CLIENT_SHUTDOWN`

Worker events are called when you start listening to the broker messages using taskiq.
Client events are called when you call the `startup` method of your broker from your code.

This is an example of code using event handlers:

@[code python](../examples/state/events_example.py)

::: tip Cool tip!

If you want to add handlers programmatically, you can use the `broker.add_event_handler` function.

:::

As you can see in this example, this worker will initialize the Redis pool at the startup.
You can access the state from the context.


## Dependencies

Using context directly is nice, but this way won't get completion.

That's why we suggest you try TaskiqDependencies. The implementation is very similar to FastApi's dependencies. You can use classes, functions, and generators as dependencies.

::: danger Cool alarm!

FastAPI's `Depends` is not compatible with `TaskiqDepends`.

:::

### How dependencies are useful

You can use dependencies for better autocompletion and reduce the amount of code you write.
Since the state is generic, we cannot guess the types of the state fields.
Dependencies can be annotated with type hints and therfore provide better auto-completion.

Let's assume that you've stored a Redis connection pool in the state as in the example above.
```python
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
# Here we store connection pool on startup for later use.
state.redis = ConnectionPool.from_url("redis://localhost/1")

```

You can access this variable by using the current execution context directly, like this:

```python
@broker.task
async def my_task(context: Context = TaskiqDepends()) -> None:
async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis:
await redis.set('key', 'value')
```

If you hit the `TAB` button after the `context.state.` expression, your IDE won't give you any auto-completion.
But we can create a dependency function to add auto-completion.

```python

def redis_dep(context: Context = TaskiqDepends()) -> Redis:
return Redis(connection_pool=context.state.redis, decode_responses=True)

@broker.task
async def my_task(redis: Redis = TaskiqDepends(redis_dep)) -> None:
await redis.set('key', 'value')

```

Now, this dependency injection will be autocompleted. But, of course, state fields cannot be autocompleted,
even in dependencies. But this way, you won't make any typos while writing tasks.


### How do dependencies work

We build a graph of dependencies on startup. If the parameter of the function has
the default value of `TaskiqDepends` this parameter will be treated as a dependency.

Dependencies can also depend on something. Also dependencies are optimized to **not** evaluate things many times.

For example:

@[code python](../examples/state/dependencies_tree.py)

In this code, the dependency `common_dep` is going to be evaluated only once and the `dep1` and the `dep2` are going to recevie the same value. You can control this behaviour by using the `use_cache=False` parameter to you dependency. This parameter will force the
dependency to reevaluate all it's subdependencies.


In this example we cannot predict the result. Since the `dep2` doesn't use cache for the `common_dep` function.
@[code python](../examples/state/no_cache.py)

The graph for cached dependencies looks like this:

```mermaid
graph TD
A[common_dep]
B[dep1]
C[dep2]
D[my_task]
A --> B
A --> C
B --> D
C --> D
```

The dependencies graph for `my_task` where `dep2` doesn't use cached value for `common_dep` looks like this:

```mermaid
graph TD
A[common_dep]
B[dep1]
D[my_task]
C[dep2]
subgraph without cache
A1[common_dep]
end
A --> B
A1 --> C
B --> D
C --> D
```

### Class as a dependency

You can use classes as dependencies, and they can also use other dependencies too.

Let's see an example:

@[code python](../examples/state/class_dependency.py)

As you can see, the dependency for `my_task` function is declared with `TaskiqDependency()`.
It's because you can omit the class if it's declared in typehint for the parameter. This feature doesn't
work with dependency functions, it's only for classes.

You can pass dependencies for classes in the constructor.

### Generator dependencies

Generator dependencies are used to perform startup before task execution and teardown after the task execution.

@[code python](../examples/state/generator_deps.py)

In this example, we can do something at startup before the execution and at shutdown after the task is completed.

If you want to do something asynchronously, convert this function to an asynchronous generator. Like this:

@[code python](../examples/state/async_generator_deps.py)


### Default dependencies

By default taskiq has only two deendencies:
* Context from `taskiq.context.Context`
* TaskiqState from `taskiq.state.TaskiqState`
32 changes: 0 additions & 32 deletions docs/guide/state-and-events.md

This file was deleted.

Loading