Skip to content

Commit f2757fc

Browse files
committed
only create if changed
1 parent 5216e71 commit f2757fc

File tree

7 files changed

+44
-45
lines changed

7 files changed

+44
-45
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ repos:
88
rev: 21.12b0
99
hooks:
1010
- id: black
11+
args: [--line-length=119]
1112
- repo: https://gitlab.com/pycqa/flake8
1213
rev: 4.0.1
1314
hooks:

examples/full/settings.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@
88
# Imports from this repository
99
from fastapi_cloud_tasks.utils import queue_path
1010

11-
TASK_LISTENER_BASE_URL = os.getenv(
12-
"TASK_LISTENER_BASE_URL", default="https://b22d-35-207-241-4.ngrok.io"
13-
)
14-
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="applied-honor-105708")
11+
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://example.com")
12+
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project")
1513
TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1")
1614
TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue")
1715

@@ -26,9 +24,7 @@
2624
queue=TASK_QUEUE,
2725
)
2826

29-
TASK_OIDC_TOKEN = tasks_v2.OidcToken(
30-
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
31-
)
27+
TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL)
3228
SCHEDULER_OIDC_TOKEN = scheduler_v1.OidcToken(
3329
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
3430
)

examples/simple/main.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515

1616
TaskRoute = TaskRouteBuilder(
1717
# Base URL where the task server will get hosted
18-
base_url=os.getenv(
19-
"TASK_LISTENER_BASE_URL", default="https://d860-35-208-83-220.ngrok.io"
20-
),
18+
base_url=os.getenv("TASK_LISTENER_BASE_URL", default="https://d860-35-208-83-220.ngrok.io"),
2119
# Full queue path to which we'll send tasks.
2220
# Edit values below to match your project
2321
queue_path=queue_path(

fastapi_cloud_tasks/delayer.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ def delay(self, **kwargs):
6060

6161
request = self.pre_create_hook(request)
6262

63-
return self.client.create_task(
64-
request=request, timeout=self.task_create_timeout
65-
)
63+
return self.client.create_task(request=request, timeout=self.task_create_timeout)
6664

6765
def _schedule(self):
6866
if self.countdown is None or self.countdown <= 0:

fastapi_cloud_tasks/requester.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,18 @@ def __init__(
3333
self.base_url = base_url.rstrip("/")
3434

3535
def _headers(self, *, values):
36-
headers = err_val(
37-
request_params_to_args(self.route.dependant.header_params, values)
38-
)
39-
cookies = err_val(
40-
request_params_to_args(self.route.dependant.cookie_params, values)
41-
)
36+
headers = err_val(request_params_to_args(self.route.dependant.header_params, values))
37+
cookies = err_val(request_params_to_args(self.route.dependant.cookie_params, values))
4238
if len(cookies) > 0:
4339
headers["Cookies"] = "; ".join([f"{k}={v}" for (k, v) in cookies.items()])
4440
# We use json only.
4541
headers["Content-Type"] = "application/json"
4642
# Always send string headers and skip all headers which are supposed to be sent by cloudtasks
47-
return {
48-
str(k): str(v)
49-
for (k, v) in headers.items()
50-
if not str(k).startswith("x_cloudtasks_")
51-
}
43+
return {str(k): str(v) for (k, v) in headers.items() if not str(k).startswith("x_cloudtasks_")}
5244

5345
def _url(self, *, values):
5446
route = self.route
55-
path_values = err_val(
56-
request_params_to_args(route.dependant.path_params, values)
57-
)
47+
path_values = err_val(request_params_to_args(route.dependant.path_params, values))
5848
for (name, converter) in route.param_convertors.items():
5949
if name in path_values:
6050
continue

fastapi_cloud_tasks/scheduler.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# Third Party Imports
44
from fastapi.routing import APIRoute
55
from google.cloud import scheduler_v1
6+
from google.protobuf import duration_pb2
67

78
# Imports from this repository
89
from fastapi_cloud_tasks.hooks import SchedulerHook
@@ -23,14 +24,21 @@ def __init__(
2324
name: str = "",
2425
schedule_create_timeout: float = 10.0,
2526
retry_config: scheduler_v1.RetryConfig = None,
26-
time_zone: str = None
27+
time_zone: str = "UTC",
28+
force: bool = False,
2729
) -> None:
2830
super().__init__(route=route, base_url=base_url)
2931
if name == "":
3032
name = route.unique_id
3133

3234
if retry_config is None:
33-
retry_config = scheduler_v1.RetryConfig(retry_count=5)
35+
retry_config = scheduler_v1.RetryConfig(
36+
retry_count=5,
37+
max_retry_duration=duration_pb2.Duration(seconds=0),
38+
min_backoff_duration=duration_pb2.Duration(seconds=5),
39+
max_backoff_duration=duration_pb2.Duration(seconds=120),
40+
max_doublings=5,
41+
)
3442

3543
self.retry_config = retry_config
3644
location_parts = client.parse_common_location_path(location_path)
@@ -45,6 +53,7 @@ def __init__(
4553
self.method = schedulerMethod(route.methods)
4654
self.client = client
4755
self.pre_scheduler_hook = pre_scheduler_hook
56+
self.force = force
4857

4958
def schedule(self, **kwargs):
5059
# Create http request
@@ -63,25 +72,36 @@ def schedule(self, **kwargs):
6372
http_target=request,
6473
schedule=self.cron_schedule,
6574
retry_config=self.retry_config,
75+
time_zone=self.time_zone,
6676
)
67-
if self.time_zone is not None:
68-
job.time_zone = self.time_zone
77+
6978
request = scheduler_v1.CreateJobRequest(parent=self.location_path, job=job)
7079

7180
request = self.pre_scheduler_hook(request)
7281

73-
# Delete and create job
74-
self.delete()
75-
return self.client.create_job(
76-
request=request, timeout=self.schedule_create_timeout
77-
)
82+
if self.force or self._has_changed(request=request):
83+
# Delete and create job
84+
self.delete()
85+
self.client.create_job(request=request, timeout=self.schedule_create_timeout)
86+
87+
def _has_changed(self, request: scheduler_v1.CreateJobRequest):
88+
try:
89+
job = self.client.get_job(name=request.job.name)
90+
# Remove things that are either output only or GCP adds by default
91+
job.user_update_time = None
92+
job.state = None
93+
job.schedule_time = None
94+
del job.http_target.headers["User-Agent"]
95+
# Proto compare works directly with `__eq__`
96+
return job != request.job
97+
except Exception:
98+
return True
99+
return False
78100

79101
def delete(self):
80102
# We return true or exception because you could have the delete code on multiple instances
81103
try:
82-
self.client.delete_job(
83-
name=self.job_id, timeout=self.schedule_create_timeout
84-
)
104+
self.client.delete_job(name=self.job_id, timeout=self.schedule_create_timeout)
85105
return True
86106
except Exception as ex:
87107
return ex

fastapi_cloud_tasks/utils.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,11 @@
1313

1414

1515
def location_path(*, project, location):
16-
return scheduler_v1.CloudSchedulerClient.common_location_path(
17-
project=project, location=location
18-
)
16+
return scheduler_v1.CloudSchedulerClient.common_location_path(project=project, location=location)
1917

2018

2119
def queue_path(*, project, location, queue):
22-
return tasks_v2.CloudTasksClient.queue_path(
23-
project=project, location=location, queue=queue
24-
)
20+
return tasks_v2.CloudTasksClient.queue_path(project=project, location=location, queue=queue)
2521

2622

2723
def err_val(resp: Tuple[Dict, List[ErrorWrapper]]):

0 commit comments

Comments
 (0)