Skip to content

Added redis sentinel support #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,24 @@ services:
REDIS_CLUSTER_CREATOR: "yes"
ports:
- 7001:6379

redis-master:
image: bitnami/redis:6.2.5
environment:
ALLOW_EMPTY_PASSWORD: "yes"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s

redis-sentinel:
image: bitnami/redis-sentinel:latest
depends_on:
- redis-master
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_MASTER_HOST: "redis-master"
ports:
- 7002:26379
10 changes: 10 additions & 0 deletions taskiq_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@
from taskiq_redis.redis_backend import (
RedisAsyncClusterResultBackend,
RedisAsyncResultBackend,
RedisAsyncSentinelResultBackend,
)
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
from taskiq_redis.redis_cluster_broker import ListQueueClusterBroker
from taskiq_redis.redis_sentinel_broker import (
ListQueueSentinelBroker,
PubSubSentinelBroker,
)
from taskiq_redis.schedule_source import (
RedisClusterScheduleSource,
RedisScheduleSource,
RedisSentinelScheduleSource,
)

__all__ = [
"RedisAsyncClusterResultBackend",
"RedisAsyncResultBackend",
"RedisAsyncSentinelResultBackend",
"ListQueueBroker",
"PubSubBroker",
"ListQueueClusterBroker",
"ListQueueSentinelBroker",
"PubSubSentinelBroker",
"RedisScheduleSource",
"RedisClusterScheduleSource",
"RedisSentinelScheduleSource",
]
167 changes: 165 additions & 2 deletions taskiq_redis/redis_backend.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
import pickle
from typing import Any, Dict, Optional, TypeVar, Union
import sys
from contextlib import asynccontextmanager
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Dict,
List,
Optional,
Tuple,
TypeVar,
Union,
)

from redis.asyncio import BlockingConnectionPool, Redis
from redis.asyncio import BlockingConnectionPool, Redis, Sentinel
from redis.asyncio.cluster import RedisCluster
from taskiq import AsyncResultBackend
from taskiq.abc.result_backend import TaskiqResult
from taskiq.abc.serializer import TaskiqSerializer

from taskiq_redis.exceptions import (
DuplicateExpireTimeSelectedError,
ExpireTimeMustBeMoreThanZeroError,
ResultIsMissingError,
)
from taskiq_redis.serializer import PickleSerializer

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

if TYPE_CHECKING:
_Redis: TypeAlias = Redis[bytes]
else:
_Redis: TypeAlias = Redis

_ReturnType = TypeVar("_ReturnType")

