Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions taskiq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ class ResultIsReadyError(ResultBackendError):

class SecurityError(TaskiqError):
"""Security related exception."""


class NoResultError(TaskiqError):
"""Error if user does not want to set result."""
10 changes: 6 additions & 4 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.context import Context
from taskiq.exceptions import NoResultError
from taskiq.message import TaskiqMessage
from taskiq.receiver.params_parser import parse_params
from taskiq.result import TaskiqResult
Expand Down Expand Up @@ -125,10 +126,11 @@ async def callback( # noqa: C901, WPS213
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
try:
await self.broker.result_backend.set_result(taskiq_msg.task_id, result)
for middleware in self.broker.middlewares:
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
await maybe_awaitable(middleware.post_save(taskiq_msg, result))
if not isinstance(result.error, NoResultError):
await self.broker.result_backend.set_result(taskiq_msg.task_id, result)
for middleware in self.broker.middlewares:
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
await maybe_awaitable(middleware.post_save(taskiq_msg, result))
except Exception as exc:
logger.exception(
"Can't set result in result backend. Cause: %s",
Expand Down
50 changes: 50 additions & 0 deletions tests/cli/worker/test_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.exceptions import NoResultError, TaskiqResultTimeoutError
from taskiq.message import TaskiqMessage
from taskiq.receiver import Receiver
from taskiq.result import TaskiqResult
Expand Down Expand Up @@ -284,3 +285,52 @@ async def task_sem() -> int:
assert sem_num == max_async_tasks
await listen_task
assert sem_num == max_async_tasks + 2


@pytest.mark.anyio
async def test_no_result_error() -> None:
broker = InMemoryBroker()
executed = asyncio.Event()

@broker.task
async def task_no_result() -> int:
executed.set()
raise NoResultError()

task = await task_no_result.kiq()
with pytest.raises(TaskiqResultTimeoutError):
await task.wait_result(timeout=1)

assert executed.is_set()
assert not broker._running_tasks


@pytest.mark.anyio
async def test_result() -> None:
broker = InMemoryBroker()

@broker.task
async def task_no_result() -> str:
return "some value"

task = await task_no_result.kiq()
resp = await task.wait_result(timeout=1)

assert resp.return_value == "some value"
assert not broker._running_tasks


@pytest.mark.anyio
async def test_error_result() -> None:
broker = InMemoryBroker()

@broker.task
async def task_no_result() -> str:
raise ValueError("some error")

task = await task_no_result.kiq()
resp = await task.wait_result(timeout=1)

assert resp.return_value is None
assert not broker._running_tasks
assert isinstance(resp.error, ValueError)