Added initial group support. #24


Open · wants to merge 9 commits into master
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -31,7 +31,8 @@ repos:
        args:
          - "check"
          - "--fix"
          - "."
          - "taskiq_pipelines"
          - "tests"

      - id: mypy
        name: Validate types with MyPy
128 changes: 128 additions & 0 deletions README.md
@@ -162,3 +162,131 @@ If called tasks returned `True` for some element, this element will be added in

After the execution you'll get a list with filtered results.
You can add filters by calling the `.filter` method of the pipeline.
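
For example, a minimal sketch of filtering (the `generate_numbers` and `is_even` tasks are hypothetical and assumed to be registered on the same broker):

```python
from typing import List


@broker.task
def generate_numbers() -> List[int]:
    return [1, 2, 3, 4]


@broker.task
def is_even(val: int) -> bool:
    return val % 2 == 0


# Keeps only the even elements: [2, 4].
pipe = Pipeline(broker, generate_numbers).filter(is_even)
```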


### Group steps

This step groups multiple tasks together and sends them after the previous steps complete.
All tasks in the group are executed in parallel, and their results are collected into a single tuple.

To create a group, use the `Group` class from `taskiq_pipelines`:

```python
import asyncio
from typing import Any

from taskiq.brokers.inmemory_broker import InMemoryBroker

from taskiq_pipelines import Group, Pipeline, PipelineMiddleware

broker = InMemoryBroker()
broker.add_middlewares(PipelineMiddleware())


@broker.task
def add_one(value: int) -> int:
    return value + 1


@broker.task
def mul_two(val: int) -> int:
    return val * 2


@broker.task
def to_string(val: Any) -> str:
    return str(val)


async def main():
    pipe = (
        Pipeline(broker)
        .group(
            Group(
                # Abort the pipeline
                # if any of the tasks fails.
                skip_errors=False,
                # How often to check for completion.
                check_interval=0.1,
            )
            # Start a task that adds 1 to 1.
            .add(add_one, 1)
            # Start a task that multiplies 2 by 2.
            .add(mul_two, 2)
            # Map all results to strings.
        ).map(to_string)
    )
    task = await pipe.kiq()
    result = await task.wait_result()
    # This should output:
    # Calculated value: ['2', '4']
    print("Calculated value:", result.return_value)


if __name__ == "__main__":
    asyncio.run(main())
```
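
Judging by the comments above, setting `skip_errors=True` should let the group tolerate failing tasks instead of aborting the whole pipeline, while `check_interval` controls how often the group step polls for completion. A one-line sketch of a more tolerant group (the exact error-skipping semantics are an assumption):

```python
# Assumption: with skip_errors=True, a failed task does not abort
# the pipeline; completion is polled once per second.
tolerant_group = Group(skip_errors=True, check_interval=1.0)
```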

Alternatively, you can use the `GroupWithArgs` class to pass the result of the previous step
as an argument to the grouped functions. Here's an example:

```python
import asyncio
from typing import Any

from taskiq.brokers.inmemory_broker import InMemoryBroker

from taskiq_pipelines import GroupWithArgs, Pipeline, PipelineMiddleware

broker = InMemoryBroker()
broker.add_middlewares(PipelineMiddleware())


@broker.task
def add_one(value: int) -> int:
    return value + 1


@broker.task
def mul_two(val: int) -> int:
    return val * 2


@broker.task
def to_string(val: Any) -> str:
    return str(val)


async def main():
    pipe = (
        # The pipeline starts with the `add_one` task.
        Pipeline(broker, add_one)
        # Its result is passed to every task in the group.
        .group(
            GroupWithArgs(
                # Abort the pipeline
                # if any of the tasks fails.
                skip_errors=False,
                # How often to check for completion.
                check_interval=0.1,
            )
            # Start a task that adds 1 to the result
            # of the previous task.
            # The result is passed as the keyword argument "value".
            .add(add_one, param_name="value")
            # Start a task that multiplies the previous
            # result by 2 (passed positionally).
            .add(mul_two)
            # Map all results to strings.
        ).map(to_string)
    )
    task = await pipe.kiq(1)
    result = await task.wait_result()
    # This should output:
    # Calculated value: ['3', '4']
    print("Calculated value:", result.return_value)


if __name__ == "__main__":
    asyncio.run(main())
```
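
The updated `Pipeline.__init__` (see the `pipeliner.py` diff below) also accepts a `Group` as the starting task, so a pipeline can begin with a group step directly. A minimal sketch, reusing the broker and tasks from the first example:

```python
# Equivalent to Pipeline(broker).group(...); passing a Group
# as the initial "task" makes the group the first step.
pipe = Pipeline(
    broker,
    Group(skip_errors=False, check_interval=0.1)
    .add(add_one, 1)
    .add(mul_two, 2),
)
```
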
488 changes: 253 additions & 235 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions taskiq_pipelines/__init__.py
@@ -3,9 +3,12 @@
from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
from taskiq_pipelines.middleware import PipelineMiddleware
from taskiq_pipelines.pipeliner import Pipeline
from taskiq_pipelines.task_group import Group, GroupWithArgs

__all__ = [
    "AbortPipeline",
    "Group",
    "GroupWithArgs",
    "Pipeline",
    "PipelineError",
    "PipelineMiddleware",
6 changes: 3 additions & 3 deletions taskiq_pipelines/abc.py
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Type

from taskiq import AsyncBroker, TaskiqResult
from taskiq import AsyncBroker, AsyncTaskiqTask, TaskiqResult
from typing_extensions import ClassVar


@@ -26,9 +26,9 @@ async def act(
        step_number: int,
        parent_task_id: str,
        task_id: str,
        pipe_data: str,
        pipe_data: bytes,
        result: "TaskiqResult[Any]",
    ) -> None:
    ) -> AsyncTaskiqTask[Any]:
        """
        Perform pipeline action.

1 change: 1 addition & 0 deletions taskiq_pipelines/constants.py
@@ -2,5 +2,6 @@

CURRENT_STEP = "_pipe_current_step"
PIPELINE_DATA = "_pipe_data"
PARENT_TASK_ID = "_parent_task_id"

EMPTY_PARAM_NAME: Literal[-1] = -1
135 changes: 119 additions & 16 deletions taskiq_pipelines/pipeliner.py
@@ -13,13 +13,15 @@
)

import pydantic
from taskiq import AsyncBroker, AsyncTaskiqTask
from taskiq import AsyncBroker, AsyncTaskiqTask, TaskiqResult
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.kicker import AsyncKicker
from typing_extensions import ParamSpec

from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
from taskiq_pipelines.steps import FilterStep, MapperStep, SequentialStep, parse_step
from taskiq_pipelines.steps.group import GroupStep
from taskiq_pipelines.task_group import Group, GroupWithArgs

_ReturnType = TypeVar("_ReturnType")
_FuncParams = ParamSpec("_FuncParams")
@@ -50,20 +50,44 @@ class Pipeline(Generic[_FuncParams, _ReturnType]):
    but it's nice to have.
    """

    @overload
    def __init__(
        self: "Pipeline[[], Any]",
        broker: AsyncBroker,
        task: None = None,
    ) -> None: ...

    @overload
    def __init__(
        self,
        broker: AsyncBroker,
        task: Optional[
            Union[
                AsyncKicker[_FuncParams, _ReturnType],
                AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
            ]
        ] = None,
    ) -> None: ...

    def __init__(
        self,
        broker: AsyncBroker,
        task: Optional[
            Union[
                AsyncKicker[_FuncParams, _ReturnType],
                AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
                Group[Any, _ReturnType],
            ]
        ] = None,
    ) -> None:
        self.broker = broker
        self.steps: "List[DumpedStep]" = []
        if task:
            self.call_next(task)
        if not task:
            return
        if isinstance(task, Group):
            self.group(task)
            return
        self.call_next(task)

    @overload
    def call_next(
@@ -325,6 +351,43 @@ def filter(
        )
        return self

    @overload
    def group(
        self: "Pipeline[_FuncParams, _ReturnType]",
        group: GroupWithArgs[Any, _T2],
    ) -> "Pipeline[_FuncParams, _T2]": ...

    @overload
    def group(
        self: "Pipeline[_FuncParams, _ReturnType]",
        group: Group[Any, _T2],
    ) -> "Pipeline[_FuncParams, _T2]": ...

    def group(
        self: "Pipeline[_FuncParams, _ReturnType]",
        group: Union[Group[Any, Any], GroupWithArgs[Any, Any]],
    ) -> "Pipeline[_FuncParams, Any]":
        """
        Add group task execution step.

        This step will run all tasks in parallel
        and will wait for all of them to finish.

        Results of all tasks will be returned as an iterable
        where each item is the result of the corresponding
        task in the group, in the same order.

        :param group: group to execute.
        """
        self.steps.append(
            DumpedStep(
                step_type=GroupStep._step_name,
                step_data=group.to_step().model_dump(),
                task_id="",
            ),
        )
        return self  # type: ignore

    def dumpb(self) -> bytes:
        """
        Dumps the current pipeline as bytes.
@@ -352,6 +415,48 @@ def loadb(cls, broker: AsyncBroker, pipe_data: bytes) -> "Pipeline[Any, Any]":
        pipe.steps = DumpedSteps.model_validate(data)  # type: ignore[assignment]
        return pipe

    async def _kick_sequential(
        self,
        step: SequentialStep,
        task_id: str,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncTaskiqTask[_ReturnType]:
        kicker = (
            AsyncKicker(
                step.task_name,
                broker=self.broker,
                labels=step.labels,
            )
            .with_task_id(task_id)
            .with_labels(
                **{CURRENT_STEP: 0, PIPELINE_DATA: self.dumpb()},  # type: ignore
            )
        )
        return await kicker.kiq(*args, **kwargs)

    async def _kick_group(
        self,
        group: GroupStep,
        task_id: str,
    ) -> AsyncTaskiqTask[Any]:
        await group.act(
            broker=self.broker,
            task_id=task_id,
            step_number=0,
            parent_task_id="",
            pipe_data=self.dumpb(),
            result=TaskiqResult(
                is_err=False,
                return_value=None,
                execution_time=0.0,
            ),
        )
        return AsyncTaskiqTask(
            task_id=task_id,
            result_backend=self.broker.result_backend,
        )

    async def kiq(
        self,
        *args: _FuncParams.args,
@@ -378,20 +483,18 @@ async def kiq(
        self._update_task_ids()
        step = self.steps[0]
        parsed_step = parse_step(step.step_type, step.step_data)
        if not isinstance(parsed_step, SequentialStep):
            raise ValueError("First step must be sequential.")
        kicker = (
            AsyncKicker(
                parsed_step.task_name,
                broker=self.broker,
                labels=parsed_step.labels,
        if isinstance(parsed_step, SequentialStep):
            taskiq_task = await self._kick_sequential(
                parsed_step,
                step.task_id,
                *args,
                **kwargs,
            )
            .with_task_id(step.task_id)
            .with_labels(
                **{CURRENT_STEP: 0, PIPELINE_DATA: self.dumpb()},  # type: ignore
            )
        )
        taskiq_task = await kicker.kiq(*args, **kwargs)
        elif isinstance(parsed_step, GroupStep):
            taskiq_task = await self._kick_group(parsed_step, step.task_id)
        else:
            raise ValueError("First step must be sequential or a group.")

        taskiq_task.task_id = self.steps[-1].task_id
        return taskiq_task

2 changes: 2 additions & 0 deletions taskiq_pipelines/steps/__init__.py
@@ -5,6 +5,7 @@

from taskiq_pipelines.abc import AbstractStep
from taskiq_pipelines.steps.filter import FilterStep
from taskiq_pipelines.steps.group import GroupStep
from taskiq_pipelines.steps.mapper import MapperStep
from taskiq_pipelines.steps.sequential import SequentialStep

@@ -21,6 +22,7 @@ def parse_step(step_type: str, step_data: Dict[str, Any]) -> AbstractStep:

__all__ = [
    "FilterStep",
    "GroupStep",
    "MapperStep",
    "SequentialStep",
]