Description
Arq was the first real open source project I ever created, back in 2016. That was long before Pydantic, FastAPI, ParamSpec, or even Redis Streams.
I remember a sense of incredulity that I couldn't find an async variant of rq (which I was helping to maintain at the time). Surely I wasn't the only person wanting to queue jobs in async code? Apparently, at the time, I was.
Fast forward eight years, and I'm definitely not the only person trying to queue jobs in an async world.
Hence my incredulity has only grown: there's still no ubiquitous queuing library for async Python, and despite neglect Arq still seems to work well for lots of people. I've used it in every role I've had since, and for the most part it just works.
That said, Arq needs some love, and since we're now using it at Pydantic, I think we should have the resources to provide that love later this year. This is a rough plan of what I propose to do.
Feedback very welcome from all, but especially @JonasKs and @pydantic-maintainers (who apparently I can't tag here :-()
In summary, I want to significantly refactor the internals and update the API in a backwards-compatible way.
1. ParamSpec and type safety 🚧
The most important change we should make is to make Arq type-safe using ParamSpec and Concatenate; I have a partially working demonstration of how this will work below.
We'll provide the same or a similar method for enqueuing a job via its name, for cases where the worker code is not accessible where jobs are enqueued.
We should be able to do this while still supporting the current API to make migration as easy as possible.
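For illustration, enqueue-by-name might look roughly like this (the `enqueue_by_name` method and its signature are purely hypothetical, and the arguments obviously can't be type-checked in this case):

```python
async def producer(worker_app: WorkerApp) -> None:
    # hypothetical: enqueue the function registered as 'foo' without importing it;
    # returns an untyped Job since the arguments can't be validated statically
    job = await worker_app.enqueue_by_name('foo', 'https://microsoft.com')
    print(await job.status())
```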
2. Redis Streams 🚀
The second most important change is to adopt Redis Streams, which read like they were designed for exactly this application: they mean we can effectively guarantee only one execution while still being resilient to unexpected shutdown (jobs shut down during execution will be rerun later).
This should be possible without breaking the current API at all.
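To illustrate the mechanics, here's a sketch of the underlying Redis commands via redis-py's asyncio client - this is not the Arq implementation, and the stream, group and consumer names are made up:

```python
import asyncio

from redis.asyncio import Redis
from redis.exceptions import ResponseError


async def main() -> None:
    redis = Redis.from_url('redis://localhost')
    # create the consumer group once; ignore the error if it already exists
    try:
        await redis.xgroup_create('arq:queue', 'arq:workers', id='0', mkstream=True)
    except ResponseError:
        pass

    # enqueueing a job is just adding an entry to the stream
    await redis.xadd('arq:queue', {'job_id': 'abc123'})

    # a worker reads a batch for its consumer name; entries stay in the pending list
    # until XACK'd, so jobs interrupted by a crash can be reclaimed and rerun later
    batches = await redis.xreadgroup('arq:workers', 'worker-1', {'arq:queue': '>'}, count=10, block=5000)
    for _stream, entries in batches:
        for entry_id, fields in entries:
            ...  # execute the job here
            await redis.xack('arq:queue', 'arq:workers', entry_id)


asyncio.run(main())
```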
3. Avoid sorted set for immediate jobs 🚀
Current Arq is slower than it should be because it uses a sorted set to queue all jobs. The sorted set provides two things:
- a way to avoid losing jobs that are cancelled during shutdown or fail - this should be solved with streams (see above)
- a way to defer jobs so they're run in the future
The idea would be to only use the sorted set for jobs scheduled to run in the future, then use the logic demonstrated by SAQ to take jobs off the sorted set when they're ready to be run and add them to the stream.
Jobs which are enqueued without a delay can be added to the stream immediately, which should significantly improve performance for this very common case.
This should be possible without breaking the current API at all.
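As a rough sketch of the SAQ-style pattern (key names are illustrative, and in practice the move from sorted set to stream should be atomic, e.g. via a Lua script):

```python
import time

from redis.asyncio import Redis


async def enqueue(redis: Redis, job_id: str, delay_by: float = 0) -> None:
    if delay_by <= 0:
        # the common case: straight onto the stream, no sorted set involved
        await redis.xadd('arq:queue', {'job_id': job_id})
    else:
        # deferred jobs are scored by the timestamp at which they become due
        await redis.zadd('arq:deferred', {job_id: time.time() + delay_by})


async def promote_due_jobs(redis: Redis) -> None:
    # run periodically by the worker: move any due jobs onto the stream
    due = await redis.zrangebyscore('arq:deferred', '-inf', time.time())
    for job_id in due:
        async with redis.pipeline(transaction=True) as pipe:
            pipe.zrem('arq:deferred', job_id)
            pipe.xadd('arq:queue', {'job_id': job_id})
            await pipe.execute()
```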
4. Avoid polling 🇵🇱
Mostly for latency reasons it would be nice to avoid polling; the idea would be:
- XREADGROUP with BLOCK on streams means we'll no longer need to poll for the next jobs in the worker
- we can use pub/sub to notify waiting code of job results, to avoid polling there (sketched below)
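For the results side, the round trip could look something like this (channel and key names are made up; the blocking worker-side read is shown in the streams sketch above):

```python
from redis.asyncio import Redis


async def wait_for_result(redis: Redis, job_id: str, timeout: float = 30) -> bytes | None:
    # caller side: block on a pub/sub message instead of polling the result key
    async with redis.pubsub() as pubsub:
        await pubsub.subscribe(f'arq:result:{job_id}')
        msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout)
        return msg['data'] if msg else None


async def publish_result(redis: Redis, job_id: str, result: bytes) -> None:
    # worker side: persist the result, then notify any waiters
    await redis.set(f'arq:result:{job_id}', result, ex=3600)
    await redis.publish(f'arq:result:{job_id}', result)
```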
5. OpenTelemetry 🔭
Observability is close to our hearts at Pydantic, so it would be nice to have optional support for OpenTelemetry, or perhaps just hook points to implement your own observability.
This should be possible without breaking the current API at all.
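As a sketch of what such a hook could wrap (the `run_job` helper here is a stand-in for wherever the worker invokes the function, not an existing Arq API):

```python
from typing import Any

from opentelemetry import trace

tracer = trace.get_tracer('arq')


async def run_job(func: Any, ctx: Any, *args: Any, **kwargs: Any) -> Any:
    # one span per job execution, with the failure recorded if the job raises
    with tracer.start_as_current_span(f'arq job {func.__name__}') as span:
        span.set_attribute('arq.job.function', func.__name__)
        try:
            return await func(ctx, *args, **kwargs)
        except Exception as exc:
            span.record_exception(exc)
            raise
```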
6. DAG - Task Dependency Graph 📈
The idea is to allow one or more jobs to be triggered by one or more previous jobs.
See the then() and start_with() methods in the partial implementation below.
This should be possible without breaking the current API at all.
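For example, usage could end up looking something like this (`bar` stands in for another registered function; per the note in the sketch, downstream jobs would read upstream results via FunctionContext rather than taking them as arguments):

```python
async def build_graph() -> None:
    # enqueue foo, with bar running only once foo has completed successfully;
    # start_with() would similarly enqueue sibling jobs together
    await foo.enqueue('https://example.com').then(bar).start()
```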
7. CLI, settings and worker.run improvements 🏃
We can mostly just copy uvicorn; we should also remove the very ugly WorkerSettings and configure the worker via a simple function.
We should also fix the reload logic to use watchfiles.
This can be done such that existing code still works, with or without deprecation warnings.
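As a very rough sketch of the reload piece (the `arq run ...` command in the comment and the `run_worker` helper are hypothetical; only watchfiles.run_process is real):

```python
from watchfiles import run_process


def run_worker() -> None:
    # hypothetical: import 'myapp.worker:worker_app', start it and consume jobs
    ...


if __name__ == '__main__':
    # something like `arq run myapp.worker:worker_app --reload` could boil down to:
    run_process('.', target=run_worker)
```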
8. Separate the backend ↔️
We should separate the Redis logic to make it easier to provide alternative backends; an in-memory backend would be especially useful for unit testing applications.
This can be done such that existing code still works, with or without deprecation warnings.
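A rough sketch of what the seam could look like - this is not a proposed interface, just an illustration of a small protocol that both the Redis backend and an in-memory test backend could satisfy:

```python
from typing import Any, Protocol


class QueueBackend(Protocol):
    async def enqueue(self, job: dict[str, Any], *, defer_by: float | None = None) -> str: ...
    async def dequeue(self, *, block: float | None = None) -> dict[str, Any] | None: ...
    async def ack(self, job_id: str) -> None: ...


class InMemoryBackend:
    """Backend for unit tests: jobs never leave the process."""

    def __init__(self) -> None:
        self._jobs: list[dict[str, Any]] = []

    async def enqueue(self, job: dict[str, Any], *, defer_by: float | None = None) -> str:
        self._jobs.append(job)
        return str(len(self._jobs))

    async def dequeue(self, *, block: float | None = None) -> dict[str, Any] | None:
        return self._jobs.pop(0) if self._jobs else None

    async def ack(self, job_id: str) -> None:
        pass
```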
9. Better documentation 📖
Documentation should move to Material for MkDocs and mkdocstrings, and be improved significantly.
10. Moving repo to Pydantic 🏢
To provide the resources for this work, we should move Arq to the Pydantic organization, and the docs to arq.pydantic.dev or similar.
Have I missed anything?
API Sketch
Here's a sketch of how I see the new type-safe API working, together with a partial implementation:
Example Usage:
from __future__ import annotations as _annotations

from dataclasses import dataclass
from typing import AsyncIterator
from contextlib import asynccontextmanager

from arq import FunctionContext, WorkerApp
from httpx import AsyncClient


@dataclass
class MyWorkerDeps:
    """
    Type safe way of defining the dependencies of the worker functions.

    E.g. HTTP client, database connection, settings, etc.
    """

    http_client: AsyncClient


@asynccontextmanager
async def my_worker_lifespan() -> AsyncIterator[MyWorkerDeps]:
    async with AsyncClient() as http_client:
        yield MyWorkerDeps(http_client)


worker_app = WorkerApp(lifespan=my_worker_lifespan)


@worker_app.register
async def foo(ctx: FunctionContext[MyWorkerDeps], url: str) -> int:
    # ctx.deps here is of type MyWorkerDeps, that's enforced by static typing
    # FunctionContext will also provide access to a redis connection, retry count,
    # even results of other jobs etc.
    r = await ctx.deps.http_client.get(url)
    r.raise_for_status()
    print(f'{url}: {r.text[:80]!r}...')
    return len(r.text)


async def main() -> None:
    async with worker_app:
        # these two are equivalent, param spec means the arguments are type safe
        await foo.enqueue('https://microsoft.com').start()
        await foo.enqueue('https://microsoft.com')
        # same, delayed by 10 seconds with 5 second timeout
        await foo.enqueue('https://microsoft.com').start(delay_by=10, timeout=5)
        # call foo directly in the same process
        print('length:', await foo.direct('https://github.com'))


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
Partial Implementation
from __future__ import annotations as _annotations

from dataclasses import dataclass, KW_ONLY
from datetime import timedelta
from typing import ParamSpec, TypeVar, Generic, Concatenate, overload, Self, Any, AsyncIterator, Generator
from collections.abc import Awaitable, Callable
from contextlib import asynccontextmanager, AbstractAsyncContextManager

from arq.connections import RedisSettings

WD = TypeVar('WD')  # worker dependencies
P = ParamSpec('P')  # worker function parameters
R = TypeVar('R')  # worker function return type
PNext = ParamSpec('PNext')
RNext = TypeVar('RNext')


@dataclass
class Job(Generic[R]):
    """
    Represents a job that has been enqueued, could be deferred, queued, running or finished.
    """

    id: str

    async def status(self) -> str: ...

    async def result(self) -> R: ...


@dataclass
class PendingJob(Generic[WD, P, R]):
    """
    Represents a job that has not been enqueued yet.
    """

    deferred_deps: Callable[[], WD]
    func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]
    args: tuple[Any, ...]
    kwargs: dict[str, Any]
    _: KW_ONLY
    timeout: timedelta | int | None = None

    def __await__(self) -> Generator[None, None, Job[R]]:
        return self.start().__await__()

    async def start(self, *, delay_by: timedelta | int | None = None) -> Job[R]:
        # also other kwargs delay_until, timeout, retry etc.
        print(f'starting job {self.func.__name__}(args={self.args}, kwargs={self.kwargs}) {delay_by=}')
        return Job(id='123')

    def then(
        self, *on_success: WorkerFunction[WD, PNext, RNext] | PendingJob[WD, PNext, RNext]
    ) -> PendingJob[WD, PNext, RNext]:
        """
        also takes all kwargs from `start()`

        TODO - AFAIK there's no way to enforce that `PNext` is `(R,)` - e.g. that the return value of the
        first function is the input to the second function.
        The even more complex case is where you have multiple jobs triggering a job, where I'm even more sure
        full type safety is impossible.
        I would therefore suggest that subsequent jobs are not allowed to take any arguments, and instead
        access the results of previous jobs via `FunctionContext`
        """
        ...

    def start_with(self, *also_start: WorkerFunction[WD, P, R]) -> WorkerFunction[WD, P, R]:
        # also takes all kwargs from `start()`, I think this can be entirely type safe
        ...


@dataclass
class WorkerFunction(Generic[WD, P, R]):
    deferred_deps: Callable[[], WD]
    func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]
    timeout: timedelta | int | None = None

    def enqueue(self, *args: P.args, **kwargs: P.kwargs) -> PendingJob[WD, P, R]:
        return PendingJob(self.deferred_deps, self.func, args, kwargs, timeout=self.timeout)

    async def direct(self, *args: P.args, **kwargs: P.kwargs) -> R:
        return await self.func(FunctionContext(self.deferred_deps()), *args, **kwargs)


@dataclass
class FunctionContext(Generic[WD]):
    """
    Context provided to worker functions, contains deps but also a connection, retry count etc.
    """

    deps: WD


@asynccontextmanager
async def none_lifespan() -> AsyncIterator[None]:
    yield None


Unset = object()


class WorkerApp(Generic[WD]):
    def __init__(
        self,
        *,
        redis_settings: RedisSettings | str = 'redis://localhost',
        lifespan: Callable[[], AbstractAsyncContextManager[WD]] = none_lifespan,
    ):
        self.redis_settings = redis_settings
        self.lifespan = lifespan()
        self.deps: WD | Unset = Unset  # type: ignore

    @overload
    def register(self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]], /) -> WorkerFunction[WD, P, R]: ...

    @overload
    def register(
        self, *, timeout: timedelta | int
    ) -> Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]: ...

    def register(
        self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]] | None = None, *, timeout: timedelta | int | None = None
    ) -> WorkerFunction[WD, P, R] | Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]:
        if func is None:
            # pass timeout through so `@register(timeout=...)` isn't silently dropped
            return lambda func: WorkerFunction(self._deferred_deps, func, timeout=timeout)
        return WorkerFunction(self._deferred_deps, func, timeout=timeout)

    def _deferred_deps(self) -> WD:
        if self.deps is Unset:
            raise RuntimeError('WorkerApp is not started')
        return self.deps

    async def startup(self) -> None:
        self.deps = await self.lifespan.__aenter__()

    async def shutdown(self, *args: Any) -> None:
        await self.lifespan.__aexit__(*args)

    async def __aenter__(self) -> Self:
        await self.startup()
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self.shutdown(*args)

    async def run_queued_jobs(self) -> list[Any]:
        """
        Run jobs already in the queue, useful for testing.
        Returns a list of job results.
        """
        ...