Skip to content

Commit

Permalink
Close #9: Support multiple concurrent worker processes (#20)
Browse files Browse the repository at this point in the history
Features:
* Accept --concurrency N option and spawn N workers
* Wait for child workers ready before propagate msg
* Ensure worker exit early with message when app path invalid
* Make parallel requests when checking for ready workers

Dev features & tech debts:
* Ensure run some tests asynchronously
* Remove unnecessary GithubAction workflow
* Add docstrings to some modules, classes and functions
* Cleanup using pylint and black (Will add pylint to CI in the future)
* Allow specify test pattern on ./test.sh

Notes:
Currently we see this warning when running the tests:
```
Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Connection.disconnect()\
 done, defined at /home/in-gote/workspace/aiotaskq/.venv/lib/\
python3.10/site-packages/aioredis/connection.py:794> wait_for=<\
Future pending cb=[Task.task_wakeup()]>>
```

We should fix it in the future.

Also, currently because in some tests (e.g.
`test_integration.test_sync_and_async_parity__simple_app`) we're
starting the worker in a sub-process, `coverage` doesn't count
anything in worker.py. I tried following this guide: https://
coverage.readthedocs.io/en/6.4.4/api_module.html#coverage.process
_startup, but I couldn't get it to work. Maybe I missed something.

We also need to fix this in the future. We should consider either:

1. Start the worker using `multiprocessing` instead of `subprocess`
2. Somehow follow the guide correctly
  • Loading branch information
imranariffin authored Aug 29, 2022
1 parent 84c18b7 commit 136ae39
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 171 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[run]
source = src/
32 changes: 0 additions & 32 deletions .github/workflows/run-checks.yaml

This file was deleted.

28 changes: 22 additions & 6 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,38 @@
"program": "./src/aiotaskq/main.py",
"console": "integratedTerminal"
},
{
"name": "Sample App",
"type": "python",
"request": "launch",
"module": "aiotaskq.tests.apps.simple_app",
"args": [],
"console": "integratedTerminal"
},
{
"name": "Test",
"type": "python",
"module": "coverage",
"args": [
"run",
"-m",
"pytest",
"-vvv",
"-s",
],
"request": "launch",
"program": "./src/aiotaskq/tests/integration_test.py",
"console": "integratedTerminal"
},
{
"name": "Worker",
"name": "Sample Worker (Simple App)",
"type": "python",
"request": "launch",
"program": "./src/aiotaskq/worker.py",
"console": "integratedTerminal",
"module": "aiotaskq",
"args": [
"aiotaskq.main"
]
"worker",
"aiotaskq.tests.apps.simple_app"
],
"console": "integratedTerminal",
}
]
}
11 changes: 8 additions & 3 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
"**/.git": true,
"**/.venv": true,
"**/.hg": true,
"**/.DS_Store": true
}
}
"**/.DS_Store": true,
"**/*.egg-info": true,
"**/__pycache__": true,
"**/.mypy_cache": true
},
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false
}
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import asyncio
import aiotaskq


@aiotaskq.register_task
@aiotaskq.task
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down Expand Up @@ -130,25 +130,25 @@ Using `aiotaskq` we may end up with the following:
```python
import asyncio

from aiotaskq import register_task
from aiotaskq import task


@register_task
@task
def task_1(*args, **kwargs):
pass


@register_task
@task
def task_2(*args, **kwargs):
pass


@register_task
@task
def task_3(*args, **kwargs):
pass


@register_task
@task
def task_4(*args, **kwargs):
pass

Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ dependencies = [
"click==8.0.4"
]
name = "aiotaskq"
version = "0.0.4"
version = "0.0.5"
[project.optional-dependencies]
tests = [
"mypy==0.931",
"mypy-extensions==0.4.3",
"typing_extensions==4.1.1",
"black==22.1.0"
"black==22.1.0",
"pytest==7.1.2",
"pytest-asyncio==0.19.0"
]
[tool.black]
line-length = 100
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = aiotaskq
version = 0.0.4
version = 0.0.5
author = Imran Ariffin
author_email = ariffin.imran@gmail.com
description = A simple asynchronous task queue
Expand Down
6 changes: 3 additions & 3 deletions src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import aiotaskq
@aiotaskq.register_task
@aiotaskq.task
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand All @@ -29,7 +29,7 @@ async def main():
"""

from .main import register_task
from .main import task


__all__ = ["register_task"]
__all__ = ["task"]
13 changes: 9 additions & 4 deletions src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
"""Module to define the main commands for the cli."""

#!/usr/bin/env python

import asyncio
import typing as t

import typer

from aiotaskq.worker import worker
from aiotaskq.worker import Defaults, worker

cli = typer.Typer()


@cli.command(name="worker")
def _worker_command(app: str):
def worker_command(app: str, concurrency: t.Optional[int] = Defaults.concurrency):
"""Command to start workers."""
loop = asyncio.get_event_loop()
loop.run_until_complete(worker(app_import_path=app))
loop.run_until_complete(worker(app_import_path=app, concurrency=concurrency))


@cli.command(name="metric")
def _metric_server(app: str):
def metric_server(app: str):
"""Command to start server to collect and report tasks metrics (TODO)."""
print(f"TODO: Running metrics server for app={app}")


Expand Down
6 changes: 2 additions & 4 deletions src/aiotaskq/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Module to define and store all constants used across the library."""

REDIS_URL = "redis://127.0.0.1:6379"
TASKS_CHANNEL = "channel:tasks"
RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"

# REDIS_URL = "redis://127.0.0.1:6379"
# TASKS_CHANNEL = "channel:tasks"
# RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"
11 changes: 9 additions & 2 deletions src/aiotaskq/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
class WorkerNotReady(Exception):
"""Attempt to send task to worker but no worker is subscribing to tasks channel."""
"""
Define all exceptions that are possibly raised by the package.
Any raised thrown must be defined here.
"""


class ModuleInvalidForTask(Exception):
"""Attempt to convert to task a function in an invalid module."""
Loading

0 comments on commit 136ae39

Please sign in to comment.