Close #9: Support multiple concurrent worker processes #20

Merged
merged 4 commits on Aug 29, 2022
Changes from 1 commit
2 changes: 2 additions & 0 deletions .coveragerc
@@ -0,0 +1,2 @@
[run]
source = src/
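As a rough, editor-added illustration (not part of the diff): with `source = src/`, coverage only measures code under the package sources. Driving the same setup through the coverage API might look like this; the pytest flags simply mirror the ones in `.vscode/launch.json` below.

```python
# Hypothetical sketch, equivalent in spirit to `coverage run -m pytest`
# with the .coveragerc above. `source=["src/"]` restricts measurement
# to this repository's package code.
import coverage
import pytest

cov = coverage.Coverage(source=["src/"])
cov.start()
pytest.main(["-vvv", "-s"])  # same flags as the "Test" launch configuration
cov.stop()
cov.report()
```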
32 changes: 0 additions & 32 deletions .github/workflows/run-checks.yaml

This file was deleted.

28 changes: 22 additions & 6 deletions .vscode/launch.json
@@ -11,22 +11,38 @@
"program": "./src/aiotaskq/main.py",
"console": "integratedTerminal"
},
{
"name": "Sample App",
"type": "python",
"request": "launch",
"module": "aiotaskq.tests.apps.simple_app",
"args": [],
"console": "integratedTerminal"
},
{
"name": "Test",
"type": "python",
"module": "coverage",
"args": [
"run",
"-m",
"pytest",
"-vvv",
"-s",
],
"request": "launch",
"program": "./src/aiotaskq/tests/integration_test.py",
"console": "integratedTerminal"
},
{
"name": "Worker",
"name": "Sample Worker (Simple App)",
"type": "python",
"request": "launch",
"program": "./src/aiotaskq/worker.py",
"console": "integratedTerminal",
"module": "aiotaskq",
"args": [
"aiotaskq.main"
]
"worker",
"aiotaskq.tests.apps.simple_app"
],
"console": "integratedTerminal",
}
]
}
11 changes: 8 additions & 3 deletions .vscode/settings.json
@@ -3,6 +3,11 @@
"**/.git": true,
"**/.venv": true,
"**/.hg": true,
"**/.DS_Store": true
}
}
"**/.DS_Store": true,
"**/*.egg-info": true,
"**/__pycache__": true,
"**/.mypy_cache": true
},
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false
}
12 changes: 6 additions & 6 deletions README.md
@@ -33,7 +33,7 @@ import asyncio
import aiotaskq


@aiotaskq.register_task
@aiotaskq.task
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
@@ -130,25 +130,25 @@ Using `aiotaskq` we may end up with the following:
```python
import asyncio

from aiotaskq import register_task
from aiotaskq import task


@register_task
@task
def task_1(*args, **kwargs):
pass


@register_task
@task
def task_2(*args, **kwargs):
pass


@register_task
@task
def task_3(*args, **kwargs):
pass


@register_task
@task
def task_4(*args, **kwargs):
pass

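A minimal, self-contained usage sketch of the renamed `task` decorator (not taken from the README; it assumes a running Redis instance and a worker started against this module, e.g. `python -m aiotaskq worker my_app`):

```python
"""my_app.py -- hypothetical example module, not part of this PR."""
import asyncio

from aiotaskq import task


@task
def add(x: int, y: int) -> int:
    return x + y


@task
def multiply(x: int, y: int) -> int:
    return x * y


async def main() -> None:
    # Each call is serialized, published to the tasks channel, and executed
    # on a worker process that has imported this module.
    results = await asyncio.gather(
        add.apply_async(1, 2),
        multiply.apply_async(3, 4),
    )
    print(results)  # [3, 12]


if __name__ == "__main__":
    asyncio.run(main())
```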
8 changes: 6 additions & 2 deletions pyproject.toml
@@ -12,11 +12,15 @@ dependencies = [
"click==8.0.4"
]
name = "aiotaskq"
version = "0.0.4"
version = "0.0.5"
[project.optional-dependencies]
tests = [
"mypy==0.931",
"mypy-extensions==0.4.3",
"typing_extensions==4.1.1",
"black==22.1.0"
"black==22.1.0",
"pytest==7.1.2",
"pytest-asyncio==0.19.0"
]
[tool.black]
line-length = 100
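The new test dependencies (`pytest`, `pytest-asyncio`) allow `async def` tests. A hypothetical example of the pattern they enable, assuming Redis is running and a worker has imported this test module (the task name is illustrative):

```python
import pytest

import aiotaskq


@aiotaskq.task
def square(x: int) -> int:
    return x * x


@pytest.mark.asyncio
async def test_apply_async_matches_direct_call():
    # Fails with WorkerNotReady if no worker is subscribed to the tasks channel.
    assert await square.apply_async(3) == square(3) == 9
```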
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = aiotaskq
version = 0.0.4
version = 0.0.5
author = Imran Ariffin
author_email = ariffin.imran@gmail.com
description = A simple asynchronous task queue
6 changes: 3 additions & 3 deletions src/aiotaskq/__init__.py
@@ -7,7 +7,7 @@
import aiotaskq
@aiotaskq.register_task
@aiotaskq.task
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
@@ -29,7 +29,7 @@ async def main():
"""

from .main import register_task
from .main import task


__all__ = ["register_task"]
__all__ = ["task"]
5 changes: 3 additions & 2 deletions src/aiotaskq/__main__.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python

import asyncio
import typing as t

import typer

@@ -10,9 +11,9 @@


@cli.command(name="worker")
def _worker_command(app: str):
def _worker_command(app: str, concurrency: t.Optional[int] = None):
loop = asyncio.get_event_loop()
loop.run_until_complete(worker(app_import_path=app))
loop.run_until_complete(worker(app_import_path=app, concurrency=concurrency))


@cli.command(name="metric")
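Typer derives CLI options from keyword parameters, so the new `concurrency` argument surfaces as an optional `--concurrency` flag. A standalone sketch of the same pattern (hypothetical script, not part of this diff):

```python
#!/usr/bin/env python
"""demo_cli.py -- illustration of the typer pattern used in aiotaskq.__main__."""
import typing as t

import typer

cli = typer.Typer()


@cli.command(name="worker")
def worker_command(app: str, concurrency: t.Optional[int] = None) -> None:
    # `concurrency` becomes an optional `--concurrency` flag, defaulting to None:
    #   python demo_cli.py worker aiotaskq.tests.apps.simple_app --concurrency 4
    typer.echo(f"app={app} concurrency={concurrency}")


@cli.command(name="metric")
def metric_command() -> None:
    # Second command only so that typer keeps explicit sub-command names.
    typer.echo("not implemented in this sketch")


if __name__ == "__main__":
    cli()
```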
11 changes: 11 additions & 0 deletions src/aiotaskq/exceptions.py
@@ -1,2 +1,13 @@
"""
Define all exceptions that are possibly raised by the package.

Any raised thrown must be defined here.
"""


class WorkerNotReady(Exception):
"""Attempt to send task to worker but no worker is subscribing to tasks channel."""


class ModuleInvalidForTask(Exception):
"""Attempt to convert to task a function in an invalid module."""
92 changes: 66 additions & 26 deletions src/aiotaskq/main.py
@@ -1,14 +1,16 @@
import asyncio
import inspect
import json
import logging
from types import ModuleType
import typing as t
import uuid

from aiotaskq.constants import REDIS_URL, RESULTS_CHANNEL_TEMPLATE, TASKS_CHANNEL
from aiotaskq.exceptions import WorkerNotReady

import aioredis

from aiotaskq.constants import REDIS_URL, RESULTS_CHANNEL_TEMPLATE, TASKS_CHANNEL
from aiotaskq.exceptions import ModuleInvalidForTask, WorkerNotReady

RT = t.TypeVar("RT")
P = t.ParamSpec("P")

@@ -29,9 +31,7 @@ async def get(self) -> RT:
async with redis_client.pubsub() as pubsub:
message: t.Optional[dict] = None
while message is None:
await pubsub.subscribe(
RESULTS_CHANNEL_TEMPLATE.format(task_id=self._task_id)
)
await pubsub.subscribe(RESULTS_CHANNEL_TEMPLATE.format(task_id=self._task_id))
message = await pubsub.get_message(ignore_subscribe_messages=True)
await asyncio.sleep(0.1)
logger.debug("Message: %s", message)
@@ -40,60 +40,100 @@ async def get(self) -> RT:


class Task(t.Generic[P, RT]):
"""
A callable that can be applied asynchronously and executed on an aiotaskq worker process.

A task is essentially the same as any regular function: it can be called
synchronously, and thus executed in the same process, or called asynchronously,
and thus executed on a worker process.

Example:
```python
def some_func(x: int, y: int) -> int:
return x + y

some_task = aiotaskq.task(some_func)

function_result = some_func(1, 2)
sync_task_result = some_task(1, 2)
async_task_result = await some_task.apply_async(1, 2)

assert function_result == sync_task_result == async_task_result
```
"""

__qualname__: str

def __init__(self, f: t.Callable[P, RT]) -> None:
self._f = f
def __init__(self, func: t.Callable[P, RT]) -> None:
self._f = func
self._id = uuid.uuid4()

def __call__(self, *args, **kwargs) -> RT:
"""Call the task synchronously, by directly executing the underlying function."""
return self._f(*args, **kwargs)

def get_task_id(self) -> str:
"""Return the task id."""
return f"{self.__qualname__}:{self._id}"

async def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> RT:
task_id = self.get_task_id()
message = json.dumps(
"""
Call the task asynchronously, by executing the underlying function in a different process.

Execution is done via the following steps:
1. Serialize the task (just the task id and its arguments)
2. Publish it to the Tasks Channel, and wait for the result on a Results Channel
3. A worker process picks up the task and de-serializes it
4. The worker process finds the task in its memory by its task id and executes it as a
regular function
5. The worker process publishes the result of the task to the Results Channel
6. The main process (the caller) picks up the result and returns it
"""
task_id: str = self.get_task_id()
message: str = json.dumps(
{
"task_id": task_id,
"args": args,
"kwargs": kwargs,
}
)
publisher = _get_redis_client()
publisher: aioredis.Redis = _get_redis_client()

is_worker_ready = (await publisher.pubsub_numsub(TASKS_CHANNEL))[0][1] > 0
num_subscribers_res: list[tuple[str, int]] = await publisher.pubsub_numsub(TASKS_CHANNEL)
is_worker_ready = num_subscribers_res[0][1] > 0
if not is_worker_ready:
raise WorkerNotReady(
"No worker is ready to pick up tasks. Have you run your workers?"
)
raise WorkerNotReady("No worker is ready to pick up tasks. Have you run your workers?")

await publisher.publish(TASKS_CHANNEL, message=message)
async_result: AsyncResult[RT] = AsyncResult(task_id=task_id)
result = await async_result.get()
result: RT = await async_result.get()
return result


def register_task(f: t.Callable[P, RT]) -> Task[P, RT]:
import inspect
def task(func: t.Callable[P, RT]) -> Task[P, RT]:
"""Decorator to convert a callable into an aiotaskq Task instance."""
func_module: t.Optional[ModuleType] = inspect.getmodule(func)

if func_module is None:
raise ModuleInvalidForTask(f"Function \"{func.__name__}\" is defined in an invalid module {func_module}")

func_module = inspect.getmodule(f)
module_path = ".".join(
[p.split(".py")[0] for p in func_module.__file__.strip("./").split("/") if p != "src"] # type: ignore
[
p.split(".py")[0]
for p in func_module.__file__.strip("./").split("/") # type: ignore
if p != "src"
]
)
task = Task[P, RT](f)
task.__qualname__ = f"{module_path}.{f.__name__}"
task = Task[P, RT](func)
task.__qualname__ = f"{module_path}.{func.__name__}"
task.__module__ = module_path
return task


_redis_client = None
_REDIS_CLIENT: t.Optional[aioredis.Redis] = None


def _get_redis_client() -> aioredis.Redis:
global _redis_client
if _redis_client is not None:
return _redis_client
if _REDIS_CLIENT is not None:
return _REDIS_CLIENT
return aioredis.from_url(REDIS_URL, max_connections=10, decode_responses=True)
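To make the pub/sub round trip described in `Task.apply_async` concrete, here is a rough sketch of the consumer side (steps 3 to 5). It is not the actual aiotaskq worker implementation; the channel names and Redis URL come from `aiotaskq.constants`, everything else is an assumption:

```python
import asyncio
import json
import typing as t

import aioredis

from aiotaskq.constants import REDIS_URL, RESULTS_CHANNEL_TEMPLATE, TASKS_CHANNEL


async def consume_one_task(registered_tasks: dict[str, t.Callable]) -> None:
    """Pick up a single task message, execute it, and publish its result."""
    redis: aioredis.Redis = aioredis.from_url(REDIS_URL, decode_responses=True)
    async with redis.pubsub() as pubsub:
        await pubsub.subscribe(TASKS_CHANNEL)
        message: t.Optional[dict] = None
        while message is None:
            message = await pubsub.get_message(ignore_subscribe_messages=True)
            await asyncio.sleep(0.1)

    task_info = json.loads(message["data"])
    task_id: str = task_info["task_id"]
    # The task id has the form "<qualified name>:<uuid>" (see Task.get_task_id);
    # how the real worker resolves it to a function is an assumption here.
    func = registered_tasks[task_id.split(":")[0]]
    result = func(*task_info["args"], **task_info["kwargs"])

    # Publish the result so AsyncResult.get() on the caller side can return it.
    await redis.publish(RESULTS_CHANNEL_TEMPLATE.format(task_id=task_id), json.dumps(result))
```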