Skip to content

Commit 1d1703a

Browse files
committed
Merge branch 'develop'
2 parents e91bd27 + ed38d52 commit 1d1703a

10 files changed

+584
-958
lines changed

.flake8

Lines changed: 0 additions & 120 deletions
This file was deleted.

.github/workflows/release.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
name: Release python package
22

33
on:
4-
push:
5-
tags:
6-
- "*"
4+
release:
5+
types:
6+
- released
77

88
jobs:
99
deploy:

poetry.lock

Lines changed: 354 additions & 805 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq-nats"
3-
version = "0.5.0"
3+
version = "0.0.0"
44
description = "NATS integration for taskiq"
55
authors = ["taskiq-team <taskiq@norely.com>"]
66
readme = "README.md"
@@ -31,7 +31,7 @@ pytest = "^7.2.2"
3131
pytest-xdist = "^3.2.1"
3232
anyio = "^3.6.2"
3333
pytest-cov = "^4.0.0"
34-
wemake-python-styleguide = "^0.18.0"
34+
ruff = "^0.7.2"
3535

3636

3737
[tool.mypy]
@@ -53,7 +53,7 @@ build-backend = "poetry.core.masonry.api"
5353
[tool.ruff]
5454
# List of enabled rulesets.
5555
# See https://docs.astral.sh/ruff/rules/ for more information.
56-
select = [
56+
lint.select = [
5757
"E", # Error
5858
"F", # Pyflakes
5959
"W", # Pycodestyle
@@ -80,7 +80,7 @@ select = [
8080
"PL", # PyLint checks
8181
"RUF", # Specific to Ruff checks
8282
]
83-
ignore = [
83+
lint.ignore = [
8484
"D105", # Missing docstring in magic method
8585
"D107", # Missing docstring in __init__
8686
"D212", # Multi-line docstring summary should start at the first line
@@ -94,10 +94,10 @@ ignore = [
9494
"D106", # Missing docstring in public nested class
9595
]
9696
exclude = [".venv/"]
97-
mccabe = { max-complexity = 10 }
97+
lint.mccabe = { max-complexity = 10 }
9898
line-length = 88
9999

100-
[tool.ruff.per-file-ignores]
100+
[tool.ruff.lint.per-file-ignores]
101101
"tests/*" = [
102102
"S101", # Use of assert detected
103103
"S301", # Use of pickle detected
@@ -107,12 +107,12 @@ line-length = 88
107107
"D101", # Missing docstring in public class
108108
]
109109

110-
[tool.ruff.pydocstyle]
110+
[tool.ruff.lint.pydocstyle]
111111
convention = "pep257"
112112
ignore-decorators = ["typing.overload"]
113113

114-
[tool.ruff.pylint]
114+
[tool.ruff.lint.pylint]
115115
allow-magic-value-types = ["int", "str", "float"]
116116

117-
[tool.ruff.flake8-bugbear]
117+
[tool.ruff.lint.flake8-bugbear]
118118
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]

taskiq_nats/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
PushBasedJetStreamBroker,
1212
)
1313
from taskiq_nats.result_backend import NATSObjectStoreResultBackend
14+
from taskiq_nats.schedule_source import NATSKeyValueScheduleSource
1415

1516
__all__ = [
1617
"NatsBroker",
1718
"PushBasedJetStreamBroker",
1819
"PullBasedJetStreamBroker",
1920
"NATSObjectStoreResultBackend",
21+
"NATSKeyValueScheduleSource",
2022
]

taskiq_nats/result_backend.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Any, Final, List, Optional, TypeVar, Union
22

33
import nats
4-
from nats import NATS
4+
from nats.aio.client import Client
55
from nats.js import JetStreamContext
66
from nats.js.errors import BucketNotFoundError, ObjectNotFoundError
77
from nats.js.object_store import ObjectStore
@@ -37,7 +37,7 @@ def __init__(
3737
self.serializer = serializer or PickleSerializer()
3838
self.connect_options: Final = connect_options
3939

40-
self.nats_client: NATS
40+
self.nats_client: Client
4141
self.nats_jetstream: JetStreamContext
4242
self.object_store: ObjectStore
4343

taskiq_nats/schedule_source.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import logging
2+
from typing import Any, Final, List, Optional, Union
3+
4+
import nats
5+
from nats import NATS
6+
from nats.js import JetStreamContext
7+
from nats.js.errors import BucketNotFoundError, NoKeysError
8+
from nats.js.kv import KeyValue
9+
from taskiq import ScheduledTask, ScheduleSource
10+
from taskiq.abc.serializer import TaskiqSerializer
11+
from taskiq.compat import model_dump, model_validate
12+
from taskiq.serializers import PickleSerializer
13+
14+
log = logging.getLogger(__name__)
15+
16+
17+
class NATSKeyValueScheduleSource(ScheduleSource):
18+
"""
19+
Source of schedules for NATS Key-Value storage.
20+
21+
This class allows you to store schedules in NATS Key-Value storage.
22+
Also it supports dynamic schedules.
23+
"""
24+
25+
def __init__(
26+
self,
27+
servers: Union[str, List[str]],
28+
bucket_name: str = "taskiq_schedules",
29+
prefix: str = "schedule",
30+
serializer: Optional[TaskiqSerializer] = None,
31+
**connect_options: Any,
32+
) -> None:
33+
"""Construct new schedule source.
34+
35+
:param servers: NATS servers.
36+
:param bucket_name: name of the bucket where schedules would be stored.
37+
:param prefix: prefix for nats kv storage schedule keys.
38+
:param serializer: serializer for data.
39+
:param connect_options: additional arguments for nats `connect()` method.
40+
"""
41+
self.servers: Final = servers
42+
self.bucket_name: Final = bucket_name
43+
self.prefix: Final = prefix
44+
self.serializer = serializer or PickleSerializer()
45+
self.connect_options: Final = connect_options
46+
47+
self.nats_client: NATS
48+
self.nats_jetstream: JetStreamContext
49+
self.kv: KeyValue
50+
51+
async def startup(self) -> None:
52+
"""Create new connection to NATS.
53+
54+
Initialize JetStream context and new KeyValue instance.
55+
"""
56+
self.nats_client = await nats.connect(
57+
servers=self.servers,
58+
**self.connect_options,
59+
)
60+
self.nats_jetstream = self.nats_client.jetstream()
61+
62+
try:
63+
self.kv = await self.nats_jetstream.key_value(self.bucket_name)
64+
except BucketNotFoundError:
65+
self.kv = await self.nats_jetstream.create_key_value(
66+
bucket=self.bucket_name,
67+
)
68+
69+
async def shutdown(self) -> None:
70+
"""Close nats connection."""
71+
if self.nats_client.is_closed:
72+
return
73+
await self.nats_client.close()
74+
75+
async def delete_schedule(self, schedule_id: str) -> None:
76+
"""Remove schedule by id."""
77+
await self.kv.delete(f"{self.prefix}.{schedule_id}")
78+
79+
async def add_schedule(self, schedule: ScheduledTask) -> None:
80+
"""
81+
Add schedule to NATS Key-Value storage.
82+
83+
:param schedule: schedule to add.
84+
The storage key is derived from ``schedule.schedule_id``.
85+
"""
86+
await self.kv.put(
87+
f"{self.prefix}.{schedule.schedule_id}",
88+
self.serializer.dumpb(model_dump(schedule)),
89+
)
90+
91+
async def get_schedules(self) -> List[ScheduledTask]:
92+
"""
93+
Get all schedules from NATS Key-Value storage.
94+
95+
This method is used by scheduler to get all schedules.
96+
97+
:return: list of schedules.
98+
"""
99+
try:
100+
schedules = await self.kv.history(f"{self.prefix}.*")
101+
except NoKeysError:
102+
return []
103+
104+
return [
105+
model_validate(ScheduledTask, self.serializer.loadb(schedule.value))
106+
for schedule in schedules
107+
if schedule and schedule.value
108+
]
109+
110+
async def post_send(self, task: ScheduledTask) -> None:
111+
"""Delete a task after it's completed."""
112+
if task.time is not None:
113+
await self.delete_schedule(task.schedule_id)

tests/test_result_backend.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,43 +111,39 @@ async def test_success_backend_default_result(
111111
assert result == default_taskiq_result
112112

113113

114-
async def test_success_backend_custom_result(
114+
async def test_error_backend_custom_result(
115115
nats_result_backend: NATSObjectStoreResultBackend[_ReturnType],
116116
custom_taskiq_result: TaskiqResult[_ReturnType],
117117
task_id: str,
118118
) -> None:
119119
"""
120-
Tests normal behavior with custom result in TaskiqResult.
120+
Tests that error is thrown on non-serializable result.
121121
122122
:param custom_taskiq_result: TaskiqResult with custom result.
123123
:param task_id: ID for task.
124124
:param nats_result_backend: NATS object store result backend.
125125
"""
126-
await nats_result_backend.set_result(
127-
task_id=task_id,
128-
result=custom_taskiq_result,
129-
)
130-
result = await nats_result_backend.get_result(task_id=task_id)
131-
132-
assert (
133-
result.return_value.test_arg # type: ignore
134-
== custom_taskiq_result.return_value.test_arg # type: ignore
135-
)
136-
assert result.is_err == custom_taskiq_result.is_err
137-
assert result.execution_time == custom_taskiq_result.execution_time
138-
assert result.log == custom_taskiq_result.log
126+
with pytest.raises(ValueError):
127+
await nats_result_backend.set_result(
128+
task_id=task_id,
129+
result=custom_taskiq_result,
130+
)
139131

140132

141133
async def test_success_backend_is_result_ready(
142134
nats_result_backend: NATSObjectStoreResultBackend[_ReturnType],
143-
custom_taskiq_result: TaskiqResult[_ReturnType],
144135
task_id: str,
145136
) -> None:
146137
"""Tests `is_result_ready` method."""
147138
assert not await nats_result_backend.is_result_ready(task_id=task_id)
148139
await nats_result_backend.set_result(
149140
task_id=task_id,
150-
result=custom_taskiq_result,
141+
result=TaskiqResult(
142+
is_err=False,
143+
log=None,
144+
return_value="one",
145+
execution_time=1,
146+
),
151147
)
152148

153149
assert await nats_result_backend.is_result_ready(task_id=task_id)

0 commit comments

Comments
 (0)