Skip to content

Add ex and px parameters for redis backend #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,19 @@ Brokers parameters:
RedisAsyncResultBackend parameters:
* `redis_url` - url to redis.
* `keep_results` - flag to not remove results from Redis after reading.
* `result_ex_time` - expire time in seconds (by default - 1 minute)
* `result_px_time` - expire time in milliseconds (by default - not specified)
> IMPORTANT: You must specify either `result_ex_time` or `result_px_time`.
>```python
># First variant
>redis_async_result = RedisAsyncResultBackend(
> redis_url="redis://localhost:6379",
> result_ex_time=1000,
>)
>
># Second variant
>redis_async_result = RedisAsyncResultBackend(
> redis_url="redis://localhost:6379",
> result_px_time=1000000,
>)
>```
10 changes: 10 additions & 0 deletions taskiq_redis/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class TaskIQRedisError(Exception):
"""Base error for all taskiq-redis exceptions."""


class DuplicateExpireTimeSelectedError(TaskIQRedisError):
"""Error if two lifetimes are selected."""


class ExpireTimeMustBeMoreThanZeroError(TaskIQRedisError):
"""Error if two lifetimes are less or equal zero."""
85 changes: 56 additions & 29 deletions taskiq_redis/redis_backend.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,58 @@
import pickle
from typing import TypeVar
from typing import Dict, Optional, TypeVar, Union

from redis.asyncio import ConnectionPool, Redis
from taskiq import AsyncResultBackend
from taskiq.abc.result_backend import TaskiqResult

from taskiq_redis.exceptions import (
DuplicateExpireTimeSelectedError,
ExpireTimeMustBeMoreThanZeroError,
)

_ReturnType = TypeVar("_ReturnType")


class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
"""Async result based on redis."""

def __init__(self, redis_url: str, keep_results: bool = True):
def __init__(
self,
redis_url: str,
keep_results: bool = True,
result_ex_time: Optional[int] = None,
result_px_time: Optional[int] = None,
):
"""
Constructs a new result backend.

:param redis_url: url to redis.
:param keep_results: flag to not remove results from Redis after reading.
:param result_ex_time: expire time in seconds for result.
:param result_px_time: expire time in milliseconds for result.

:raises DuplicateExpireTimeSelectedError: if result_ex_time
and result_px_time are selected.
:raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
and result_px_time are equal zero.
"""
self.redis_pool = ConnectionPool.from_url(redis_url)
self.keep_results = keep_results
self.result_ex_time = result_ex_time
self.result_px_time = result_px_time

if self.result_ex_time == 0 or self.result_px_time == 0:
raise ExpireTimeMustBeMoreThanZeroError(
"You must select one expire time param and it must be more than zero.",
)

if self.result_ex_time and self.result_px_time:
raise DuplicateExpireTimeSelectedError(
"Choose either result_ex_time or result_px_time.",
)

if not self.result_ex_time and not self.result_px_time:
self.result_ex_time = 60

async def shutdown(self) -> None:
"""Closes redis connection."""
Expand All @@ -40,19 +73,17 @@ async def set_result(
:param task_id: ID of the task.
:param result: TaskiqResult instance.
"""
result_dict = result.dict(exclude={"return_value"})

for result_key, result_value in result_dict.items():
result_dict[result_key] = pickle.dumps(result_value)
# This trick will preserve original returned value.
# It helps when you return not serializable classes.
result_dict["return_value"] = pickle.dumps(result.return_value)
redis_set_params: Dict[str, Union[str, bytes, int]] = {
"name": task_id,
"value": pickle.dumps(result),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

async with Redis(connection_pool=self.redis_pool) as redis:
await redis.hset(
task_id,
mapping=result_dict,
)
await redis.set(**redis_set_params)

async def is_result_ready(self, task_id: str) -> bool:
"""
Expand All @@ -77,23 +108,19 @@ async def get_result( # noqa: WPS210
:param with_logs: if True it will download task's logs.
:return: task's return value.
"""
fields = list(TaskiqResult.__fields__.keys())

if not with_logs:
fields.remove("log")

async with Redis(connection_pool=self.redis_pool) as redis:
result_values = await redis.hmget(
name=task_id,
keys=fields,
)
if self.keep_results:
result_value = await redis.get(
name=task_id,
)
else:
result_value = await redis.getdel(
name=task_id,
)

if not self.keep_results:
await redis.delete(task_id)
taskiq_result: TaskiqResult[_ReturnType] = pickle.loads(result_value)

result = {
result_key: pickle.loads(result_value)
for result_value, result_key in zip(result_values, fields)
}
if not with_logs:
taskiq_result.log = None

return TaskiqResult(**result)
return taskiq_result
Loading