Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,29 @@ To disable this pass the `--no-parse` option to the taskiq.
### Hot reload

This is annoying to restart workers every time you modify tasks. That's why taskiq supports hot-reload.
To enable this option simply pass the `--reload` or `-r` option to taskiq CLI.
Reload is unavailable by default. To enable this feature install taskiq with `reload` extra.

Also this option supports `.gitignore` files. If you have such files in your directory. It won't reload worker
if you ignore file's contents. To disable this functionality pass `--do-not-use-gitignore` option.
::: tabs


@tab pip

```bash:no-line-numbers
pip install "taskiq[reload]"
```

@tab poetry

```bash:no-line-numbers
poetry add taskiq -E reload
```

:::

To enable this option simply pass the `--reload` or `-r` option to worker taskiq CLI.

Also this option supports `.gitignore` files. If you have such file in your directory, it won't reload worker
when you modify ignored files. To disable this functionality pass `--do-not-use-gitignore` option.

## Scheduler

Expand Down
16 changes: 8 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ keywords = ["taskiq", "tasks", "distributed", "async"]
python = "^3.7"
typing-extensions = ">=3.10.0.0"
pydantic = "^1.6.2"
pyzmq = { version = "^23.2.0", optional = true }
uvloop = { version = ">=0.16.0,<1", optional = true }
watchdog = "^2.1.9"
gitignore-parser = "^0"
importlib-metadata = "*"
pycron = "^3.0.0"
taskiq_dependencies = "^1"
# For prometheus metrics
prometheus_client = { version = "^0", optional = true }
# For ZMQBroker
pyzmq = { version = "^23.2.0", optional = true }
# For speed
uvloop = { version = ">=0.16.0,<1", optional = true }
# For hot-reload.
watchdog = { version = "^2.1.9", optional = true }
gitignore-parser = { version = "^0", optional = true }

[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
Expand All @@ -59,6 +63,7 @@ types-mock = "^4.0.15"
zmq = ["pyzmq"]
uv = ["uvloop"]
metrics = ["prometheus_client"]
reload = ["watchdog", "gitignore-parser"]

[tool.poetry.scripts]
taskiq = "taskiq.__main__:main"
Expand Down
4 changes: 2 additions & 2 deletions taskiq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def main() -> None: # noqa: WPS210 # pragma: no cover
for entrypoint in entry_points().select(group="taskiq_cli"):
try:
cmd_class = entrypoint.load()
except ImportError:
print(f"Could not load {entrypoint.value}") # noqa: WPS421
except ImportError as exc:
print(f"Could not load {entrypoint.value}. Cause: {exc}") # noqa: WPS421
continue
if issubclass(cmd_class, TaskiqCMD):
subparsers.add_parser(
Expand Down
3 changes: 2 additions & 1 deletion taskiq/cli/worker/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def from_cli( # noqa: WPS213
"--reload",
"-r",
action="store_true",
help="Reload workers if file is changed.",
help="Reload workers if file is changed. "
+ "`reload` extra is required for this option.",
)
parser.add_argument(
"--do-not-use-gitignore",
Expand Down
15 changes: 10 additions & 5 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
from dataclasses import dataclass
from multiprocessing import Process, Queue
from time import sleep
from typing import Any, Callable, List
from typing import Any, Callable, List, Optional

from watchdog.observers import Observer
try:
from watchdog.observers import Observer # noqa: WPS433

from taskiq.cli.watcher import FileWatcher # noqa: WPS433
except ImportError:
Observer = None # type: ignore
FileWatcher = None # type: ignore

from taskiq.cli.watcher import FileWatcher
from taskiq.cli.worker.args import WorkerArgs

logger = logging.getLogger("taskiq.process-manager")
Expand Down Expand Up @@ -132,13 +137,13 @@ class ProcessManager:
def __init__(
self,
args: WorkerArgs,
observer: Observer,
worker_function: Callable[[WorkerArgs], None],
observer: Optional[Observer] = None,
) -> None:
self.worker_function = worker_function
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
self.args = args
if args.reload:
if args.reload and observer is not None:
observer.schedule(
FileWatcher(
callback=schedule_workers_reload,
Expand Down
15 changes: 10 additions & 5 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from watchdog.observers import Observer

from taskiq.abc.broker import AsyncBroker
from taskiq.cli.utils import import_object, import_tasks
from taskiq.cli.worker.args import WorkerArgs
Expand All @@ -18,6 +16,11 @@
uvloop = None # type: ignore


try:
from watchdog.observers import Observer # noqa: WPS433
except ImportError:
Observer = None # type: ignore

logger = logging.getLogger("taskiq.worker")


Expand Down Expand Up @@ -133,9 +136,11 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
logging.getLogger("watchdog.observers.inotify_buffer").setLevel(level=logging.INFO)
logger.info("Starting %s worker processes.", args.workers)

observer = Observer()
observer = None
if Observer is not None:
observer = Observer()

if args.reload:
if observer is not None and args.reload:
observer.start()
args.workers = 1
logging.warning(
Expand All @@ -146,7 +151,7 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213

manager.start()

if observer.is_alive():
if observer is not None and observer.is_alive():
if args.reload:
logger.info("Stopping watching files.")
observer.stop()
Expand Down