Clean up, add helpers and add some docs #5

Merged 2 commits on May 7, 2022
3 changes: 1 addition & 2 deletions .pre-commit-config.yaml
@@ -8,9 +8,8 @@ repos:
rev: 21.12b0
hooks:
- id: black
args: [--line-length=119]
- repo: https://gitlab.com/pycqa/flake8
rev: 4.0.1
hooks:
- id: flake8
additional_dependencies: [flake8-print==3.1.4]
additional_dependencies: [flake8-print==4.0.0]
205 changes: 126 additions & 79 deletions README.md
@@ -1,112 +1,159 @@
# FastAPI Cloud Tasks

Strongly typed background tasks with FastAPI and CloudTasks!

GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks.

GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.

FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
Strongly typed background tasks with FastAPI and Google CloudTasks.

## Installation

```
pip install fastapi-cloud-tasks
```

## Concept
## Key features

- Strongly typed tasks.
  - Fail at invocation site to make it easier to develop and debug.
  - Breaking schema changes between versions will fail at the task runner with Pydantic.
- Familiar and simple public API.
  - `.delay` method that takes the same arguments as the task.
  - `.scheduler` method to create a recurring job.
- Tasks are regular FastAPI endpoints on plain old HTTP.
  - `Depends` just works!
  - All middleware, telemetry, auth and debugging solutions for FastAPI work as is.
  - Host task runners independently of GCP. If CloudTasks can reach the URL, it can invoke the task.
- Save money.
  - Task invocation with GCP is [free for the first million, then costs $0.4/million](https://cloud.google.com/tasks/pricing).
    That's almost always cheaper than running a RabbitMQ/Redis/SQL backend for celery.
  - Jobs cost [$0.1 per job per month irrespective of invocations. 3 jobs are free.](https://cloud.google.com/scheduler#pricing)
    Either free or almost always cheaper than always running a beat worker.
  - If this cost ever becomes a concern, the `client` can be overridden to call any gRPC server with a compatible API.
    [Here's a trivial emulator implementation that we will use locally](https://github.com/aertje/cloud-tasks-emulator)
- Autoscale.
  - With a FaaS setup, your task workers can autoscale based on load.
  - Most FaaS services have free tiers, making it much cheaper than running a celery worker.
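
To make the cost claim concrete, here is a rough estimate that uses only the figures quoted in the list above. The function names are hypothetical, and both proportional billing past the free tier and treating the free million as a monthly allowance are assumptions; check the linked pricing pages for current rates.

```python
FREE_TASKS = 1_000_000          # free invocations, as quoted above
PRICE_PER_MILLION_TASKS = 0.4   # USD per million invocations after the free tier
FREE_JOBS = 3                   # free scheduler jobs, as quoted above
PRICE_PER_JOB = 0.1             # USD per job per month


def monthly_tasks_cost(invocations: int) -> float:
    """Estimated task invocation cost in USD (assumes proportional billing)."""
    billable = max(0, invocations - FREE_TASKS)
    return billable / 1_000_000 * PRICE_PER_MILLION_TASKS


def monthly_scheduler_cost(jobs: int) -> float:
    """Estimated scheduler cost in USD for a number of recurring jobs."""
    billable = max(0, jobs - FREE_JOBS)
    return billable * PRICE_PER_JOB


if __name__ == "__main__":
    print(f"{monthly_tasks_cost(3_500_000):.2f}")   # 1.00
    print(f"{monthly_scheduler_cost(5):.2f}")       # 0.20
```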

## How it works

### Delayed job

[`Cloud Tasks`](https://cloud.google.com/tasks) allows us to schedule an HTTP request in the future.
```python
from fastapi_cloud_tasks import DelayedRouteBuilder

[FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint.
delayed_router = APIRouter(route_class=DelayedRouteBuilder(...))

class Recipe(BaseModel):
    ingredients: List[str]

[`Cloud Scheduler`](https://cloud.google.com/scheduler) allows us to schedule recurring HTTP requests in the future.
@delayed_router.post("/{restaurant}/make_dinner")
async def make_dinner(restaurant: str, recipe: Recipe):
    # Do a ton of work here.

FastAPI Cloud Tasks works by putting the three together:

- It adds a `.delay` method to existing routes on FastAPI.
- When this method is called, it schedules a request with Cloud Tasks.
- The task worker is a regular FastAPI server which gets called by Cloud Tasks.
- It adds a `.scheduler` method to existing routes on FastAPI.
- When this method is called, it schedules a recurring job with Cloud Scheduler.
app.include_router(delayed_router)
```

If we host the task worker on Cloud Run, we get autoscaling workers.
Now we can trigger the task with

## Pseudocode
```python
make_dinner.delay(restaurant="Taj", recipe=Recipe(ingredients=["Pav","Bhaji"]))
```

In practice, this is what it looks like:
If we want to trigger the task 30 minutes later

```python
delayed_router = APIRouter(route_class=DelayedRouteBuilder(...))
make_dinner.options(countdown=1800).delay(...)
```
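
A countdown is just a relative delay. As a rough illustration (not the library's actual code), this is how a `countdown` in seconds could be turned into an absolute time for the queued request. `schedule_time_from_countdown` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def schedule_time_from_countdown(countdown: int, now: Optional[datetime] = None) -> datetime:
    """Return the UTC time at which a task delayed by `countdown` seconds would run."""
    if countdown < 0:
        raise ValueError("countdown must be non-negative")
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown)


if __name__ == "__main__":
    start = datetime(2022, 5, 7, 12, 0, tzinfo=timezone.utc)
    print(schedule_time_from_countdown(1800, now=start).isoformat())
    # 2022-05-07T12:30:00+00:00
```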

### Scheduled Task
```python
from fastapi_cloud_tasks import ScheduledRouteBuilder

scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...))

class Recipe(BaseModel):
    ingredients: List[str]

@delayed_router.post("/{restaurant}/make_dinner")
async def make_dinner(restaurant: str, recipe: Recipe):
    # Do a ton of work here.

@scheduled_router.post("/home_cook")
async def home_cook(recipe: Recipe):
    # Make my own food

app.include_router(delayed_router)
app.include_router(scheduled_router)

# If you wan to make your own breakfast every morning at 7AM IST.
# If you want to make your own breakfast every morning at 7AM IST.
home_cook.scheduler(name="test-home-cook-at-7AM-IST", schedule="0 7 * * *", time_zone="Asia/Kolkata").schedule(recipe=Recipe(ingredients=["Milk","Cereal"]))
```
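
The `schedule` argument in the example above is a five-field cron expression, evaluated in the given `time_zone`. The sketch below only names the five fields so the expression is easier to read; `describe_cron` is a hypothetical helper for illustration and is not part of the library.

```python
from typing import Dict

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(schedule: str) -> Dict[str, str]:
    """Map each field of a five-field cron expression to its name."""
    parts = schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


if __name__ == "__main__":
    print(describe_cron("0 7 * * *"))
    # {'minute': '0', 'hour': '7', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
```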

Now we can trigger the task with
## Concept

```python
make_dinner.delay(restaurant="Taj", recipe=Recipe(ingredients=["Pav","Bhaji"]))
```
[`Cloud Tasks`](https://cloud.google.com/tasks) allows us to schedule an HTTP request in the future.

[FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint.

[`Cloud Scheduler`](https://cloud.google.com/scheduler) allows us to schedule recurring HTTP requests in the future.

FastAPI Cloud Tasks works by putting the three together:

- GCP's Cloud Tasks + FastAPI = Partial replacement for celery's async delayed tasks.
- GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.
- FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
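
Conceptually, calling `.delay(...)` turns a function call into a description of an HTTP request that Cloud Tasks will send later. The sketch below models that idea with a plain dataclass. `PendingRequest` and `build_request` are hypothetical names, and the real library builds Cloud Tasks API objects rather than this stand-in.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class PendingRequest:
    """Stand-in for the HTTP request that would be handed to the queue."""
    method: str
    url: str
    body: bytes


def build_request(
    base_url: str,
    path_template: str,
    path_params: Dict[str, str],
    payload: Dict[str, Any],
) -> PendingRequest:
    # Fill path parameters into the route, then serialize the body as JSON.
    path = path_template.format(**path_params)
    return PendingRequest(
        method="POST",
        url=base_url.rstrip("/") + path,
        body=json.dumps(payload).encode("utf-8"),
    )


if __name__ == "__main__":
    req = build_request(
        "http://localhost:8000",
        "/{restaurant}/make_dinner",
        {"restaurant": "Taj"},
        {"ingredients": ["Pav", "Bhaji"]},
    )
    print(req.method, req.url)
    # POST http://localhost:8000/Taj/make_dinner
```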

If we want to trigger the task 30 minutes later

```python
make_dinner.options(countdown=1800).delay(...)
```

## Running

### Local

Pre-requisites:
- `pip install -r local-requirements.txt`
- Install [cloud-tasks-emulator](https://github.com/aertje/cloud-tasks-emulator)
- Alternatively install ngrok and forward the server's port

- Create a task queue and copy the project id, location and queue name.
- Install and ensure that ngrok works.
Start running the emulator in a terminal
```sh
cloud-tasks-emulator
```

We will need an API endpoint to give to Cloud Tasks, so let us fire up ngrok locally
Start running the task runner on port 8000 so that it is accessible from cloud tasks.

```sh
ngrok http 8000
uvicorn examples.simple.main:app --reload --port 8000
```

In another terminal, trigger the task with curl

```
curl http://localhost:8000/trigger
```

You'll see something like this
Check the logs on the server, you should see

```
Forwarding http://feda-49-207-221-153.ngrok.io -> http://localhost:8000
WARNING: Hello task ran with payload: Triggered task
```

Important bits of code:

```python
# complete file: examples/simple/main.py

# First we construct our DelayedRoute class with all relevant settings
# For local, we connect to the emulator client
client = None
if IS_LOCAL:
    client = emulator_client()

# Construct our DelayedRoute class with all relevant settings
# This can be done once across the entire project
DelayedRoute = DelayedRouteBuilder(
    base_url="http://feda-49-207-221-153.ngrok.io",
    client=client,
    base_url="http://localhost:8000",
    queue_path=queue_path(
        project="gcp-project-id",
        location="asia-south1",
        queue="test-queue",
    ),
)

delayed_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")
# Override the route_class so that we can add .delay method to the endpoints and know their complete URL
delayed_router = APIRouter(route_class=DelayedRoute, prefix="/delayed")

class Payload(BaseModel):
    message: str
@@ -129,29 +176,11 @@ app.include_router(delayed_router)

```

Start running the task runner on port 8000 so that it is accessible from cloud tasks.

```sh
uvicorn main:app --reload --port 8000
```

In another terminal, trigger the task with curl

```
curl http://localhost:8000/trigger
```

Check the logs on the server, you should see

```
WARNING: Hello task ran with payload: Triggered task
```

Note: You can read complete working source code of the above example in [`examples/simple/main.py`](examples/simple/main.py)

In the real world you'd have a separate process for task runner and actual task.

### Cloud Run
### Deployed environment / Cloud Run

Running on Cloud Run with authentication needs us to supply an OIDC token. To do that we can use a `hook`.
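
As a mental model, a hook can be thought of as a function that receives the request about to be queued and returns a (possibly modified) request. The sketch below uses a plain dictionary as a stand-in for the real Cloud Tasks request object. `make_oidc_hook`, the dictionary keys and the service account address are all hypothetical and only illustrate the shape of the idea, not the library's API.

```python
from typing import Any, Callable, Dict

Request = Dict[str, Any]             # stand-in for the real request object
Hook = Callable[[Request], Request]  # a hook maps a request to a request


def make_oidc_hook(service_account_email: str) -> Hook:
    """Return a hook that attaches OIDC token settings to a request (illustrative only)."""

    def add_token(request: Request) -> Request:
        updated = dict(request)  # shallow copy so the caller's dict is not mutated
        updated["oidc_token"] = {"service_account_email": service_account_email}
        return updated

    return add_token


if __name__ == "__main__":
    hook = make_oidc_hook("invoker@example.iam.gserviceaccount.com")
    print(hook({"url": "https://example.com/tasks/hello"}))
```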

@@ -161,7 +190,6 @@ Pre-requisites:
- Deploy the worker as a service on Cloud Run and copy its URL.
- Create a service account in cloud IAM and add `Cloud Run Invoker` role to it.

We'll only edit the parts from above that we need changed from above example.

```python
# URL of the Cloud Run service
@@ -183,6 +211,10 @@ DelayedRoute = DelayedRouteBuilder(

Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks.py)

If you're not running on Cloud Run and want to use an OAuth token, you can use the `oauth_task_hook` instead.

Check [fastapi_cloud_tasks/hooks.py](fastapi_cloud_tasks/hooks.py) to get the hang of hooks and how you can use them.

## Configuration

### DelayedRouteBuilder
@@ -194,7 +226,7 @@ DelayedRoute = DelayedRouteBuilder(...)
delayed_router = APIRouter(route_class=DelayedRoute)

@delayed_router.get("/simple_task")
def mySimpleTask():
def simple_task():
    return {}
```

@@ -213,9 +245,9 @@ def mySimpleTask():
Usage:

```python
@task_router.get("/simple_task")
@delayed_router.get("/simple_task")
@task_default_options(...)
def mySimpleTask():
def simple_task():
    return {}
```

@@ -226,13 +258,13 @@ Additional options:
- `countdown` - Seconds in the future to schedule the task.
- `task_id` - named task id for deduplication. (One task id will only be queued once.)

Eg:
Example:

```python
# Trigger after 5 minutes
@task_router.get("/simple_task")
@delayed_router.get("/simple_task")
@task_default_options(countdown=300)
def mySimpleTask():
def simple_task():
    return {}
```
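
Because a given `task_id` is only queued once, one way to deduplicate identical work is to derive the id from the task's arguments. This is a sketch under that assumption: `dedup_task_id` is a hypothetical helper, and the hashing scheme is a design choice for illustration, not something the library prescribes.

```python
import hashlib
import json
from typing import Any, Dict


def dedup_task_id(task_name: str, payload: Dict[str, Any]) -> str:
    """Build a deterministic id so the same payload maps to the same task id."""
    # Sort keys so that logically equal payloads serialize identically.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{task_name}-{digest}"


if __name__ == "__main__":
    print(dedup_task_id("simple_task", {"user": 1, "action": "email"}))
```

The resulting string could then be passed as the `task_id` option, so that repeated triggers with the same payload are queued only once.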

@@ -241,7 +273,7 @@ def mySimpleTask():
Usage:

```python
mySimpleTask.options(...).delay()
simple_task.options(...).delay()
```

All options from above can be overridden per call (including DelayedRouteBuilder options like `base_url`) with kwargs to the `options` function before calling `delay`.
@@ -250,7 +282,7 @@ Example:

```python
# Trigger after 2 minutes
mySimpleTask.options(countdown=120).delay()
simple_task.options(countdown=120).delay()
```
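
The override behaviour can be pictured as layered dictionaries where later layers win. The sketch assumes the order is builder settings, then `task_default_options`, then per-call `options`; only the last layer overriding the others is stated in this README, so the middle position is an assumption. `resolve_options` is a hypothetical name and this is not the library's internals.

```python
from typing import Any, Dict


def resolve_options(
    builder: Dict[str, Any],
    task_defaults: Dict[str, Any],
    per_call: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge option layers; keys in later layers replace earlier ones."""
    return {**builder, **task_defaults, **per_call}


if __name__ == "__main__":
    resolved = resolve_options(
        {"base_url": "http://localhost:8000", "countdown": 0},
        {"countdown": 300},
        {"countdown": 120},
    )
    print(resolved)
    # {'base_url': 'http://localhost:8000', 'countdown': 120}
```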

### ScheduledRouteBuilder
@@ -262,11 +294,11 @@ ScheduledRoute = ScheduledRouteBuilder(...)
scheduled_router = APIRouter(route_class=ScheduledRoute)

@scheduled_router.get("/simple_scheduled_task")
def mySimpleScheduledTask():
def simple_scheduled_task():
    return {}


mySimpleScheduledTask.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
simple_scheduled_task.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
```


@@ -280,9 +312,24 @@ Some hooks are included in the library.
- `deadline_delayed_hook` / `deadline_scheduled_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
- `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)`

## Future work
## Helper dependencies

### max_retries

```python
@delayed_router.post("/fail_twice", dependencies=[Depends(max_retries(2))])
async def fail_twice():
    raise Exception("nooo")
```

### CloudTasksHeaders

```python
@delayed_router.get("/my_task")
async def my_task(ct_headers: CloudTasksHeaders = Depends()):
    print(ct_headers.queue_name)
```

Check the file [fastapi_cloud_tasks/dependencies.py](fastapi_cloud_tasks/dependencies.py) for details.

- Ensure queue exists.
- Make helper features for worker's side. Eg:
- Easier access to current retry count.
- API Exceptions to make GCP back-off.
Note: This project is neither affiliated with, nor sponsored by Google.