Skip to content

Add empty param name support #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ ignore =
; Consider possible security implications associated with pickle module
; Pickle and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue
S403, S301
; Found too many imported names from a module
WPS235

per-file-ignores =
; all tests
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,14 @@ We have a few steps available for chaining calls:
This type of step is just an ordinary call of the function.
If you haven't specified `param_name` argument, then the result
of the previous step will be passed as the first argument of the function.
Uf you did specify the `param_name` argument, then the result of the previous
If you did specify the `param_name` argument, then the result of the previous
step can be found in key word arguments with the param name you specified.

You can add sequential steps with `.call_next` method of the pipeline.

If you don't want to pass the result of the previous step to the next one,
you can use `.call_after` method of the pipeline.

### Mapper step

This step runs specified task for each item of the previous task's result spawning
Expand Down
4 changes: 4 additions & 0 deletions taskiq_pipelines/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
from typing import Literal

CURRENT_STEP = "_pipe_current_step"
PIPELINE_DATA = "_pipe_data"

EMPTY_PARAM_NAME: Literal[-1] = -1
79 changes: 73 additions & 6 deletions taskiq_pipelines/pipeliner.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import json
from typing import Any, Coroutine, Generic, List, Optional, TypeVar, Union, overload
from typing import (
Any,
Coroutine,
Generic,
List,
Literal,
Optional,
TypeVar,
Union,
overload,
)

import pydantic
from taskiq import AsyncBroker, AsyncTaskiqTask
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.kicker import AsyncKicker
from typing_extensions import ParamSpec

from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
from taskiq_pipelines.steps import FilterStep, MapperStep, SequentialStep, parse_step

_ReturnType = TypeVar("_ReturnType")
Expand Down Expand Up @@ -58,7 +68,7 @@ def call_next(
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
],
param_name: Optional[str] = None,
param_name: Union[Optional[str], Literal[-1]] = None,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...
Expand All @@ -70,7 +80,7 @@ def call_next(
AsyncKicker[Any, _T2],
AsyncTaskiqDecoratedTask[Any, _T2],
],
param_name: Optional[str] = None,
param_name: Union[Optional[str], Literal[-1]] = None,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...
Expand All @@ -81,7 +91,7 @@ def call_next(
AsyncKicker[Any, Any],
AsyncTaskiqDecoratedTask[Any, Any],
],
param_name: Optional[str] = None,
param_name: Union[Optional[str], Literal[-1]] = None,
**additional_kwargs: Any,
) -> Any:
"""
Expand All @@ -94,7 +104,8 @@ def call_next(
if param_name is specified.

:param task: task to execute.
:param param_name: kwarg param name, defaults to None
:param param_name: kwarg param name, defaults to None.
If set to -1 (EMPTY_PARAM_NAME), result is not passed.
:param additional_kwargs: additional kwargs to task.
:return: updated pipeline.
"""
Expand All @@ -111,6 +122,62 @@ def call_next(
)
return self

@overload
def call_after(
self: "Pipeline[_FuncParams, _ReturnType]",
task: Union[
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
],
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...

@overload
def call_after(
self: "Pipeline[_FuncParams, _ReturnType]",
task: Union[
AsyncKicker[Any, _T2],
AsyncTaskiqDecoratedTask[Any, _T2],
],
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...

def call_after(
self,
task: Union[
AsyncKicker[Any, Any],
AsyncTaskiqDecoratedTask[Any, Any],
],
**additional_kwargs: Any,
) -> Any:
"""
Adds sequential step.

This task will be executed right after
the previous and result of the previous task
is not passed to the next task.

This is equivalent to call_next(task, param_name=-1).

:param task: task to execute.
:param additional_kwargs: additional kwargs to task.
:return: updated pipeline.
"""
self.steps.append(
DumpedStep(
step_type=SequentialStep.step_name,
step_data=SequentialStep.from_task(
task=task,
param_name=EMPTY_PARAM_NAME,
**additional_kwargs,
).dumps(),
task_id="",
),
)
return self

@overload
def map(
self: "Pipeline[_FuncParams, _ReturnType]",
Expand Down
28 changes: 24 additions & 4 deletions taskiq_pipelines/steps/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from taskiq.kicker import AsyncKicker

from taskiq_pipelines.abc import AbstractStep
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA


class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
Expand All @@ -19,9 +19,27 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):

task_name: str
labels: Dict[str, str]
param_name: Optional[str]
# order is important here, otherwise pydantic will always choose str.
# we use int instead of Literal[-1] because pydantic thinks that -1 is always str.
param_name: Union[Optional[int], str]
additional_kwargs: Dict[str, Any]

@pydantic.validator("param_name")
def validate_param_name(
self,
value: Union[Optional[str], int],
) -> Union[Optional[str], int]:
"""
Validate param_name.

:param value: value to validate.
:raises ValueError: if value is not str, None or -1 (EMPTY_PARAM_NAME).
:return: param value.
"""
if isinstance(value, int) and value != EMPTY_PARAM_NAME:
raise ValueError("must be str, None or -1 (EMPTY_PARAM_NAME)")
return value

def dumps(self) -> str:
"""
Dumps step as string.
Expand Down Expand Up @@ -78,9 +96,11 @@ async def act(
**{PIPELINE_DATA: pipe_data, CURRENT_STEP: step_number}, # type: ignore
)
)
if self.param_name:
if isinstance(self.param_name, str):
self.additional_kwargs[self.param_name] = result.return_value
await kicker.kiq(**self.additional_kwargs)
elif self.param_name == EMPTY_PARAM_NAME:
await kicker.kiq(**self.additional_kwargs)
else:
await kicker.kiq(result.return_value, **self.additional_kwargs)

Expand All @@ -91,7 +111,7 @@ def from_task(
AsyncKicker[Any, Any],
AsyncTaskiqDecoratedTask[Any, Any],
],
param_name: Optional[str],
param_name: Union[Optional[str], int],
**additional_kwargs: Any,
) -> "SequentialStep":
"""
Expand Down