Skip to content

Switch to broker.serializer instead of JSON-only solution #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
593 changes: 310 additions & 283 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ keywords = ["taskiq", "pipelines", "tasks", "distributed", "async"]

[tool.poetry.dependencies]
python = "^3.8.1"
taskiq = ">=0.0.8, <1"
taskiq = ">=0.10.1, <1"
typing-extensions = "^4.3.0"
pydantic = "^2"

Expand Down
18 changes: 0 additions & 18 deletions taskiq_pipelines/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,6 @@ def __init_subclass__(cls, step_name: str, **kwargs: Any) -> None:
# known steps.
cls._known_steps[step_name] = cls

@abstractmethod
def dumps(self) -> str:
"""
Generate parsable string.

:return: dumped object.
"""

@classmethod
@abstractmethod
def loads(cls: Type[_T], data: str) -> _T:
"""
Method to load previously dumped data.

:param data: dumped data.
:return: instance of a class.
"""

@abstractmethod
async def act(
self,
Expand Down
18 changes: 15 additions & 3 deletions taskiq_pipelines/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class PipelineMiddleware(TaskiqMiddleware):
"""Pipeline middleware."""

async def post_save( # noqa: C901, WPS212
async def post_save( # noqa: C901, WPS210, WPS212
self,
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
Expand All @@ -41,9 +41,21 @@ async def post_save( # noqa: C901, WPS212
logger.warn("Pipline data not found. Execution flow is broken.")
return
pipeline_data = message.labels[PIPELINE_DATA]
# workaround for obligatory casting label values to `str`
# in `AsyncKicker._prepare_message`.
# The trick can be removed later after adding explicit `bytes` support.
if ( # noqa: WPS337
isinstance(pipeline_data, str)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering is this because of this line:

https://github.com/taskiq-python/taskiq/blob/a74fa4d4bc47662ea46dc3af51c156fc23c21d94/taskiq/kicker.py#L253

Maybe before merging we can add support for bytes in labels, by removing this str modifier? Since I really don't like how it looks. Guess removing str should give no effect, but might break compatibility. I will find out how to make it possible without big compatibility problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. It is the result of str(label) conversion.

I suspect that the plain str(...) removal is not enough. Let me experiment first and come up with a PR for taskiq core project.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I tried removing it and broke tests.

and pipeline_data.startswith("b'")
and pipeline_data.endswith("'")
):
pipeline_data2 = pipeline_data[2:-1].encode()
else:
pipeline_data2 = pipeline_data
parsed_data = self.broker.serializer.loadb(pipeline_data2)
try:
steps_data = pydantic.TypeAdapter(List[DumpedStep]).validate_json(
pipeline_data,
steps_data = pydantic.TypeAdapter(List[DumpedStep]).validate_python(
parsed_data,
)
except ValueError:
return
Expand Down
28 changes: 16 additions & 12 deletions taskiq_pipelines/pipeliner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from typing import (
Any,
Coroutine,
Dict,
Generic,
List,
Literal,
Expand Down Expand Up @@ -29,10 +29,13 @@ class DumpedStep(pydantic.BaseModel):
"""Dumped state model."""

step_type: str
step_data: str
step_data: Dict[str, Any]
task_id: str


DumpedSteps = pydantic.RootModel[List[DumpedStep]]


class Pipeline(Generic[_FuncParams, _ReturnType]):
"""
Pipeline constructor.
Expand Down Expand Up @@ -116,7 +119,7 @@ def call_next(
task=task,
param_name=param_name,
**additional_kwargs,
).dumps(),
).model_dump(),
task_id="",
),
)
Expand Down Expand Up @@ -172,7 +175,7 @@ def call_after(
task=task,
param_name=EMPTY_PARAM_NAME,
**additional_kwargs,
).dumps(),
).model_dump(),
task_id="",
),
)
Expand Down Expand Up @@ -243,7 +246,7 @@ def map(
skip_errors=skip_errors,
check_interval=check_interval,
**additional_kwargs,
).dumps(),
).model_dump(),
task_id="",
),
)
Expand Down Expand Up @@ -315,24 +318,24 @@ def filter(
skip_errors=skip_errors,
check_interval=check_interval,
**additional_kwargs,
).dumps(),
).model_dump(),
task_id="",
),
)
return self

def dumpb(self) -> bytes:
    """
    Dump the current pipeline to bytes.

    Serialization is delegated to the broker's configured serializer
    (instead of a hard-coded JSON dump), so any serializer supported by
    taskiq can be used.

    :returns: serialized pipeline as bytes.
    """
    # model_dump() yields plain Python structures (dicts/lists), which
    # any broker serializer can encode — not just JSON.
    return self.broker.serializer.dumpb(
        DumpedSteps.model_validate(self.steps).model_dump(),
    )

@classmethod
def loadb(cls, broker: AsyncBroker, pipe_data: bytes) -> "Pipeline[Any, Any]":
    """
    Parse a serialized pipeline.

    The bytes are decoded with the broker's configured serializer and the
    resulting structure is validated into pipeline steps.

    :param broker: broker whose serializer decodes the payload.
    :param pipe_data: pipeline data produced by :meth:`dumpb`.
    :raises pydantic.ValidationError: if the decoded data is malformed.
    :return: new pipeline instance.
    """
    pipe: "Pipeline[Any, Any]" = Pipeline(broker)
    data = broker.serializer.loadb(pipe_data)
    # Use .root to assign the actual List[DumpedStep] instead of the
    # RootModel wrapper; this keeps pipe.steps correctly typed and
    # removes the need for a type: ignore[assignment].
    pipe.steps = DumpedSteps.model_validate(data).root
    return pipe

async def kiq(
Expand Down Expand Up @@ -383,7 +387,7 @@ async def kiq(
)
.with_task_id(step.task_id)
.with_labels(
**{CURRENT_STEP: 0, PIPELINE_DATA: self.dumps()}, # type: ignore
**{CURRENT_STEP: 0, PIPELINE_DATA: self.dumpb()}, # type: ignore
)
)
taskiq_task = await kicker.kiq(*args, **kwargs)
Expand Down
5 changes: 3 additions & 2 deletions taskiq_pipelines/steps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Package with default pipeline steps."""
from logging import getLogger
from typing import Any, Dict

from taskiq_pipelines.abc import AbstractStep
from taskiq_pipelines.steps.filter import FilterStep
Expand All @@ -9,12 +10,12 @@
logger = getLogger(__name__)


def parse_step(step_type: str, step_data: Dict[str, Any]) -> AbstractStep:
    """
    Instantiate a pipeline step from its dumped representation.

    Looks up the registered step class by its name and constructs it
    from the already-deserialized field mapping (steps are now plain
    dicts produced by the broker serializer, not JSON strings).

    :param step_type: registered name of the step class.
    :param step_data: keyword arguments for the step constructor.
    :raises ValueError: if no step class is registered under step_type.
    :return: constructed step instance.
    """
    step_cls = AbstractStep._known_steps.get(step_type)  # noqa: WPS437
    if step_cls is None:
        logger.warning(f"Unknown step type: {step_type}")
        raise ValueError("Unknown step type.")
    # Steps are pydantic models, so field-name kwargs reconstruct them.
    return step_cls(**step_data)


__all__ = [
Expand Down
18 changes: 0 additions & 18 deletions taskiq_pipelines/steps/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,6 @@ class FilterStep(pydantic.BaseModel, AbstractStep, step_name="filter"):
skip_errors: bool
check_interval: float

def dumps(self) -> str:
"""
Dumps step as string.

:return: returns json.
"""
return self.model_dump_json()

@classmethod
def loads(cls, data: str) -> "FilterStep":
"""
Parses mapper step from string.

:param data: dumped data.
:return: parsed step.
"""
return pydantic.TypeAdapter(FilterStep).validate_json(data)

async def act(
self,
broker: AsyncBroker,
Expand Down
18 changes: 0 additions & 18 deletions taskiq_pipelines/steps/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,6 @@ class MapperStep(pydantic.BaseModel, AbstractStep, step_name="mapper"):
skip_errors: bool
check_interval: float

def dumps(self) -> str:
"""
Dumps step as string.

:return: returns json.
"""
return self.model_dump_json()

@classmethod
def loads(cls, data: str) -> "MapperStep":
"""
Parses mapper step from string.

:param data: dumped data.
:return: parsed step.
"""
return pydantic.TypeAdapter(MapperStep).validate_json(data)

async def act(
self,
broker: AsyncBroker,
Expand Down
18 changes: 0 additions & 18 deletions taskiq_pipelines/steps/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,6 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
param_name: Union[Optional[int], str]
additional_kwargs: Dict[str, Any]

def dumps(self) -> str:
"""
Dumps step as string.

:return: returns json.
"""
return self.model_dump_json()

@classmethod
def loads(cls, data: str) -> "SequentialStep":
"""
Parses sequential step from string.

:param data: dumped data.
:return: parsed step.
"""
return pydantic.TypeAdapter(SequentialStep).validate_json(data)

async def act(
self,
broker: AsyncBroker,
Expand Down