2 changes: 1 addition & 1 deletion docs/available-components/brokers.md
@@ -10,7 +10,7 @@ In this section we'll list officially supported brokers.

This is a special broker for local development. It uses the same functions to execute tasks,
but all tasks are executed locally in the current thread.
By default it uses `InMemoryResultBackend` but this can be overriden.
By default it uses `InMemoryResultBackend` but this can be overridden.
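For context, a minimal sketch of overriding that default, assuming a custom backend class of your own (`MyResultBackend` and its module are illustrative) and the usual `with_result_backend` helper:

```python
from taskiq import InMemoryBroker

# Hypothetical custom backend implementing taskiq's AsyncResultBackend interface.
from my_project.backends import MyResultBackend

broker = InMemoryBroker().with_result_backend(MyResultBackend())
```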

## ZeroMQBroker

2 changes: 1 addition & 1 deletion docs/examples/extending/result_backend.py
@@ -53,7 +53,7 @@ async def is_result_ready(
Check if result exists.

This function must check whether result
is available in your resul backend
is available in your result backend
without fetching the result.

:param task_id: id of a task.
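As an illustration of that contract, a hedged sketch of a dict-backed backend (class and attribute names are illustrative):

```python
from typing import Any, Dict

from taskiq import TaskiqResult
from taskiq.abc.result_backend import AsyncResultBackend


class DictResultBackend(AsyncResultBackend[Any]):
    """Illustrative backend that keeps results in a plain dict."""

    def __init__(self) -> None:
        self._results: Dict[str, TaskiqResult[Any]] = {}

    async def set_result(self, task_id: str, result: TaskiqResult[Any]) -> None:
        self._results[task_id] = result

    async def is_result_ready(self, task_id: str) -> bool:
        # Presence check only: the stored result is never fetched or deserialized here.
        return task_id in self._results

    async def get_result(self, task_id: str, with_logs: bool = False) -> TaskiqResult[Any]:
        return self._results[task_id]
```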
4 changes: 2 additions & 2 deletions docs/guide/architecture-overview.md
@@ -119,13 +119,13 @@ test_project
You can specify all tasks modules to import manually.

```bash
taskiq test_project.broker:broker test_projec.submodule.tasks test_projec.utils.tasks
taskiq worker test_project.broker:broker test_project.submodule.tasks test_project.utils.tasks
```

Or you can let taskiq find all python modules named tasks in current directory recursively.

```bash
taskiq test_project.broker:broker -fsd
taskiq worker test_project.broker:broker -fsd
```
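For reference, a minimal sketch of what those modules might contain (the broker type is only for illustration):

```python
# test_project/broker.py
from taskiq import InMemoryBroker

broker = InMemoryBroker()
```

```python
# test_project/submodule/tasks.py -- discovered automatically with `-fsd`
from test_project.broker import broker


@broker.task
async def add_one(value: int) -> int:
    return value + 1
```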

If you have uvloop installed, taskiq will automatically install new policies to event loop.
2 changes: 1 addition & 1 deletion docs/guide/scheduling-tasks.md
@@ -22,7 +22,7 @@ Of course we can implement loop like this:
await asyncio.sleep(timedelta(minutes=5).total_seconds())
```

But if you have many schedules it may be a little painful to implement. So let me introuce you the `TaskiqScheduler`.
But if you have many schedules it may be a little painful to implement. So let me introduce you the `TaskiqScheduler`.
Let's add scheduler to our module.

@[code python](../examples/schedule/intro.py)
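If the linked example is not at hand, the same idea looks roughly like this (the broker type and cron expression are illustrative):

```python
from taskiq import InMemoryBroker, TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

broker = InMemoryBroker()
scheduler = TaskiqScheduler(broker=broker, sources=[LabelScheduleSource(broker)])


@broker.task(schedule=[{"cron": "*/5 * * * *"}])
async def heavy_task() -> None:
    print("Running every five minutes")
```

The scheduler process is then started with `taskiq scheduler <module>:scheduler`.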
4 changes: 2 additions & 2 deletions docs/guide/state-and-deps.md
@@ -102,7 +102,7 @@ For example:

@[code python](../examples/state/dependencies_tree.py)

In this code, the dependency `common_dep` is going to be evaluated only once and the `dep1` and the `dep2` are going to recevie the same value. You can control this behavior by using the `use_cache=False` parameter to you dependency. This parameter will force the
In this code, the dependency `common_dep` is going to be evaluated only once and the `dep1` and the `dep2` are going to receive the same value. You can control this behavior by using the `use_cache=False` parameter to you dependency. This parameter will force the
dependency to reevaluate all its subdependencies.

In this example we cannot predict the result, since `dep2` doesn't use the cache for the `common_dep` function.
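A hedged sketch of that difference (function names mirror the example above, bodies are illustrative):

```python
from taskiq import TaskiqDepends


async def common_dep() -> int:
    # Pretend this is expensive to compute.
    return 1


async def dep1(common: int = TaskiqDepends(common_dep)) -> int:
    # Receives the cached value of common_dep.
    return common + 1


async def dep2(common: int = TaskiqDepends(common_dep, use_cache=False)) -> int:
    # use_cache=False forces common_dep to be evaluated again for this branch.
    return common + 2
```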
@@ -167,7 +167,7 @@ If you want to do something asynchronously, convert this function to an asynchro

### Default dependencies

By default taskiq has only two deendencies:
By default taskiq has only two dependencies:

- Context from `taskiq.context.Context`
- TaskiqState from `taskiq.state.TaskiqState`
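A short sketch of requesting both defaults inside a task (the broker is illustrative):

```python
from taskiq import Context, InMemoryBroker, TaskiqDepends, TaskiqState

broker = InMemoryBroker()


@broker.task
async def show_task_info(
    context: Context = TaskiqDepends(),
    state: TaskiqState = TaskiqDepends(),
) -> None:
    # Context carries the current message; TaskiqState is shared worker state.
    print(context.message.task_id, state)
```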
2 changes: 1 addition & 1 deletion docs/guide/taskiq-with-fastapi.md
@@ -8,7 +8,7 @@ FastAPI is one of the most popular async web frameworks in python. It has gained
1. It's easy to use;
2. It has a cool dependency injection.

In taskiq we try to make our libraries easy to use and we too have a depenndency injection. But our dependencies
In taskiq we try to make our libraries easy to use and we too have a dependency injection. But our dependencies
are not compatible with FastAPI's dependencies by default. That is why we have created a library "[taskiq-fastapi](https://pypi.org/project/taskiq-fastapi/)" to make integration with
FastAPI as smooth as possible.
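A rough sketch of that wiring, assuming the FastAPI application lives at `my_app.main:app` (the module path is illustrative):

```python
import taskiq_fastapi
from taskiq import InMemoryBroker

broker = InMemoryBroker()

# Point taskiq-fastapi at the application so FastAPI-style dependencies
# can be resolved inside taskiq tasks.
taskiq_fastapi.init(broker, "my_app.main:app")
```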

4 changes: 2 additions & 2 deletions docs/guide/testing-taskiq.md
@@ -4,7 +4,7 @@ order: 9

# Testing with taskiq

Everytime we write programs, we want them to be correct. To achieve this, we use tests.
Every time we write programs, we want them to be correct. To achieve this, we use tests.
Taskiq allows you to write tests easily as if tasks were normal functions.

Let's dive into examples.
@@ -76,7 +76,7 @@ After the preparations are done, we need to modify the broker's file in your pro

As you can see, we added an `if` statement. If the expression is true, we replace our broker with an in-memory broker.
The main point here is to not have an actual connection during testing. It's useful because the in-memory broker has
the same interface as a real broker, but it doesn't send tasks acutally.
the same interface as a real broker, but it doesn't send tasks actually.
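A sketch of that `if` statement, assuming the variable is called `ENVIRONMENT` and the real broker comes from a hypothetical factory:

```python
import os

from taskiq import InMemoryBroker

from my_project.real_broker import build_real_broker  # hypothetical factory

env = os.environ.get("ENVIRONMENT")

broker = build_real_broker()
if env and env == "pytest":
    # No real connection is made while the tests run.
    broker = InMemoryBroker()
```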

## Testing tasks

4 changes: 2 additions & 2 deletions taskiq/abc/broker.py
@@ -113,7 +113,7 @@ def add_dependency_context(self, new_ctx: Dict[Any, Any]) -> None:
Provided dict will be used to inject new dependencies
in all dependency graph contexts.

:param new_ctx: Additional context values for dependnecy injection.
:param new_ctx: Additional context values for dependency injection.
"""
self.custom_dependency_context.update(new_ctx)
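A usage sketch of this hook (the pool class is illustrative); integrations such as taskiq-fastapi use it to make framework objects resolvable inside tasks:

```python
from taskiq import InMemoryBroker, TaskiqDepends

broker = InMemoryBroker()


class DatabasePool:
    """Stand-in for a real connection pool."""


# Make DatabasePool resolvable by its type annotation in every task.
broker.add_dependency_context({DatabasePool: DatabasePool()})


@broker.task
async def query(db: DatabasePool = TaskiqDepends()) -> None:
    print(db)
```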

@@ -152,7 +152,7 @@ async def shutdown(self) -> None:
Close the broker.

This method is called,
when broker is closig.
when broker is closing.
"""
event = TaskiqEvents.CLIENT_SHUTDOWN
if self.is_worker_process:
2 changes: 1 addition & 1 deletion taskiq/abc/middleware.py
@@ -55,7 +55,7 @@ def pre_execute(
"""
This hook is called before executing task.

This is a worker-side hook, wich means it
This is a worker-side hook, which means it
executes in the worker process.

:param message: incoming parsed taskiq message.
2 changes: 1 addition & 1 deletion taskiq/abc/result_backend.py
@@ -20,7 +20,7 @@ async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> N
"""
Saves result to the result backend.

Result must be save so it can be accesed later
Result must be save so it can be accessed later
by the calling side of the system.

:param task_id: id of a task to save.
2 changes: 1 addition & 1 deletion taskiq/brokers/inmemory_broker.py
@@ -114,7 +114,7 @@ async def kick(self, message: BrokerMessage) -> None:

This method just executes given task.

:param message: incomming message.
:param message: incoming message.

:raises TaskiqError: if someone wants to kick unknown task.
"""
2 changes: 1 addition & 1 deletion taskiq/cli/scheduler/args.py
@@ -42,7 +42,7 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
help=(
"If this option is on, "
"taskiq will try to find tasks modules "
"in current directory recursievly. Name of file to search for "
"in current directory recursively. Name of file to search for "
"can be configured using `--tasks-pattern` option."
),
)
2 changes: 1 addition & 1 deletion taskiq/cli/scheduler/run.py
@@ -20,7 +20,7 @@ async def schedules_updater(
"""
Periodic update to schedules.

This task preiodicaly checks for new schedules,
This task periodically checks for new schedules,
assembles the final list and replaces current
schedule with a new one.

2 changes: 1 addition & 1 deletion taskiq/cli/watcher.py
@@ -9,7 +9,7 @@


class FileWatcher: # pragma: no cover
"""Filewatcher that watchs for filesystem changes."""
"""Filewatcher that watches for filesystem changes."""

def __init__(
self,
4 changes: 2 additions & 2 deletions taskiq/cli/worker/args.py
@@ -100,7 +100,7 @@ def from_cli( # noqa: WPS213
help=(
"If this option is on, "
"taskiq will try to find tasks modules "
"in current directory recursievly. Name of file to search for "
"in current directory recursively. Name of file to search for "
"can be configured using `--tasks-pattern` option."
),
)
@@ -127,7 +127,7 @@ def from_cli( # noqa: WPS213
"[%(module)s:%(funcName)s:%(lineno)d] "
"%(message)s"
),
help="Format wich is used when collecting logs from function execution",
help="Format which is used when collecting logs from function execution",
)
parser.add_argument(
"--no-parse",
2 changes: 1 addition & 1 deletion taskiq/cli/worker/log_collector.py
@@ -12,7 +12,7 @@ def __init__(self, *streams: IO[Any]) -> None:

def write(self, message: Any) -> None:
"""
This write request writes to all avaialble streams.
This write request writes to all available streams.

:param message: message to write.
"""
2 changes: 1 addition & 1 deletion taskiq/cli/worker/process_manager.py
@@ -108,7 +108,7 @@ def get_signal_handler(
action_queue: "Queue[ProcessActionBase]",
) -> Callable[[int, Any], None]:
"""
Generate singnal handler for main process.
Generate signal handler for main process.

The signal handler will just put the SHUTDOWN event in
the action queue.
10 changes: 5 additions & 5 deletions taskiq/cli/worker/run.py
@@ -28,13 +28,13 @@ async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None:
"""
This function used to shutdown broker.

Broker can throw erorrs during shutdown,
Broker can throw errors during shutdown,
or it may return some value.

We need to handle such situations.

:param broker: current broker.
:param timeout: maximum amout of time to shutdown the broker.
:param timeout: maximum amount of time to shutdown the broker.
"""
logger.warning("Shutting down the broker.")
try:
@@ -70,7 +70,7 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
This function starts actual listening process.

It imports broker and all tasks.
Since tasks registers themselfs in a global set,
Since tasks registers themselves in a global set,
it's easy to just import module where you have decorated
function and they will be available in broker's `available_tasks`
field.
@@ -110,7 +110,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:

:param signum: received signal number.
:param _frame: current execution frame.
:raises KeyboardInterrupt: if termiation hasn't begun.
:raises KeyboardInterrupt: if termination hasn't begun.
"""
logger.debug(f"Got signal {signum}.")
nonlocal shutting_down # noqa: WPS420
@@ -169,7 +169,7 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
observer.start()
args.workers = 1
logging.warning(
"Reload on chage enabled. Number of worker processes set to 1.",
"Reload on change enabled. Number of worker processes set to 1.",
)

manager = ProcessManager(args=args, observer=observer, worker_function=start_listen)
6 changes: 3 additions & 3 deletions taskiq/decor.py
@@ -50,7 +50,7 @@ def __init__(
self.original_func = original_func
self.labels = labels

# Docs for this method are ommited in order to help
# Docs for this method are omitted in order to help
# your IDE resolve correct docs for it.
def __call__( # noqa: D102
self,
@@ -97,8 +97,8 @@ def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
"""
This function returns kicker object.

Kicker is a object that can modyfy kiq request
before sendig it.
Kicker is a object that can modify kiq request
before sending it.

:return: AsyncKicker instance.
"""
2 changes: 1 addition & 1 deletion taskiq/events.py
@@ -7,7 +7,7 @@ class TaskiqEvents(enum.Enum):

# Worker events.

# Called on woker startup.
# Called on worker startup.
WORKER_STARTUP = "WORKER_STARTUP"
# Called on worker shutdown.
WORKER_SHUTDOWN = "WORKER_SHUTDOWN"
2 changes: 1 addition & 1 deletion taskiq/message.py
@@ -9,7 +9,7 @@ class TaskiqMessage(BaseModel):

This is an internal class used
by brokers. Every remote call
recieve such messages.
receive such messages.
"""

task_id: str
2 changes: 1 addition & 1 deletion taskiq/receiver/params_parser.py
@@ -32,7 +32,7 @@ def parse_params( # noqa: C901
>>> def my_task(a: int) -> str
>>> ...

If you will kall my_task.kiq("11")
If you will call my_task.kiq("11")

You'll receive parsed 11 (int).
But, if you call it with mytask.kiq("str"),
2 changes: 1 addition & 1 deletion taskiq/receiver/receiver.py
@@ -78,7 +78,7 @@ async def callback( # noqa: C901, WPS213
that came from brokers.

:raises Exception: if raise_err is true,
and excpetion were found while saving result.
and exception were found while saving result.
:param message: received message.
:param raise_err: raise an error if cannot save result in
result_backend.
2 changes: 1 addition & 1 deletion taskiq/schedule_sources/label_based.py
@@ -19,7 +19,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
tasks available to the broker.

If task has a schedule label,
it will be parsed and retuned.
it will be parsed and returned.

:return: list of schedules.
"""
6 changes: 3 additions & 3 deletions taskiq/task.py
@@ -55,10 +55,10 @@ def wait_result(
"""
Wait for result to become ready and get it.

This function constantly checks wheter result is ready
This function constantly checks whether result is ready
and fetches it when it becomes available.

:param check_interval: how ofen availability is checked.
:param check_interval: how often availability is checked.
:param timeout: maximum amount of time it will wait
before raising TaskiqResultTimeoutError.
:param with_logs: whether you need to download logs.
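For orientation, an end-to-end sketch of waiting for a result (the broker type and numbers are illustrative):

```python
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    task = await add_one.kiq(41)
    # Poll every 0.2 seconds, give up after 10 seconds.
    result = await task.wait_result(check_interval=0.2, timeout=10)
    print(result.return_value)  # 42
    await broker.shutdown()


asyncio.run(main())
```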
@@ -81,7 +81,7 @@ async def is_ready(self) -> bool:
"""
Checks if task is completed.

:raises ResultIsReadyError: if we can't get info about task readyness.
:raises ResultIsReadyError: if we can't get info about task readiness.

:return: True if task is completed.
"""
6 changes: 3 additions & 3 deletions tests/cli/worker/test_receiver.py
@@ -72,7 +72,7 @@ async def test_func(param: int) -> int:


@pytest.mark.anyio
async def test_run_task_successfull_sync() -> None:
async def test_run_task_successful_sync() -> None:
"""Tests that run_task can run sync tasks."""

def test_func(param: int) -> int:
@@ -155,7 +155,7 @@ def test_func() -> None:

@pytest.mark.anyio
async def test_callback_success() -> None:
"""Test that callback funcion works well."""
"""Test that callback function works well."""
broker = InMemoryBroker()
called_times = 0

@@ -252,7 +252,7 @@ def test_func(tes_val: MyTestClass = Depends()) -> int:

@pytest.mark.anyio
async def test_callback_semaphore() -> None:
"""Test that callback funcion semaphore works well."""
"""Test that callback function semaphore works well."""
max_async_tasks = 3
broker = BrokerForTests()
sem_num = 0
2 changes: 1 addition & 1 deletion tests/middlewares/test_simple_retry.py
@@ -18,7 +18,7 @@ def broker() -> AsyncMock:


@pytest.mark.anyio
async def test_successfull_retry(broker: AsyncMock) -> None:
async def test_successful_retry(broker: AsyncMock) -> None:
middleware = SimpleRetryMiddleware()
middleware.set_broker(broker)
await middleware.on_error(
2 changes: 1 addition & 1 deletion tests/test_funcs.py
@@ -10,7 +10,7 @@

@pytest.mark.anyio
async def test_gather() -> None:
"""Test successfull task gathering."""
"""Test successful task gathering."""
rb_mock = AsyncMock()
rb_mock.is_result_ready.return_value = True
rb_mock.get_result.return_value = 1