Skip to content

Commit 62c030d

Browse files
committed
feat: Added RedisSentinelScheduleSource.
1 parent a9b7f9d commit 62c030d

File tree

3 files changed

+259
-3
lines changed

3 files changed

+259
-3
lines changed

taskiq_redis/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from taskiq_redis.schedule_source import (
1111
RedisClusterScheduleSource,
1212
RedisScheduleSource,
13+
RedisSentinelScheduleSource,
1314
)
1415

1516
__all__ = [
@@ -22,4 +23,5 @@
2223
"ListQueueSentinelBroker",
2324
"RedisScheduleSource",
2425
"RedisClusterScheduleSource",
26+
"RedisSentinelScheduleSource",
2527
]

taskiq_redis/schedule_source.py

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,31 @@
1-
from typing import Any, List, Optional
1+
import sys
2+
from contextlib import asynccontextmanager
3+
from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional, Tuple
24

3-
from redis.asyncio import BlockingConnectionPool, ConnectionPool, Redis, RedisCluster
5+
from redis.asyncio import (
6+
BlockingConnectionPool,
7+
ConnectionPool,
8+
Redis,
9+
RedisCluster,
10+
Sentinel,
11+
)
412
from taskiq import ScheduleSource
513
from taskiq.abc.serializer import TaskiqSerializer
614
from taskiq.compat import model_dump, model_validate
715
from taskiq.scheduler.scheduled_task import ScheduledTask
816

917
from taskiq_redis.serializer import PickleSerializer
1018

19+
if sys.version_info >= (3, 10):
20+
from typing import TypeAlias
21+
else:
22+
from typing_extensions import TypeAlias
23+
24+
if TYPE_CHECKING:
25+
_Redis: TypeAlias = Redis[bytes]
26+
else:
27+
_Redis: TypeAlias = Redis
28+
1129

1230
class RedisScheduleSource(ScheduleSource):
1331
"""
@@ -174,3 +192,97 @@ async def post_send(self, task: ScheduledTask) -> None:
174192
"""Delete a task after it's completed."""
175193
if task.time is not None:
176194
await self.delete_schedule(task.schedule_id)
195+
196+
197+
class RedisSentinelScheduleSource(ScheduleSource):
198+
"""
199+
Source of schedules for redis cluster.
200+
201+
This class allows you to store schedules in redis.
202+
Also it supports dynamic schedules.
203+
204+
:param sentinels: list of sentinel host and ports pairs.
205+
:param master_name: sentinel master name.
206+
:param prefix: prefix for redis schedule keys.
207+
:param buffer_size: buffer size for redis scan.
208+
This is how many keys will be fetched at once.
209+
:param max_connection_pool_size: maximum number of connections in pool.
210+
:param serializer: serializer for data.
211+
:param connection_kwargs: additional arguments for RedisCluster.
212+
"""
213+
214+
def __init__(
215+
self,
216+
sentinels: List[Tuple[str, int]],
217+
master_name: str,
218+
prefix: str = "schedule",
219+
buffer_size: int = 50,
220+
serializer: Optional[TaskiqSerializer] = None,
221+
min_other_sentinels: int = 0,
222+
sentinel_kwargs: Optional[Any] = None,
223+
**connection_kwargs: Any,
224+
) -> None:
225+
self.prefix = prefix
226+
self.sentinel = Sentinel(
227+
sentinels=sentinels,
228+
min_other_sentinels=min_other_sentinels,
229+
sentinel_kwargs=sentinel_kwargs,
230+
**connection_kwargs,
231+
)
232+
self.master_name = master_name
233+
self.buffer_size = buffer_size
234+
if serializer is None:
235+
serializer = PickleSerializer()
236+
self.serializer = serializer
237+
238+
@asynccontextmanager
239+
async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
240+
async with self.sentinel.master_for(self.master_name) as redis_conn:
241+
yield redis_conn
242+
243+
async def delete_schedule(self, schedule_id: str) -> None:
244+
"""Remove schedule by id."""
245+
async with self._acquire_master_conn() as redis:
246+
await redis.delete(f"{self.prefix}:{schedule_id}")
247+
248+
async def add_schedule(self, schedule: ScheduledTask) -> None:
249+
"""
250+
Add schedule to redis.
251+
252+
:param schedule: schedule to add.
253+
:param schedule_id: schedule id.
254+
"""
255+
async with self._acquire_master_conn() as redis:
256+
await redis.set(
257+
f"{self.prefix}:{schedule.schedule_id}",
258+
self.serializer.dumpb(model_dump(schedule)),
259+
)
260+
261+
async def get_schedules(self) -> List[ScheduledTask]:
262+
"""
263+
Get all schedules from redis.
264+
265+
This method is used by scheduler to get all schedules.
266+
267+
:return: list of schedules.
268+
"""
269+
schedules = []
270+
async with self._acquire_master_conn() as redis:
271+
buffer = []
272+
async for key in redis.scan_iter(f"{self.prefix}:*"):
273+
buffer.append(key)
274+
if len(buffer) >= self.buffer_size:
275+
schedules.extend(await redis.mget(buffer))
276+
buffer = []
277+
if buffer:
278+
schedules.extend(await redis.mget(buffer))
279+
return [
280+
model_validate(ScheduledTask, self.serializer.loadb(schedule))
281+
for schedule in schedules
282+
if schedule
283+
]
284+
285+
async def post_send(self, task: ScheduledTask) -> None:
286+
"""Delete a task after it's completed."""
287+
if task.time is not None:
288+
await self.delete_schedule(task.schedule_id)

