1
1
# FastAPI Cloud Tasks
2
2
3
- Strongly typed background tasks with FastAPI and CloudTasks!
4
-
5
- GCP's Cloud Tasks + FastAPI = Replacement for celery's async delayed tasks.
6
-
7
- GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.
8
-
9
- FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
3
+ Strongly typed background tasks with FastAPI and Google CloudTasks.
10
4
11
5
## Installation
12
6
13
7
```
14
8
pip install fastapi-cloud-tasks
15
9
```
16
10
17
- ## Concept
11
+ ## Key features
12
+
13
+ - Strongly typed tasks.
14
+ - Fail at invocation site to make it easier to develop and debug.
15
+ - Breaking schema changes between versions will fail at task runner with Pydantic.
16
+ - Familiar and simple public API
17
+ - ` .delay ` method that takes same arguments as the task.
18
+ - ` .scheduler ` method to create recurring job.
19
+ - Tasks are regular FastAPI endpoints on plain old HTTP.
20
+ - ` Depends ` just works!
21
+ - All middlewares, telemetry, auth, debugging etc solutions for FastAPI work as is.
22
+ - Host task runners it independent of GCP. If CloudTasks can reach the URL, it can invoke the task.
23
+ - Save money.
24
+ - Task invocation with GCP is [ free for first million, then costs $0.4/million] ( https://cloud.google.com/tasks/pricing ) .
25
+ That's almost always cheaper than running a RabbitMQ/Redis/SQL backend for celery.
26
+ - Jobs cost [ $0.1 per job per month irrespective of invocations. 3 jobs are free.] ( https://cloud.google.com/scheduler#pricing )
27
+ Either free or almost always cheaper than always running beat worker.
28
+ - If somehow, this cost ever becomes a concern, the ` client ` can be overriden to call any gRPC server with a compatible API.
29
+ [ Here's a trivial emulator implementation that we will use locally] ( https://github.com/aertje/cloud-tasks-emulator )
30
+ - Autoscale.
31
+ - With a FaaS setup, your task workers can autoscale based on load.
32
+ - Most FaaS services have free tiers making it much cheaper than running a celery worker.
33
+
34
+ ## How it works
35
+
36
+ ### Delayed job
18
37
19
- [ ` Cloud Tasks ` ] ( https://cloud.google.com/tasks ) allows us to schedule a HTTP request in the future.
38
+ ``` python
39
+ from fastapi_cloud_tasks import DelayedRouteBuilder
20
40
21
- [ FastAPI ] ( https://fastapi.tiangolo.com/tutorial/body/ ) makes us define complete schema and params for an HTTP endpoint.
41
+ delayed_router = APIRouter( route_class = DelayedRouteBuilder( ... ))
22
42
43
+ class Recipe (BaseModel ):
44
+ ingredients: List[str ]
23
45
24
- [ ` Cloud Scheduler ` ] ( https://cloud.google.com/scheduler ) allows us to schedule recurring HTTP requests in the future.
46
+ @delayed_router.post (" /{restaurant} /make_dinner" )
47
+ async def make_dinner (restaurant : str , recipe : Recipe):
48
+ # Do a ton of work here.
25
49
26
- FastAPI Cloud Tasks works by putting the three together:
27
50
28
- - It adds a ` .delay ` method to existing routes on FastAPI.
29
- - When this method is called, it schedules a request with Cloud Tasks.
30
- - The task worker is a regular FastAPI server which gets called by Cloud Tasks.
31
- - It adds a ` .scheduler ` method to existing routes on FastAPI.
32
- - When this method is called, it schedules a recurring job with Cloud Scheduler.
51
+ app.include_router(delayed_router)
52
+ ```
33
53
34
- If we host the task worker on Cloud Run, we get autoscaling workers.
54
+ Now we can trigger the task with
35
55
36
- ## Pseudocode
56
+ ``` python
57
+ make_dinner.delay(restaurant = " Taj" , recipe = Recipe(ingredients = [" Pav" ," Bhaji" ]))
58
+ ```
37
59
38
- In practice, this is what it looks like:
60
+ If we want to trigger the task 30 minutes later
39
61
40
62
``` python
41
- delayed_router = APIRouter(route_class = DelayedRouteBuilder(... ))
63
+ make_dinner.options(countdown = 1800 ).delay(... )
64
+ ```
65
+
66
+ ### Scheduled Task
67
+ ``` python
68
+ from fastapi_cloud_tasks import ScheduledRouteBuilder
69
+
42
70
scheduled_router = APIRouter(route_class = ScheduledRouteBuilder(... ))
43
71
44
72
class Recipe (BaseModel ):
45
73
ingredients: List[str ]
46
74
47
- @delayed_router.post (" /{restaurant} /make_dinner" )
48
- async def make_dinner (restaurant : str , recipe : Recipe):
49
- # Do a ton of work here.
50
-
51
75
@scheduled_router.post (" /home_cook" )
52
76
async def home_cook (recipe : Recipe):
53
77
# Make my own food
54
78
55
- app.include_router(delayed_router)
56
79
app.include_router(scheduled_router)
57
80
58
- # If you wan to make your own breakfast every morning at 7AM IST.
81
+ # If you want to make your own breakfast every morning at 7AM IST.
59
82
home_cook.scheduler(name = " test-home-cook-at-7AM-IST" , schedule = " 0 7 * * *" , time_zone = " Asia/Kolkata" ).schedule(recipe = Recipe(ingredients = [" Milk" ," Cereal" ]))
60
83
```
61
84
62
- Now we can trigger the task with
85
+ ## Concept
63
86
64
- ``` python
65
- make_dinner.delay(restaurant = " Taj" , recipe = Recipe(ingredients = [" Pav" ," Bhaji" ]))
66
- ```
87
+ [ ` Cloud Tasks ` ] ( https://cloud.google.com/tasks ) allows us to schedule a HTTP request in the future.
88
+
89
+ [ FastAPI] ( https://fastapi.tiangolo.com/tutorial/body/ ) makes us define complete schema and params for an HTTP endpoint.
90
+
91
+ [ ` Cloud Scheduler ` ] ( https://cloud.google.com/scheduler ) allows us to schedule recurring HTTP requests in the future.
92
+
93
+ FastAPI Cloud Tasks works by putting the three together:
94
+
95
+ - GCP's Cloud Tasks + FastAPI = Partial replacement for celery's async delayed tasks.
96
+ - GCP's Cloud Scheduler + FastAPI = Replacement for celery beat.
97
+ - FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
67
98
68
- If we want to trigger the task 30 minutes later
69
99
70
- ``` python
71
- make_dinner.options(countdown = 1800 ).delay(... )
72
- ```
73
100
74
101
## Running
75
102
76
103
### Local
77
104
78
105
Pre-requisites:
106
+ - ` pip install local-requirements.txt `
107
+ - Install [ cloud-tasks-emulator] ( https://github.com/aertje/cloud-tasks-emulator )
108
+ - Alternatively install ngrok and forward the server's port
79
109
80
- - Create a task queue and copy the project id, location and queue name.
81
- - Install and ensure that ngrok works.
110
+ Start running the emulator in a terminal
111
+ ``` sh
112
+ cloud-tasks-emulator
113
+ ```
82
114
83
- We will need a an API endpoint to give to cloud tasks, so let us fire up ngrok on local
115
+ Start running the task runner on port 8000 so that it is accessible from cloud tasks.
84
116
85
117
``` sh
86
- ngrok http 8000
118
+ uvicorn examples.simple.main:app --reload --port 8000
119
+ ```
120
+
121
+ In another terminal, trigger the task with curl
122
+
123
+ ```
124
+ curl http://localhost:8000/trigger
87
125
```
88
126
89
- You'll see something like this
127
+ Check the logs on the server, you should see
90
128
91
129
```
92
- Forwarding http://feda-49-207-221-153.ngrok.io -> http://localhost:8000
130
+ WARNING: Hello task ran with payload: Triggered task
93
131
```
94
132
133
+ Important bits of code:
134
+
95
135
``` python
96
136
# complete file: examples/simple/main.py
97
137
98
- # First we construct our DelayedRoute class with all relevant settings
138
+ # For local, we connect to the emulator client
139
+ client = None
140
+ if IS_LOCAL :
141
+ client = emulator_client()
142
+
143
+ # Construct our DelayedRoute class with all relevant settings
99
144
# This can be done once across the entire project
100
145
DelayedRoute = DelayedRouteBuilder(
101
- base_url = " http://feda-49-207-221-153.ngrok.io" ,
146
+ client = client,
147
+ base_url = " http://localhost:8000"
102
148
queue_path = queue_path(
103
149
project = " gcp-project-id" ,
104
150
location = " asia-south1" ,
105
151
queue = " test-queue" ,
106
152
),
107
153
)
108
154
109
- delayed_router = APIRouter(route_class = DelayedRoute, prefix = " /tasks" )
155
+ # Override the route_class so that we can add .delay method to the endpoints and know their complete URL
156
+ delayed_router = APIRouter(route_class = DelayedRoute, prefix = " /delayed" )
110
157
111
158
class Payload (BaseModel ):
112
159
message: str
@@ -129,29 +176,11 @@ app.include_router(delayed_router)
129
176
130
177
```
131
178
132
- Start running the task runner on port 8000 so that it is accessible from cloud tasks.
133
-
134
- ``` sh
135
- uvicorn main:app --reload --port 8000
136
- ```
137
-
138
- In another terminal, trigger the task with curl
139
-
140
- ```
141
- curl http://localhost:8000/trigger
142
- ```
143
-
144
- Check the logs on the server, you should see
145
-
146
- ```
147
- WARNING: Hello task ran with payload: Triggered task
148
- ```
149
-
150
179
Note: You can read complete working source code of the above example in [ ` examples/simple/main.py ` ] ( examples/simple/main.py )
151
180
152
181
In the real world you'd have a separate process for task runner and actual task.
153
182
154
- ### Cloud Run
183
+ ### Deployed environment / Cloud Run
155
184
156
185
Running on Cloud Run with authentication needs us to supply an OIDC token. To do that we can use a ` hook ` .
157
186
@@ -161,7 +190,6 @@ Pre-requisites:
161
190
- Deploy the worker as a service on Cloud Run and copy it's URL.
162
191
- Create a service account in cloud IAM and add ` Cloud Run Invoker ` role to it.
163
192
164
- We'll only edit the parts from above that we need changed from above example.
165
193
166
194
``` python
167
195
# URL of the Cloud Run service
@@ -183,6 +211,10 @@ DelayedRoute = DelayedRouteBuilder(
183
211
184
212
Check the fleshed out example at [ ` examples/full/tasks.py ` ] ( examples/full/tasks.py )
185
213
214
+ If you're not running on CloudRun and want to an OAuth Token instead, you can use the ` oauth_task_hook ` instead.
215
+
216
+ Check [ fastapi_cloud_tasks/hooks.py] ( fastapi_cloud_tasks/hooks.py ) to get the hang od hooks and how you can use them.
217
+
186
218
## Configuration
187
219
188
220
### DelayedRouteBuilder
@@ -194,7 +226,7 @@ DelayedRoute = DelayedRouteBuilder(...)
194
226
delayed_router = APIRouter(route_class = DelayedRoute)
195
227
196
228
@delayed_router.get (" /simple_task" )
197
- def mySimpleTask ():
229
+ def simple_task ():
198
230
return {}
199
231
```
200
232
@@ -213,9 +245,9 @@ def mySimpleTask():
213
245
Usage:
214
246
215
247
``` python
216
- @task_router .get (" /simple_task" )
248
+ @delayed_router .get (" /simple_task" )
217
249
@task_default_options (... )
218
- def mySimpleTask ():
250
+ def simple_task ():
219
251
return {}
220
252
```
221
253
@@ -226,13 +258,13 @@ Additional options:
226
258
- ` countdown ` - Seconds in the future to schedule the task.
227
259
- ` task_id ` - named task id for deduplication. (One task id will only be queued once.)
228
260
229
- Eg :
261
+ Example :
230
262
231
263
``` python
232
264
# Trigger after 5 minutes
233
- @task_router .get (" /simple_task" )
265
+ @delayed_router .get (" /simple_task" )
234
266
@task_default_options (countdown = 300 )
235
- def mySimpleTask ():
267
+ def simple_task ():
236
268
return {}
237
269
```
238
270
@@ -241,7 +273,7 @@ def mySimpleTask():
241
273
Usage:
242
274
243
275
``` python
244
- mySimpleTask .options(... ).delay()
276
+ simple_task .options(... ).delay()
245
277
```
246
278
247
279
All options from above can be overriden per call (including DelayedRouteBuilder options like ` base_url ` ) with kwargs to the ` options ` function before calling delay.
@@ -250,7 +282,7 @@ Example:
250
282
251
283
``` python
252
284
# Trigger after 2 minutes
253
- mySimpleTask .options(countdown = 120 ).delay()
285
+ simple_task .options(countdown = 120 ).delay()
254
286
```
255
287
256
288
### ScheduledRouteBuilder
@@ -262,11 +294,11 @@ ScheduledRoute = ScheduledRouteBuilder(...)
262
294
scheduled_router = APIRouter(route_class = ScheduledRoute)
263
295
264
296
@scheduled_router.get (" /simple_scheduled_task" )
265
- def mySimpleScheduledTask ():
297
+ def simple_scheduled_task ():
266
298
return {}
267
299
268
300
269
- mySimpleScheduledTask .scheduler(name = " simple_scheduled_task" , schedule = " * * * * *" ).schedule()
301
+ simple_scheduled_task .scheduler(name = " simple_scheduled_task" , schedule = " * * * * *" ).schedule()
270
302
```
271
303
272
304
@@ -280,9 +312,24 @@ Some hooks are included in the library.
280
312
- ` deadline_delayed_hook ` / ` deadline_scheduled_hook ` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
281
313
- ` chained_hook ` - If you need to chain multiple hooks together, you can do that with ` chained_hook(hook1, hook2) `
282
314
283
- ## Future work
315
+ ## Helper dependencies
316
+
317
+ ### max_retries
318
+
319
+ ``` python
320
+ @delayed_router.post (" /fail_twice" , dependencies = [Depends(max_retries(2 ))])
321
+ async def fail_twice ():
322
+ raise Exception (" nooo" )
323
+ ```
324
+
325
+ ### CloudTasksHeaders
326
+
327
+ ``` python
328
+ @delayed_router.get (" /my_task" )
329
+ async def my_task (ct_headers : CloudTasksHeaders = Depends()):
330
+ print (ct_headers.queue_name)
331
+ ```
332
+
333
+ Check the file [ fastapi_cloud_tasks/dependencies.py] ( fastapi_cloud_tasks/dependencies.py ) for details.
284
334
285
- - Ensure queue exists.
286
- - Make helper features for worker's side. Eg:
287
- - Easier access to current retry count.
288
- - API Exceptions to make GCP back-off.
335
+ Note: This project is neither affiliated with, nor sponsored by Google.
0 commit comments