Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,36 @@ Returned value: 2
```

Continue reading to get more information about taskiq internals.


## Timeouts

If you want to restrict amount of time you want to run task,
just add timeout label to the task.

You can do it either with decorator or when calling the task.

::: tabs

@tab decorator

```python
@broker.task(timeout=0.1)
async def mytask():
await asyncio.sleep(2)
```

@tab when calling

```python
await my_task.kicker().with_labels(timeout=0.3).kiq()
```

:::

::: danger Cool alert

We use [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) method to run sync functions. Timeouts will raise a TimeoutException, but
synchronous function may not stop from execution. This is a constraint of python.

:::
13 changes: 10 additions & 3 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,27 @@ async def run_task( # noqa: C901, WPS210
kwargs = await dep_ctx.resolve_kwargs()
# We udpate kwargs with kwargs from network.
kwargs.update(message.kwargs)

is_coroutine = True
# If the function is a coroutine, we await it.
if asyncio.iscoroutinefunction(target):
returned = await target(*message.args, **kwargs)
target_future = target(*message.args, **kwargs)
else:
is_coroutine = False
# If this is a synchronous function, we
# run it in executor.
returned = await loop.run_in_executor(
target_future = loop.run_in_executor(
self.executor,
_run_sync,
target,
message.args,
kwargs,
)
timeout = message.labels.get("timeout")
if timeout is not None:
if not is_coroutine:
logger.warning("Timeouts for sync tasks don't work in python well.")
target_future = asyncio.wait_for(target_future, float(timeout))
returned = await target_future
except NoResultError as no_res_exc:
found_exception = no_res_exc
logger.warning(
Expand Down
45 changes: 45 additions & 0 deletions tests/receiver/test_receiver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncGenerator, List, Optional, TypeVar

Expand Down Expand Up @@ -117,6 +118,50 @@ def test_func() -> None:
assert result.is_err


@pytest.mark.anyio
async def test_run_timeouts() -> None:
async def test_func() -> None:
await asyncio.sleep(2)

receiver = get_receiver()

result = await receiver.run_task(
test_func,
TaskiqMessage(
task_id="",
task_name="",
labels={"timeout": "0.3"},
args=[],
kwargs={},
),
)
assert result.return_value is None
assert result.execution_time < 2
assert result.is_err


@pytest.mark.anyio
async def test_run_timeouts_sync() -> None:
def test_func() -> None:
time.sleep(2)

receiver = get_receiver()

result = await receiver.run_task(
test_func,
TaskiqMessage(
task_id="",
task_name="",
labels={"timeout": "0.3"},
args=[],
kwargs={},
),
)
assert result.return_value is None
assert result.execution_time < 2
assert result.is_err


@pytest.mark.anyio
async def test_run_task_exception_middlewares() -> None:
"""Tests that run_task can run sync tasks."""
Expand Down