tests/test_schedule_source.py

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import asyncio
22
import datetime as dt
33
import uuid
4+
from typing import List, Tuple
45

56
import pytest
67
from taskiq import ScheduledTask
78

8-
from taskiq_redis import RedisClusterScheduleSource, RedisScheduleSource
9+
from taskiq_redis import (
10+
RedisClusterScheduleSource,
11+
RedisScheduleSource,
12+
RedisSentinelScheduleSource,
13+
)
914

1015

1116
@pytest.mark.anyio
@@ -220,3 +225,140 @@ async def test_cluster_buffer(redis_cluster_url: str) -> None:
220225
assert schedule1 in schedules
221226
assert schedule2 in schedules
222227
await source.shutdown()
228+
229+
230+
@pytest.mark.anyio
231+
async def test_sentinel_set_schedule(
232+
redis_sentinels: List[Tuple[str, int]],
233+
redis_sentinel_master_name: str,
234+
) -> None:
235+
prefix = uuid.uuid4().hex
236+
source = RedisSentinelScheduleSource(
237+
sentinels=redis_sentinels,
238+
master_name=redis_sentinel_master_name,
239+
prefix=prefix,
240+
)
241+
schedule = ScheduledTask(
242+
task_name="test_task",
243+
labels={},
244+
args=[],
245+
kwargs={},
246+
cron="* * * * *",
247+
)
248+
await source.add_schedule(schedule)
249+
schedules = await source.get_schedules()
250+
assert schedules == [schedule]
251+
await source.shutdown()
252+
253+
254+
@pytest.mark.anyio
255+
async def test_sentinel_delete_schedule(
256+
redis_sentinels: List[Tuple[str, int]],
257+
redis_sentinel_master_name: str,
258+
) -> None:
259+
prefix = uuid.uuid4().hex
260+
source = RedisSentinelScheduleSource(
261+
sentinels=redis_sentinels,
262+
master_name=redis_sentinel_master_name,
263+
prefix=prefix,
264+
)
265+
schedule = ScheduledTask(
266+
task_name="test_task",
267+
labels={},
268+
args=[],
269+
kwargs={},
270+
cron="* * * * *",
271+
)
272+
await source.add_schedule(schedule)
273+
schedules = await source.get_schedules()
274+
assert schedules == [schedule]
275+
await source.delete_schedule(schedule.schedule_id)
276+
schedules = await source.get_schedules()
277+
# Schedules are empty.
278+
assert not schedules
279+
await source.shutdown()
280+
281+
282+
@pytest.mark.anyio
283+
async def test_sentinel_post_run_cron(
284+
redis_sentinels: List[Tuple[str, int]],
285+
redis_sentinel_master_name: str,
286+
) -> None:
287+
prefix = uuid.uuid4().hex
288+
source = RedisSentinelScheduleSource(
289+
sentinels=redis_sentinels,
290+
master_name=redis_sentinel_master_name,
291+
prefix=prefix,
292+
)
293+
schedule = ScheduledTask(
294+
task_name="test_task",
295+
labels={},
296+
args=[],
297+
kwargs={},
298+
cron="* * * * *",
299+
)
300+
await source.add_schedule(schedule)
301+
assert await source.get_schedules() == [schedule]
302+
await source.post_send(schedule)
303+
assert await source.get_schedules() == [schedule]
304+
await source.shutdown()
305+
306+
307+
@pytest.mark.anyio
308+
async def test_sentinel_post_run_time(
309+
redis_sentinels: List[Tuple[str, int]],
310+
redis_sentinel_master_name: str,
311+
) -> None:
312+
prefix = uuid.uuid4().hex
313+
source = RedisSentinelScheduleSource(
314+
sentinels=redis_sentinels,
315+
master_name=redis_sentinel_master_name,
316+
prefix=prefix,
317+
)
318+
schedule = ScheduledTask(
319+
task_name="test_task",
320+
labels={},
321+
args=[],
322+
kwargs={},
323+
time=dt.datetime(2000, 1, 1),
324+
)
325+
await source.add_schedule(schedule)
326+
assert await source.get_schedules() == [schedule]
327+
await source.post_send(schedule)
328+
assert await source.get_schedules() == []
329+
await source.shutdown()
330+
331+
332+
@pytest.mark.anyio
333+
async def test_sentinel_buffer(
334+
redis_sentinels: List[Tuple[str, int]],
335+
redis_sentinel_master_name: str,
336+
) -> None:
337+
prefix = uuid.uuid4().hex
338+
source = RedisSentinelScheduleSource(
339+
sentinels=redis_sentinels,
340+
master_name=redis_sentinel_master_name,
341+
prefix=prefix,
342+
buffer_size=1,
343+
)
344+
schedule1 = ScheduledTask(
345+
task_name="test_task1",
346+
labels={},
347+
args=[],
348+
kwargs={},
349+
cron="* * * * *",
350+
)
351+
schedule2 = ScheduledTask(
352+
task_name="test_task2",
353+
labels={},
354+
args=[],
355+
kwargs={},
356+
cron="* * * * *",
357+
)
358+
await source.add_schedule(schedule1)
359+
await source.add_schedule(schedule2)
360+
schedules = await source.get_schedules()
361+
assert len(schedules) == 2
362+
assert schedule1 in schedules
363+
assert schedule2 in schedules
364+
await source.shutdown()

0 commit comments

Comments
 (0)