Skip to content

Added tests. #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ per-file-ignores =
DAR101,
; Found too many arguments
WPS211,
; Found nested function
WPS430,
; Found too short name
WPS111,

; all init files
__init__.py:
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ repos:
language: system
pass_filenames: false
types: [python]
args: [--count, taskiq_pipelines]
args: [--count, taskiq_pipelines, tests]

- id: mypy
name: Validate types with MyPy
Expand Down
8 changes: 4 additions & 4 deletions taskiq_pipelines/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
class AbstractStep(ABC):
"""Abstract pipeline step."""

step_name: str
known_steps: "Dict[str, Type[AbstractStep]]" = {}
_step_name: str
_known_steps: "Dict[str, Type[AbstractStep]]" = {}

def __init_subclass__(cls, step_name: str, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
# Sets step name to the step.
cls.step_name = step_name
cls._step_name = step_name
# Registers new subclass in the dict of
# known steps.
cls.known_steps[step_name] = cls
cls._known_steps[step_name] = cls

@abstractmethod
def dumps(self) -> str:
Expand Down
8 changes: 4 additions & 4 deletions taskiq_pipelines/pipeliner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def call_next(
"""
self.steps.append(
DumpedStep(
step_type=SequentialStep.step_name,
step_type=SequentialStep._step_name, # noqa: WPS437
step_data=SequentialStep.from_task(
task=task,
param_name=param_name,
Expand Down Expand Up @@ -167,7 +167,7 @@ def call_after(
"""
self.steps.append(
DumpedStep(
step_type=SequentialStep.step_name,
step_type=SequentialStep._step_name, # noqa: WPS437
step_data=SequentialStep.from_task(
task=task,
param_name=EMPTY_PARAM_NAME,
Expand Down Expand Up @@ -236,7 +236,7 @@ def map(
"""
self.steps.append(
DumpedStep(
step_type=MapperStep.step_name,
step_type=MapperStep._step_name, # noqa: WPS437
step_data=MapperStep.from_task(
task=task,
param_name=param_name,
Expand Down Expand Up @@ -308,7 +308,7 @@ def filter(
"""
self.steps.append(
DumpedStep(
step_type=FilterStep.step_name,
step_type=FilterStep._step_name, # noqa: WPS437
step_data=FilterStep.from_task(
task=task,
param_name=param_name,
Expand Down
2 changes: 1 addition & 1 deletion taskiq_pipelines/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


def parse_step(step_type: str, step_data: str) -> AbstractStep:
step_cls = AbstractStep.known_steps.get(step_type)
step_cls = AbstractStep._known_steps.get(step_type) # noqa: WPS437
if step_cls is None:
logger.warning(f"Unknown step type: {step_type}")
raise ValueError("Unknown step type.")
Expand Down
16 changes: 0 additions & 16 deletions taskiq_pipelines/steps/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,6 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
param_name: Union[Optional[int], str]
additional_kwargs: Dict[str, Any]

@pydantic.validator("param_name")
def validate_param_name(
self,
value: Union[Optional[str], int],
) -> Union[Optional[str], int]:
"""
Validate param_name.

:param value: value to validate.
:raises ValueError: if value is not str, None or -1 (EMPTY_PARAM_NAME).
:return: param value.
"""
if isinstance(value, int) and value != EMPTY_PARAM_NAME:
raise ValueError("must be str, None or -1 (EMPTY_PARAM_NAME)")
return value

def dumps(self) -> str:
"""
Dumps step as string.
Expand Down
3 changes: 0 additions & 3 deletions taskiq_pipelines/tests/test_stub.py

This file was deleted.

12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pytest


@pytest.fixture(scope="session")
def anyio_backend() -> str:
"""
Anyio backend.

Backend for anyio pytest plugin.
:return: backend name.
"""
return "asyncio"
44 changes: 44 additions & 0 deletions tests/test_steps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List

import pytest
from taskiq import InMemoryBroker

from taskiq_pipelines import Pipeline, PipelineMiddleware


@pytest.mark.anyio
async def test_success() -> None:
"""Tests that sequential step works as expected."""
broker = InMemoryBroker().with_middlewares(PipelineMiddleware())

@broker.task
def add(i: int) -> int:
return i + 1

@broker.task
def double(i: int) -> int:
return i * 2

pipe = Pipeline(broker, add).call_next(double)
sent = await pipe.kiq(1)
res = await sent.wait_result()
assert res.return_value == 4


@pytest.mark.anyio
async def test_mapping_success() -> None:
"""Test that map step works as expected."""
broker = InMemoryBroker().with_middlewares(PipelineMiddleware())

@broker.task
def ranger(i: int) -> List[int]:
return list(range(i))

@broker.task
def double(i: int) -> int:
return i * 2

pipe = Pipeline(broker, ranger).map(double)
sent = await pipe.kiq(4)
res = await sent.wait_result()
assert res.return_value == list(map(double, ranger(4)))