Skip to content

Commit 0631f0c

Browse files
authored
Cloud Scheduler to replace Celery Beat (#2)
1 parent 8d927f3 commit 0631f0c

20 files changed

+535
-136
lines changed

.flake8

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[flake8]
2+
extend-ignore = E128, E203, E225, E266, E231, E501, E712, W503, C901, F403, F401, F841
3+
max-line-length = 119
4+
max-complexity = 18
5+
select = B,C,E,F,W,T4,B9

.isort.cfg

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[settings]
2+
line_length=119
3+
force_single_line=True
4+
import_heading_stdlib=Standard Library Imports
5+
import_heading_thirdparty=Third Party Imports
6+
import_heading_firstparty=Imports from this repository
7+
import_heading_localfolder=Imports from this module

.pre-commit-config.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
repos:
2+
- repo: https://github.com/timothycrosley/isort
3+
rev: "5.10.1"
4+
hooks:
5+
- id: isort
6+
args: [--force-single-line-imports]
7+
- repo: https://github.com/psf/black
8+
rev: 21.12b0
9+
hooks:
10+
- id: black
11+
args: [--line-length=119]
12+
- repo: https://gitlab.com/pycqa/flake8
13+
rev: 4.0.1
14+
hooks:
15+
- id: flake8
16+
additional_dependencies: [flake8-print==3.1.4]

README.md

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks.
44

5+
GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.
6+
57
FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
68

79
## Concept
@@ -10,29 +12,43 @@ FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
1012

1113
[FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint.
1214

13-
FastAPI Cloud Tasks works by putting the two together:
15+
16+
[`Cloud Scheduler`](https://cloud.google.com/scheduler) allows us to schedule recurring HTTP requests in the future.
17+
18+
FastAPI Cloud Tasks works by putting the three together:
1419

1520
- It adds a `.delay` method to existing routes on FastAPI.
1621
- When this method is called, it schedules a request with Cloud Tasks.
1722
- The task worker is a regular FastAPI server which gets called by Cloud Tasks.
23+
- It adds a `.scheduler` method to existing routes on FastAPI.
24+
- When this method is called, it schedules a recurring job with Cloud Scheduler.
1825

19-
If we host the task worker on Cloud Run, we get free autoscaling.
26+
If we host the task worker on Cloud Run, we get autoscaling workers.
2027

2128
## Pseudocode
2229

2330
In practice, this is what it looks like:
2431

2532
```python
26-
router = APIRouter(route_class=TaskRouteBuilder(...))
33+
delayed_router = APIRouter(route_class=DelayedRouteBuilder(...))
34+
scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...))
2735

2836
class Recipe(BaseModel):
2937
ingredients: List[str]
3038

31-
@router.post("/{restaurant}/make_dinner")
32-
async def make_dinner(restaurant: str, recipe: Recipe,):
39+
@delayed_router.post("/{restaurant}/make_dinner")
40+
async def make_dinner(restaurant: str, recipe: Recipe):
3341
# Do a ton of work here.
3442

35-
app.include_router(router)
43+
@scheduled_router.post("/home_cook")
44+
async def home_cook(recipe: Recipe):
45+
# Make my own food
46+
47+
app.include_router(delayed_router)
48+
app.include_router(scheduled_router)
49+
50+
# If you wan to make your own breakfast every morning at 7AM IST.
51+
home_cook.scheduler(name="test-home-cook-at-7AM-IST", schedule="0 7 * * *", time_zone="Asia/Kolkata").schedule(recipe=Recipe(ingredients=["Milk","Cereal"]))
3652
```
3753

3854
Now we can trigger the task with
@@ -71,9 +87,9 @@ Forwarding http://feda-49-207-221-153.ngrok.io -> http://loca
7187
```python
7288
# complete file: examples/simple/main.py
7389

74-
# First we construct our TaskRoute class with all relevant settings
90+
# First we construct our DelayedRoute class with all relevant settings
7591
# This can be done once across the entire project
76-
TaskRoute = TaskRouteBuilder(
92+
DelayedRoute = DelayedRouteBuilder(
7793
base_url="http://feda-49-207-221-153.ngrok.io",
7894
queue_path=queue_path(
7995
project="gcp-project-id",
@@ -82,13 +98,12 @@ TaskRoute = TaskRouteBuilder(
8298
),
8399
)
84100

85-
# Wherever we use
86-
task_router = APIRouter(route_class=TaskRoute, prefix="/tasks")
101+
delayed_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")
87102

88103
class Payload(BaseModel):
89104
message: str
90105

91-
@task_router.post("/hello")
106+
@delayed_router.post("/hello")
92107
async def hello(p: Payload = Payload(message="Default")):
93108
logger.warning(f"Hello task ran with payload: {p.message}")
94109

@@ -102,7 +117,7 @@ async def trigger():
102117
hello.delay(p=Payload(message="Triggered task"))
103118
return {"message": "Hello task triggered"}
104119

105-
app.include_router(task_router)
120+
app.include_router(delayed_router)
106121

107122
```
108123

@@ -144,11 +159,11 @@ We'll only edit the parts from above that we need changed from above example.
144159
# URL of the Cloud Run service
145160
base_url = "https://hello-randomchars-el.a.run.app"
146161

147-
TaskRoute = TaskRouteBuilder(
162+
DelayedRoute = DelayedRouteBuilder(
148163
base_url=base_url,
149164
# Task queue, same as above.
150165
queue_path=queue_path(...),
151-
pre_create_hook=oidc_hook(
166+
pre_create_hook=oidc_task_hook(
152167
token=tasks_v2.OidcToken(
153168
# Service account that you created
154169
service_account_email="fastapi-cloud-tasks@gcp-project-id.iam.gserviceaccount.com",
@@ -162,15 +177,15 @@ Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks.
162177

163178
## Configuration
164179

165-
### TaskRouteBuilder
180+
### DelayedRouteBuilder
166181

167182
Usage:
168183

169184
```python
170-
TaskRoute = TaskRouteBuilder(...)
171-
task_router = APIRouter(route_class=TaskRoute)
185+
DelayedRoute = DelayedRouteBuilder(...)
186+
delayed_router = APIRouter(route_class=DelayedRoute)
172187

173-
@task_router.get("/simple_task")
188+
@delayed_router.get("/simple_task")
174189
def mySimpleTask():
175190
return {}
176191
```
@@ -185,7 +200,7 @@ def mySimpleTask():
185200

186201
- `client` - If you need to override the Cloud Tasks client, pass the client here. (eg: changing credentials, transport etc)
187202

188-
### Task level default options
203+
#### Task level default options
189204

190205
Usage:
191206

@@ -213,15 +228,15 @@ def mySimpleTask():
213228
return {}
214229
```
215230

216-
### Delayer Options
231+
#### Delayer Options
217232

218233
Usage:
219234

220235
```python
221236
mySimpleTask.options(...).delay()
222237
```
223238

224-
All options from above can be overriden per call (including TaskRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.
239+
All options from above can be overriden per call (including DelayedRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.
225240

226241
Example:
227242

@@ -230,20 +245,36 @@ Example:
230245
mySimpleTask.options(countdown=120).delay()
231246
```
232247

248+
### ScheduledRouteBuilder
249+
250+
Usage:
251+
252+
```python
253+
ScheduledRoute = ScheduledRouteBuilder(...)
254+
scheduled_router = APIRouter(route_class=ScheduledRoute)
255+
256+
@scheduled_router.get("/simple_scheduled_task")
257+
def mySimpleScheduledTask():
258+
return {}
259+
260+
261+
mySimpleScheduledTask.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
262+
```
263+
264+
233265
## Hooks
234266

235267
We might need to override things in the task being sent to Cloud Tasks. The `pre_create_hook` allows us to do that.
236268

237269
Some hooks are included in the library.
238270

239-
- `oidc_hook` - Used to work with Cloud Run.
240-
- `deadline_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
271+
- `oidc_delayed_hook` / `oidc_scheduled_hook` - Used to pass OIDC token (for Cloud Run etc).
272+
- `deadline_delayed_hook` / `deadline_scheduled_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
241273
- `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)`
242274

243275
## Future work
244276

245277
- Ensure queue exists.
246-
- Integrate with [Cloud Scheduler](https://cloud.google.com/scheduler/) to replace celery beat.
247278
- Make helper features for worker's side. Eg:
248279
- Easier access to current retry count.
249280
- API Exceptions to make GCP back-off.

examples/full/main.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
# Standard Library Imports
12
from uuid import uuid4
23

4+
# Third Party Imports
5+
from fastapi import FastAPI
6+
from fastapi import Response
7+
from fastapi import status
8+
from google.api_core.exceptions import AlreadyExists
9+
10+
# Imports from this repository
311
from examples.full.serializer import Payload
412
from examples.full.tasks import hello
5-
from fastapi import FastAPI, Response, status
6-
from google.api_core.exceptions import AlreadyExists
713

814
app = FastAPI()
915

examples/full/serializer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# Third Party Imports
12
from pydantic import BaseModel
23

34

examples/full/settings.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,37 @@
1+
# Standard Library Imports
12
import os
23

3-
from fastapi_cloud_tasks.utils import queue_path
4+
# Third Party Imports
5+
from google.cloud import scheduler_v1
46
from google.cloud import tasks_v2
57

6-
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="http://example.com")
8+
# Imports from this repository
9+
from fastapi_cloud_tasks.utils import location_path
10+
from fastapi_cloud_tasks.utils import queue_path
11+
12+
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://645e-35-207-241-4.ngrok.io")
713
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project")
814
TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1")
15+
SCHEDULED_LOCATION = os.getenv("SCHEDULED_LOCATION", default="us-central1")
916
TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue")
1017

11-
TASK_SERVICE_ACCOUNT = os.getenv("TASK_SERVICE_ACCOUNT", default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com")
18+
TASK_SERVICE_ACCOUNT = os.getenv(
19+
"TASK_SERVICE_ACCOUNT",
20+
default=f"fastapi-cloud-tasks@{TASK_PROJECT_ID}.iam.gserviceaccount.com",
21+
)
1222

1323
TASK_QUEUE_PATH = queue_path(
1424
project=TASK_PROJECT_ID,
1525
location=TASK_LOCATION,
1626
queue=TASK_QUEUE,
1727
)
1828

29+
SCHEDULED_LOCATION_PATH = location_path(
30+
project=TASK_PROJECT_ID,
31+
location=SCHEDULED_LOCATION,
32+
)
33+
1934
TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL)
35+
SCHEDULED_OIDC_TOKEN = scheduler_v1.OidcToken(
36+
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
37+
)

examples/full/tasks.py

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,83 @@
1+
# Standard Library Imports
12
import logging
23

3-
from examples.full.serializer import Payload
4-
from examples.full.settings import TASK_LISTENER_BASE_URL, TASK_OIDC_TOKEN, TASK_QUEUE_PATH
4+
# Third Party Imports
55
from fastapi import FastAPI
66
from fastapi.routing import APIRouter
7-
from fastapi_cloud_tasks.hooks import chained_hook, deadline_hook, oidc_hook
8-
from fastapi_cloud_tasks.taskroute import TaskRouteBuilder
97
from google.protobuf import duration_pb2
108

9+
# Imports from this repository
10+
from examples.full.serializer import Payload
11+
from examples.full.settings import SCHEDULED_LOCATION_PATH
12+
from examples.full.settings import SCHEDULED_OIDC_TOKEN
13+
from examples.full.settings import TASK_LISTENER_BASE_URL
14+
from examples.full.settings import TASK_OIDC_TOKEN
15+
from examples.full.settings import TASK_QUEUE_PATH
16+
from fastapi_cloud_tasks.delayed_route import DelayedRouteBuilder
17+
from fastapi_cloud_tasks.hooks import chained_hook
18+
from fastapi_cloud_tasks.hooks import deadline_delayed_hook
19+
from fastapi_cloud_tasks.hooks import deadline_scheduled_hook
20+
from fastapi_cloud_tasks.hooks import oidc_delayed_hook
21+
from fastapi_cloud_tasks.hooks import oidc_scheduled_hook
22+
from fastapi_cloud_tasks.scheduled_route import ScheduledRouteBuilder
23+
1124
app = FastAPI()
1225

1326

1427
logger = logging.getLogger("uvicorn")
1528

16-
TaskRoute = TaskRouteBuilder(
29+
DelayedRoute = DelayedRouteBuilder(
1730
base_url=TASK_LISTENER_BASE_URL,
1831
queue_path=TASK_QUEUE_PATH,
1932
# Chain multiple hooks together
2033
pre_create_hook=chained_hook(
2134
# Add service account for cloud run
22-
oidc_hook(
35+
oidc_delayed_hook(
2336
token=TASK_OIDC_TOKEN,
2437
),
2538
# Wait for half an hour
26-
deadline_hook(duration=duration_pb2.Duration(seconds=1800)),
39+
deadline_delayed_hook(duration=duration_pb2.Duration(seconds=1800)),
40+
),
41+
)
42+
43+
ScheduledRoute = ScheduledRouteBuilder(
44+
base_url=TASK_LISTENER_BASE_URL,
45+
location_path=SCHEDULED_LOCATION_PATH,
46+
pre_create_hook=chained_hook(
47+
# Add service account for cloud run
48+
oidc_scheduled_hook(
49+
token=SCHEDULED_OIDC_TOKEN,
50+
),
51+
# Wait for half an hour
52+
deadline_scheduled_hook(duration=duration_pb2.Duration(seconds=1800)),
2753
),
2854
)
2955

30-
router = APIRouter(route_class=TaskRoute, prefix="/tasks")
56+
task_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")
3157

3258

33-
@router.post("/hello")
59+
@task_router.post("/hello")
3460
async def hello(p: Payload = Payload(message="Default")):
3561
message = f"Hello task ran with payload: {p.message}"
3662
logger.warning(message)
3763
return {"message": message}
3864

3965

40-
app.include_router(router)
66+
scheduled_router = APIRouter(route_class=ScheduledRoute, prefix="/scheduled")
67+
68+
69+
@scheduled_router.post("/timed_hello")
70+
async def scheduled_hello(p: Payload = Payload(message="Default")):
71+
message = f"Scheduled hello task ran with payload: {p.message}"
72+
logger.warning(message)
73+
return {"message": message}
74+
75+
76+
scheduled_hello.scheduler(
77+
name="testing-examples-scheduled-hello",
78+
schedule="*/5 * * * *",
79+
time_zone="Asia/Kolkata",
80+
).schedule(p=Payload(message="Scheduled"))
81+
82+
app.include_router(task_router)
83+
app.include_router(scheduled_router)

0 commit comments

Comments
 (0)