Dependencies were updated. #21

Merged
merged 2 commits into from Mar 5, 2025
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -27,7 +27,7 @@ jobs:
pytest:
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10", "3.11"]
py_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v2
988 changes: 539 additions & 449 deletions poetry.lock

Large diffs are not rendered by default.

23 changes: 11 additions & 12 deletions pyproject.toml
@@ -12,10 +12,11 @@ classifiers = [
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Operating System :: OS Independent",
"Intended Audience :: Developers",
"Topic :: System :: Networking",
@@ -25,20 +26,20 @@ homepage = "https://github.com/taskiq-python/taskiq-pipelines"
keywords = ["taskiq", "pipelines", "tasks", "distributed", "async"]

[tool.poetry.dependencies]
python = "^3.8.1"
taskiq = ">=0.11.0, <1"
python = "^3.9"
taskiq = ">=0.11.12, <1"
typing-extensions = "^4.3.0"
pydantic = "^2"

[tool.poetry.group.dev.dependencies]
pytest = "^7"
black = { version = "^22.6.0", allow-prereleases = true }
pytest-cov = "^3.0.0"
anyio = "^3.6.1"
pre-commit = "^2.20.0"
pytest = "^8"
black = { version = "^25", allow-prereleases = true }
pytest-cov = "^6"
anyio = "^4"
pre-commit = "^4"
mypy = "^1"
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
ruff = "^0.5.6"
pytest-xdist = { version = "^3", extras = ["psutil"] }
ruff = "^0.9.9"

[tool.mypy]
strict = true
@@ -97,8 +98,6 @@ lint.ignore = [
"D401", # First line should be in imperative mood
"D104", # Missing docstring in public package
"D100", # Missing docstring in public module
"ANN102", # Missing type annotation for self in method
"ANN101", # Missing type annotation for argument
"ANN401", # typing.Any are disallowed in `**kwargs
"PLR0913", # Too many arguments for function call
"D106", # Missing docstring in public nested class
3 changes: 2 additions & 1 deletion taskiq_pipelines/__init__.py
@@ -1,11 +1,12 @@
"""Pipelines for taskiq tasks."""

from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
from taskiq_pipelines.middleware import PipelineMiddleware
from taskiq_pipelines.pipeliner import Pipeline

__all__ = [
"AbortPipeline",
"Pipeline",
"PipelineError",
"AbortPipeline",
"PipelineMiddleware",
]
30 changes: 30 additions & 0 deletions taskiq_pipelines/exceptions.py
@@ -1,10 +1,36 @@
from typing import ClassVar, Union

from taskiq import TaskiqError


class PipelineError(TaskiqError):
"""Generic pipeline error."""


class StepError(PipelineError):
"""Error found while mapping step."""

__template__ = (
"Task {task_id} returned an error. {_STEP_NAME} failed. Reason: {error}"
)
_STEP_NAME: ClassVar[str]

task_id: str
error: Union[BaseException, None]


class MappingError(StepError):
"""Error found while mapping step."""

_STEP_NAME = "mapping"


class FilterError(StepError):
"""Error found while filtering step."""

_STEP_NAME = "filtering"


class AbortPipeline(PipelineError): # noqa: N818
"""
Abort curret pipeline execution.
@@ -15,3 +41,7 @@ class AbortPipeline(PipelineError): # noqa: N818
It imediately aborts current pipeline
execution.
"""

__template__ = "Pipeline was aborted. {reason}"

reason: str = "No reason provided."
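For readers of this hunk: a minimal, hedged sketch (not part of the diff) of how the new field-based exceptions are intended to read, assuming taskiq's `TaskiqError` renders `__template__` from the declared fields:

```python
from taskiq_pipelines.exceptions import AbortPipeline, FilterError

# FilterError now carries structured fields instead of a pre-formatted string.
err = FilterError(task_id="abc123", error=ValueError("boom"))
print(err)  # expected to render roughly:
            # "Task abc123 returned an error. filtering failed. Reason: boom"

# AbortPipeline takes an explicit `reason` keyword (with a default)
# rather than a positional message argument.
abort = AbortPipeline(reason="Result of the previous task is not iterable.")
print(abort)  # "Pipeline was aborted. Result of the previous task is not iterable."
```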
8 changes: 4 additions & 4 deletions taskiq_pipelines/middleware.py
@@ -38,7 +38,7 @@ async def post_save( # noqa: PLR0911
return
current_step_num = int(message.labels[CURRENT_STEP])
if PIPELINE_DATA not in message.labels:
logger.warn("Pipline data not found. Execution flow is broken.")
logger.warning("Pipline data not found. Execution flow is broken.")
return
pipeline_data = message.labels[PIPELINE_DATA]
parsed_data = self.broker.serializer.loadb(pipeline_data)
@@ -47,7 +47,7 @@
parsed_data,
)
except ValueError as err:
logger.warn("Cannot parse pipline_data: %s", err, exc_info=True)
logger.warning("Cannot parse pipline_data: %s", err, exc_info=True)
return
if current_step_num + 1 >= len(steps_data):
logger.debug("Pipeline is completed.")
@@ -99,7 +99,7 @@ async def on_error(
return
current_step_num = int(message.labels[CURRENT_STEP])
if PIPELINE_DATA not in message.labels:
logger.warn("Pipline data not found. Execution flow is broken.")
logger.warning("Pipline data not found. Execution flow is broken.")
return
pipe_data = message.labels[PIPELINE_DATA]
try:
@@ -129,7 +129,7 @@ async def fail_pipeline(
TaskiqResult(
is_err=True,
return_value=None, # type: ignore
error=abort or AbortPipeline("Execution aborted."),
error=abort or AbortPipeline(reason="Execution aborted."),
execution_time=0,
log="Error found while executing pipeline.",
),
24 changes: 8 additions & 16 deletions taskiq_pipelines/pipeliner.py
@@ -73,8 +73,7 @@ def call_next(
],
param_name: Union[Optional[str], Literal[-1]] = None,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...
) -> "Pipeline[_FuncParams, _T2]": ...

@overload
def call_next(
@@ -85,8 +84,7 @@ def call_next(
],
param_name: Union[Optional[str], Literal[-1]] = None,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...
) -> "Pipeline[_FuncParams, _T2]": ...

def call_next(
self,
@@ -133,8 +131,7 @@ def call_after(
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
],
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...
) -> "Pipeline[_FuncParams, _T2]": ...

@overload
def call_after(
@@ -144,8 +141,7 @@ def call_after(
AsyncTaskiqDecoratedTask[Any, _T2],
],
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _T2]":
...
) -> "Pipeline[_FuncParams, _T2]": ...

def call_after(
self,
@@ -192,8 +188,7 @@ def map(
skip_errors: bool = False,
check_interval: float = 0.5,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, List[_T2]]":
...
) -> "Pipeline[_FuncParams, List[_T2]]": ...

@overload
def map(
@@ -206,8 +201,7 @@ def map(
skip_errors: bool = False,
check_interval: float = 0.5,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, List[_T2]]":
...
) -> "Pipeline[_FuncParams, List[_T2]]": ...

def map(
self,
@@ -263,8 +257,7 @@ def filter(
skip_errors: bool = False,
check_interval: float = 0.5,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _ReturnType]":
...
) -> "Pipeline[_FuncParams, _ReturnType]": ...

@overload
def filter(
@@ -277,8 +270,7 @@ def filter(
skip_errors: bool = False,
check_interval: float = 0.5,
**additional_kwargs: Any,
) -> "Pipeline[_FuncParams, _ReturnType]":
...
) -> "Pipeline[_FuncParams, _ReturnType]": ...

def filter(
self,
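As an aside, a hedged usage sketch of the `Pipeline` API these overloads describe; the broker setup and the `generate_numbers`/`is_even`/`double` tasks are illustrative assumptions, not code from this PR:

```python
import asyncio

from taskiq import InMemoryBroker
from taskiq_pipelines import Pipeline, PipelineMiddleware

broker = InMemoryBroker().with_middlewares(PipelineMiddleware())


@broker.task
def generate_numbers(count: int) -> list[int]:
    return list(range(count))


@broker.task
def is_even(value: int) -> bool:
    return value % 2 == 0


@broker.task
def double(value: int) -> int:
    return value * 2


async def main() -> None:
    # Each step feeds its result into the next one.
    pipe = (
        Pipeline(broker, generate_numbers)
        .filter(is_even)  # keep values for which is_even returns True
        .map(double)      # run double for every remaining value
    )
    sent = await pipe.kiq(5)
    result = await sent.wait_result()
    print(result.return_value)  # [0, 4, 8]


asyncio.run(main())
```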
3 changes: 2 additions & 1 deletion taskiq_pipelines/steps/__init__.py
@@ -1,4 +1,5 @@
"""Package with default pipeline steps."""

from logging import getLogger
from typing import Any, Dict

@@ -19,7 +20,7 @@ def parse_step(step_type: str, step_data: Dict[str, Any]) -> AbstractStep:


__all__ = [
"FilterStep",
"MapperStep",
"SequentialStep",
"FilterStep",
]
14 changes: 9 additions & 5 deletions taskiq_pipelines/steps/filter.py
@@ -2,18 +2,18 @@
from typing import Any, Dict, Iterable, List, Optional, Union

import pydantic
from taskiq import AsyncBroker, Context, TaskiqDepends, TaskiqError, TaskiqResult
from taskiq import AsyncBroker, Context, TaskiqDepends, TaskiqResult
from taskiq.brokers.shared_broker import async_shared_broker
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.kicker import AsyncKicker

from taskiq_pipelines.abc import AbstractStep
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
from taskiq_pipelines.exceptions import AbortPipeline
from taskiq_pipelines.exceptions import AbortPipeline, FilterError


@async_shared_broker.task(task_name="taskiq_pipelines.shared.filter_tasks")
async def filter_tasks(
async def filter_tasks( # noqa: C901
task_ids: List[str],
parent_task_id: str,
check_interval: float,
@@ -62,7 +62,11 @@ async def filter_tasks(
if result.is_err:
if skip_errors:
continue
raise TaskiqError(f"Task {task_id} returned error. Filtering failed.")
err_cause = None
if isinstance(result.error, BaseException):
err_cause = result.error
raise FilterError(task_id=task_id, error=result.error) from err_cause

if result.return_value:
filtered_results.append(value)
return filtered_results
@@ -103,7 +107,7 @@ async def act(
:raises AbortPipeline: if result is not iterable.
"""
if not isinstance(result.return_value, Iterable):
raise AbortPipeline("Result of the previous task is not iterable.")
raise AbortPipeline(reason="Result of the previous task is not iterable.")
sub_task_ids = []
for item in result.return_value:
kicker: "AsyncKicker[Any, Any]" = AsyncKicker(
33 changes: 21 additions & 12 deletions taskiq_pipelines/steps/mapper.py
@@ -7,15 +7,14 @@
AsyncTaskiqDecoratedTask,
Context,
TaskiqDepends,
TaskiqError,
TaskiqResult,
async_shared_broker,
)
from taskiq.kicker import AsyncKicker

from taskiq_pipelines.abc import AbstractStep
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
from taskiq_pipelines.exceptions import AbortPipeline
from taskiq_pipelines.exceptions import AbortPipeline, MappingError


@async_shared_broker.task(task_name="taskiq_pipelines.shared.wait_tasks")
@@ -60,7 +59,11 @@ async def wait_tasks(
if result.is_err:
if skip_errors:
continue
raise TaskiqError(f"Task {task_id} returned error. Mapping failed.")
err_cause = None
if isinstance(result.error, BaseException):
err_cause = result.error
raise MappingError(task_id=task_id, error=result.error) from err_cause

results.append(result.return_value)
return results

@@ -106,7 +109,7 @@ async def act(
sub_task_ids: List[str] = []
return_value = result.return_value
if not isinstance(return_value, Iterable):
raise AbortPipeline("Result of the previous task is not iterable.")
raise AbortPipeline(reason="Result of the previous task is not iterable.")

for item in return_value:
kicker: "AsyncKicker[Any, Any]" = AsyncKicker(
@@ -121,14 +124,20 @@
task = await kicker.kiq(item, **self.additional_kwargs)
sub_task_ids.append(task.task_id)

await wait_tasks.kicker().with_task_id(task_id).with_broker(
broker,
).with_labels(
**{CURRENT_STEP: step_number, PIPELINE_DATA: pipe_data}, # type: ignore
).kiq(
sub_task_ids,
check_interval=self.check_interval,
skip_errors=self.skip_errors,
await (
wait_tasks.kicker()
.with_task_id(task_id)
.with_broker(
broker,
)
.with_labels(
**{CURRENT_STEP: step_number, PIPELINE_DATA: pipe_data}, # type: ignore
)
.kiq(
sub_task_ids,
check_interval=self.check_interval,
skip_errors=self.skip_errors,
)
)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions tests/test_steps.py
@@ -57,7 +57,7 @@ def normal_task(i: bool) -> bool:
@broker.task
def aborting_task(i: int) -> bool:
if i:
raise AbortPipeline(text)
raise AbortPipeline(reason=text)
return True

pipe = Pipeline(broker, aborting_task).call_next(normal_task)
@@ -70,4 +70,4 @@ def aborting_task(i: int) -> bool:
res = await sent.wait_result()
assert res.is_err is True
assert res.return_value is None
assert res.error.args[0] == text
assert text in res.error.args[0]
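The loosened assertion reflects the templated `AbortPipeline`: the stored message is now the rendered template with the reason embedded in it, so an exact match no longer applies. Roughly (hedged sketch, not from the diff; the `text` value is illustrative):

```python
from taskiq_pipelines.exceptions import AbortPipeline

text = "Stop it!"
err = AbortPipeline(reason=text)
# str(err) is expected to look like "Pipeline was aborted. Stop it!",
# so `text in res.error.args[0]` holds while strict equality does not.
assert text in str(err)
```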