Skip to content

Commit 4eb04ed

Browse files
authored
Added new list-based schedule source. (#79)
* Added new schedule source. * Fixed pytest.
1 parent 44cc44f commit 4eb04ed

File tree

6 files changed

+477
-1
lines changed

6 files changed

+477
-1
lines changed

README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,67 @@ RedisAsyncResultBackend parameters:
125125
> result_px_time=1000000,
126126
> )
127127
> ```
128+
129+
130+
## Schedule sources
131+
132+
133+
You can use this package to add dynamic schedule sources. They are used to store
134+
schedules for taskiq scheduler.
135+
136+
The advantage of using schedule sources from this package over default `LabelBased` source is that you can
137+
dynamically add schedules in it.
138+
139+
We have two types of schedule sources:
140+
141+
* `RedisScheduleSource`
142+
* `ListRedisScheduleSource`
143+
144+
145+
### RedisScheduleSource
146+
147+
This source is super simple. It stores all schedules by key `{prefix}:{schedule_id}`. When scheduler requests
148+
schedules, it retrieves all values from redis that start with a given `prefix`.
149+
150+
This is very inefficient and should not be used for high-volume schedules, because if you have `1000` schedules, this scheduler will make at least `20` requests to retrieve them (we use `scan` and `mget` to minimize the number of calls).
151+
152+
### ListRedisScheduleSource
153+
154+
This source holds values in lists.
155+
156+
* For cron tasks it uses key `{prefix}:cron`.
157+
* For timed schedules it uses key `{prefix}:time:{time}`, where `{time}` is the time at which the schedules should run.
158+
159+
The main advantage of this approach is that we only fetch tasks we need to run at a given time and do not perform any excessive calls to redis.
160+
161+
162+
### Migration from one source to another
163+
164+
To migrate from `RedisScheduleSource` to `ListRedisScheduleSource` you can define the latter as this:
165+
166+
```python
167+
# broker.py
168+
import asyncio
169+
import datetime
170+
171+
from taskiq import TaskiqScheduler
172+
173+
from taskiq_redis import ListRedisScheduleSource, RedisStreamBroker
174+
from taskiq_redis.schedule_source import RedisScheduleSource
175+
176+
broker = RedisStreamBroker(url="redis://localhost:6379")
177+
178+
old_source = RedisScheduleSource("redis://localhost/1", prefix="prefix1")
179+
array_source = ListRedisScheduleSource(
180+
"redis://localhost/1",
181+
prefix="prefix2",
182+
# To migrate schedules from an old source.
183+
).with_migrate_from(
184+
old_source,
185+
# To delete schedules from an old source.
186+
delete_schedules=True,
187+
)
188+
scheduler = TaskiqScheduler(broker, [array_source])
189+
```
190+
191+
During startup the scheduler will try to migrate schedules from the old source to the new one. Please be sure to specify different prefixes for the two sources to avoid any kind of key collision between them.

poetry.lock

Lines changed: 43 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ fakeredis = "^2"
4040
pre-commit = "^4"
4141
pytest-xdist = { version = "^3", extras = ["psutil"] }
4242
ruff = "^0"
43+
freezegun = "^1.5.1"
4344

4445
[tool.mypy]
4546
strict = true

taskiq_redis/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Package for redis integration."""
22

3+
from taskiq_redis.list_schedule_source import ListRedisScheduleSource
34
from taskiq_redis.redis_backend import (
45
RedisAsyncClusterResultBackend,
56
RedisAsyncResultBackend,
@@ -25,6 +26,7 @@
2526
"ListQueueBroker",
2627
"ListQueueClusterBroker",
2728
"ListQueueSentinelBroker",
29+
"ListRedisScheduleSource",
2830
"PubSubBroker",
2931
"PubSubSentinelBroker",
3032
"RedisAsyncClusterResultBackend",

taskiq_redis/list_schedule_source.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
import datetime
2+
from logging import getLogger
3+
from typing import Any, List, Optional
4+
5+
from redis.asyncio import BlockingConnectionPool, Redis
6+
from taskiq import ScheduledTask, ScheduleSource
7+
from taskiq.abc.serializer import TaskiqSerializer
8+
from taskiq.compat import model_dump, model_validate
9+
from taskiq.serializers import PickleSerializer
10+
from typing_extensions import Self
11+
12+
logger = getLogger("taskiq.redis_schedule_source")
13+
14+
15+
class ListRedisScheduleSource(ScheduleSource):
    """
    Schedule source that stores schedules in Redis lists.

    Schedule data is kept under ``{prefix}:data:{schedule_id}``.
    Cron schedule ids live in the list ``{prefix}:cron`` and time-based
    schedule ids in per-minute lists ``{prefix}:time:{time}``, so only
    schedules relevant to the current minute need to be fetched.
    """

    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        max_connection_pool_size: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        bufffer_size: int = 50,
        skip_past_schedules: bool = False,
        **connection_kwargs: Any,
    ) -> None:
        """
        Construct a new list-based schedule source.

        :param url: redis connection url.
        :param prefix: prefix for all redis keys created by this source.
        :param max_connection_pool_size: maximum number of connections in the pool.
        :param serializer: serializer for schedule data, pickle by default.
        :param bufffer_size: maximum number of keys fetched per single MGET call.
            NOTE: the misspelled name is kept for backward compatibility with
            callers that pass it as a keyword argument.
        :param skip_past_schedules: if True, don't look for unsent past
            schedules on the first ``get_schedules`` call.
        :param connection_kwargs: additional arguments for the connection pool.
        """
        super().__init__()
        self._prefix = prefix
        self._buffer_size = bufffer_size
        self._connection_pool = BlockingConnectionPool.from_url(
            url=url,
            max_connections=max_connection_pool_size,
            **connection_kwargs,
        )
        self._serializer = serializer if serializer is not None else PickleSerializer()
        self._is_first_run = True
        self._previous_schedule_source: Optional[ScheduleSource] = None
        self._delete_schedules_after_migration: bool = True
        self._skip_past_schedules = skip_past_schedules

    async def startup(self) -> None:
        """
        Startup the schedule source.

        By default this function does nothing.
        But if the previous schedule source is set via
        ``with_migrate_from``, it will try to migrate schedules from it.
        """
        if self._previous_schedule_source is not None:
            logger.info("Migrating schedules from previous source")
            await self._previous_schedule_source.startup()
            schedules = await self._previous_schedule_source.get_schedules()
            # Lazy %-style args: only formatted if the level is enabled.
            logger.info("Found %d schedules to migrate", len(schedules))
            for schedule in schedules:
                await self.add_schedule(schedule)
                if self._delete_schedules_after_migration:
                    await self._previous_schedule_source.delete_schedule(
                        schedule.schedule_id,
                    )
            await self._previous_schedule_source.shutdown()
            logger.info("Migration complete")

    async def shutdown(self) -> None:
        """Shutdown the source and release all pooled connections."""
        # Without this the connection pool would leak sockets on exit.
        await self._connection_pool.disconnect()

    def _get_time_key(self, time: datetime.datetime) -> str:
        """Get the key for a time-based schedule (minute resolution, UTC)."""
        if time.tzinfo is None:
            # Naive datetimes are assumed to be UTC.
            time = time.replace(tzinfo=datetime.timezone.utc)
        iso_time = time.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M")
        return f"{self._prefix}:time:{iso_time}"

    def _get_cron_key(self) -> str:
        """Get the key that holds ids of all cron-based schedules."""
        return f"{self._prefix}:cron"

    def _get_data_key(self, schedule_id: str) -> str:
        """Get the key that holds serialized data of a schedule."""
        return f"{self._prefix}:data:{schedule_id}"

    def _parse_time_key(self, key: str) -> Optional[datetime.datetime]:
        """Get time value from a timed key, or None if it can't be parsed."""
        try:
            dt_str = key.split(":", 2)[2]
            return datetime.datetime.strptime(dt_str, "%Y-%m-%dT%H:%M").replace(
                tzinfo=datetime.timezone.utc,
            )
        # IndexError covers keys without two colons, ValueError bad dates.
        except (IndexError, ValueError):
            logger.debug("Failed to parse time key %s", key)
            return None

    async def _get_previous_time_schedules(self) -> List[bytes]:
        """
        Function that gets all timed schedules that are in the past.

        Since this source doesn't retrieve all the schedules at once,
        we need to get all the schedules that are in the past and haven't
        been sent yet.

        We do this by getting all the time keys and checking if the time
        is less than the current time.

        This function is called only during the first run to minimize
        the number of requests to the Redis server.
        """
        logger.info("Getting previous time schedules")
        minute_before = datetime.datetime.now(
            datetime.timezone.utc,
        ).replace(second=0, microsecond=0) - datetime.timedelta(
            minutes=1,
        )
        schedules: List[bytes] = []
        async with Redis(connection_pool=self._connection_pool) as redis:
            time_keys: List[str] = []
            # Collect every time key whose minute is strictly in the past.
            async for key in redis.scan_iter(f"{self._prefix}:time:*"):
                decoded = key.decode()
                key_time = self._parse_time_key(decoded)
                if key_time and key_time <= minute_before:
                    time_keys.append(decoded)
            for time_key in time_keys:
                schedules.extend(await redis.lrange(time_key, 0, -1))  # type: ignore

        return schedules

    async def delete_schedule(self, schedule_id: str) -> None:
        """Delete a schedule from the source."""
        async with Redis(connection_pool=self._connection_pool) as redis:
            # GETDEL atomically reads and removes the data key (Redis >= 6.2).
            schedule = await redis.getdel(self._get_data_key(schedule_id))
            if schedule is not None:
                logger.debug("Deleting schedule %s", schedule_id)
                schedule = model_validate(
                    ScheduledTask,
                    self._serializer.loadb(schedule),
                )
                # We need to remove the schedule from the cron or time list.
                if schedule.cron is not None:
                    await redis.lrem(self._get_cron_key(), 0, schedule_id)  # type: ignore
                elif schedule.time is not None:
                    time_key = self._get_time_key(schedule.time)
                    await redis.lrem(time_key, 0, schedule_id)  # type: ignore

    async def add_schedule(self, schedule: "ScheduledTask") -> None:
        """Add a schedule to the source."""
        async with Redis(connection_pool=self._connection_pool) as redis:
            # At first we set data key which contains the schedule data.
            await redis.set(
                self._get_data_key(schedule.schedule_id),
                self._serializer.dumpb(model_dump(schedule)),
            )
            # Then we add the schedule to the cron or time list.
            # This is an optimization, so we can get all the schedules
            # for the current time much faster.
            if schedule.cron is not None:
                await redis.rpush(self._get_cron_key(), schedule.schedule_id)  # type: ignore
            elif schedule.time is not None:
                await redis.rpush(  # type: ignore
                    self._get_time_key(schedule.time),
                    schedule.schedule_id,
                )

    async def post_send(self, task: ScheduledTask) -> None:
        """Delete a timed task after it was sent; cron tasks are kept."""
        if task.time is not None:
            await self.delete_schedule(task.schedule_id)

    async def get_schedules(self) -> List["ScheduledTask"]:
        """
        Get all schedules.

        This function gets all the schedules from the schedule source.
        What it does is get all the cron schedules and time schedules
        for the current time and return them.

        If it's the first run, it also gets all the time schedules
        that are in the past and haven't been sent yet.
        """
        schedules: List[Optional[bytes]] = []
        current_time = datetime.datetime.now(datetime.timezone.utc)
        timed: List[bytes] = []
        # Only during first run, we need to get previous time schedules
        if self._is_first_run and not self._skip_past_schedules:
            timed = await self._get_previous_time_schedules()
            self._is_first_run = False
        async with Redis(connection_pool=self._connection_pool) as redis:
            buffer: List[bytes] = []
            crons = await redis.lrange(self._get_cron_key(), 0, -1)  # type: ignore
            logger.debug("Got cron schedules: %s", crons)
            if crons:
                buffer.extend(crons)
            timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1))  # type: ignore
            logger.debug("Got timed schedules: %s", timed)
            if timed:
                buffer.extend(timed)
            # Fetch data keys in chunks to bound the size of each MGET call.
            while buffer:
                schedules.extend(
                    await redis.mget(
                        (
                            self._get_data_key(x.decode())
                            for x in buffer[: self._buffer_size]
                        ),
                    ),
                )
                buffer = buffer[self._buffer_size :]

        # Missing data keys come back as None from MGET; skip them.
        return [
            model_validate(ScheduledTask, self._serializer.loadb(schedule))
            for schedule in schedules
            if schedule
        ]

    def with_migrate_from(
        self,
        source: ScheduleSource,
        delete_schedules: bool = True,
    ) -> Self:
        """
        Enable migration from previous schedule source.

        If this function is called during declaration,
        the source will try to migrate schedules from the previous source.

        :param source: previous schedule source
        :param delete_schedules: delete schedules during migration process
            from the previous source.
        """
        self._previous_schedule_source = source
        self._delete_schedules_after_migration = delete_schedules
        return self

0 commit comments

Comments
 (0)