Expand Down Expand Up @@ -267,3 +291,142 @@ async def get_result(
taskiq_result.log = None

return taskiq_result


class RedisAsyncSentinelResultBackend(AsyncResultBackend[_ReturnType]):
"""Async result based on redis sentinel."""

def __init__(
self,
sentinels: List[Tuple[str, int]],
master_name: str,
keep_results: bool = True,
result_ex_time: Optional[int] = None,
result_px_time: Optional[int] = None,
min_other_sentinels: int = 0,
sentinel_kwargs: Optional[Any] = None,
serializer: Optional[TaskiqSerializer] = None,
**connection_kwargs: Any,
) -> None:
"""
Constructs a new result backend.

:param sentinels: list of sentinel host and ports pairs.
:param master_name: sentinel master name.
:param keep_results: flag to not remove results from Redis after reading.
:param result_ex_time: expire time in seconds for result.
:param result_px_time: expire time in milliseconds for result.
:param max_connection_pool_size: maximum number of connections in pool.
:param connection_kwargs: additional arguments for redis BlockingConnectionPool.

:raises DuplicateExpireTimeSelectedError: if result_ex_time
and result_px_time are selected.
:raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
and result_px_time are equal zero.
"""
self.sentinel = Sentinel(
sentinels=sentinels,
min_other_sentinels=min_other_sentinels,
sentinel_kwargs=sentinel_kwargs,
**connection_kwargs,
)
self.master_name = master_name
if serializer is None:
serializer = PickleSerializer()
self.serializer = serializer
self.keep_results = keep_results
self.result_ex_time = result_ex_time
self.result_px_time = result_px_time

unavailable_conditions = any(
(
self.result_ex_time is not None and self.result_ex_time <= 0,
self.result_px_time is not None and self.result_px_time <= 0,
),
)
if unavailable_conditions:
raise ExpireTimeMustBeMoreThanZeroError(
"You must select one expire time param and it must be more than zero.",
)

if self.result_ex_time and self.result_px_time:
raise DuplicateExpireTimeSelectedError(
"Choose either result_ex_time or result_px_time.",
)

@asynccontextmanager
async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
async with self.sentinel.master_for(self.master_name) as redis_conn:
yield redis_conn

async def set_result(
self,
task_id: str,
result: TaskiqResult[_ReturnType],
) -> None:
"""
Sets task result in redis.

Dumps TaskiqResult instance into the bytes and writes
it to redis.

:param task_id: ID of the task.
:param result: TaskiqResult instance.
"""
redis_set_params: Dict[str, Union[str, bytes, int]] = {
"name": task_id,
"value": self.serializer.dumpb(result),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

async with self._acquire_master_conn() as redis:
await redis.set(**redis_set_params) # type: ignore

async def is_result_ready(self, task_id: str) -> bool:
"""
Returns whether the result is ready.

:param task_id: ID of the task.

:returns: True if the result is ready else False.
"""
async with self._acquire_master_conn() as redis:
return bool(await redis.exists(task_id))

async def get_result(
self,
task_id: str,
with_logs: bool = False,
) -> TaskiqResult[_ReturnType]:
"""
Gets result from the task.

:param task_id: task's id.
:param with_logs: if True it will download task's logs.
:raises ResultIsMissingError: if there is no result when trying to get it.
:return: task's return value.
"""
async with self._acquire_master_conn() as redis:
if self.keep_results:
result_value = await redis.get(
name=task_id,
)
else:
result_value = await redis.getdel(
name=task_id,
)

if result_value is None:
raise ResultIsMissingError

taskiq_result: TaskiqResult[_ReturnType] = pickle.loads( # noqa: S301
result_value,
)

if not with_logs:
taskiq_result.log = None

return taskiq_result
132 changes: 132 additions & 0 deletions taskiq_redis/redis_sentinel_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import sys
from contextlib import asynccontextmanager
from logging import getLogger
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
AsyncIterator,
Callable,
List,
Optional,
Tuple,
TypeVar,
)

from redis.asyncio import Redis, Sentinel
from taskiq import AsyncResultBackend, BrokerMessage
from taskiq.abc.broker import AsyncBroker

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

if TYPE_CHECKING:
_Redis: TypeAlias = Redis[bytes]
else:
_Redis: TypeAlias = Redis

_T = TypeVar("_T")

logger = getLogger("taskiq.redis_sentinel_broker")


class BaseSentinelBroker(AsyncBroker):
"""Base broker that works with Sentinel."""

def __init__(
self,
sentinels: List[Tuple[str, int]],
master_name: str,
result_backend: Optional[AsyncResultBackend[_T]] = None,
task_id_generator: Optional[Callable[[], str]] = None,
queue_name: str = "taskiq",
min_other_sentinels: int = 0,
sentinel_kwargs: Optional[Any] = None,
**connection_kwargs: Any,
) -> None:
super().__init__(
result_backend=result_backend,
task_id_generator=task_id_generator,
)

self.sentinel = Sentinel(
sentinels=sentinels,
min_other_sentinels=min_other_sentinels,
sentinel_kwargs=sentinel_kwargs,
**connection_kwargs,
)
self.master_name = master_name
self.queue_name = queue_name

@asynccontextmanager
async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
async with self.sentinel.master_for(self.master_name) as redis_conn:
yield redis_conn


class PubSubSentinelBroker(BaseSentinelBroker):
"""Broker that works with Sentinel and broadcasts tasks to all workers."""

async def kick(self, message: BrokerMessage) -> None:
"""
Publish message over PUBSUB channel.

:param message: message to send.
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with self._acquire_master_conn() as redis_conn:
await redis_conn.publish(queue_name, message.message)

async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Listen redis queue for new messages.

This function listens to the pubsub channel
and yields all messages with proper types.

:yields: broker messages.
"""
async with self._acquire_master_conn() as redis_conn:
redis_pubsub_channel = redis_conn.pubsub()
await redis_pubsub_channel.subscribe(self.queue_name)
async for message in redis_pubsub_channel.listen():
if not message:
continue
if message["type"] != "message":
logger.debug("Received non-message from redis: %s", message)
continue
yield message["data"]


class ListQueueSentinelBroker(BaseSentinelBroker):
"""Broker that works with Sentinel and distributes tasks between workers."""

async def kick(self, message: BrokerMessage) -> None:
"""
Put a message in a list.

This method appends a message to the list of all messages.

:param message: message to append.
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with self._acquire_master_conn() as redis_conn:
await redis_conn.lpush(queue_name, message.message)

async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Listen redis queue for new messages.

This function listens to the queue
and yields new messages if they have BrokerMessage type.

:yields: broker messages.
"""
redis_brpop_data_position = 1
async with self._acquire_master_conn() as redis_conn:
while True:
yield (await redis_conn.brpop(self.queue_name))[
redis_brpop_data_position
]
Loading
Loading