Skip to content

Commit f25fcbc

Browse files
testing new celery configs
1 parent 28f6875 commit f25fcbc

File tree

4 files changed

+26
-9
lines changed

4 files changed

+26
-9
lines changed

app/api/management/commands/check_celery_tasks.py renamed to app/api/management/commands/terminate_worker.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,25 @@ class Command(BaseCommand):
88

99
def handle(self, *args, **options):
1010

11-
this_pod = f"celery@{str(os.environ.get('THIS_POD_NAME'))}"
11+
# get celery worker
12+
this_worker = f"celery@{str(os.environ.get('THIS_POD_NAME'))}"
13+
14+
# sending initial SIGTERM to celery worker for warm-shutdown
15+
celery.app.control.broadcast('shutdown', destination=[this_worker])
1216

1317
def get_task_list():
1418
# Inspect all nodes.
1519
i = celery.app.control.inspect()
20+
1621
# Tasks received, but are still waiting to be executed.
17-
reserved = i.reserved()[this_pod]
22+
reserved = i.reserved()[this_worker]
1823
print(f'Reserved tasks -> {str(reserved)}')
24+
1925
# Active tasks
20-
active = i.active()[this_pod]
26+
active = i.active()[this_worker]
2127
print(f'Active tasks -> {str(reserved)}')
28+
29+
# Sum all tasks
2230
tasks = len(active) + len(reserved)
2331
return int(tasks)
2432

app/scanerr/celery.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import absolute_import, unicode_literals
22
from celery import Celery
3+
from celery.signals import worker_shutdown
34
from django.conf import settings
45
import scanerr, os
56

@@ -23,6 +24,7 @@
2324
worker_prefetch_multiplier=1,
2425
worker_hijack_root_logger=False,
2526
task_always_eager=False,
27+
worker_cancel_long_running_tasks_on_connection_loss=True,
2628
)
2729

2830
# setting tasks to auto-discover
@@ -31,5 +33,12 @@
3133
# setting debug
3234
@app.task(bind=False)
3335
def debug_task(self):
34-
print('Request: {0!r}'.format(self.request))
36+
print(f'Request: {self.request}')
37+
38+
# notify of SIGTERM
39+
@worker_shutdown.connect
40+
def on_worker_shutdown(**kwargs):
41+
print(f'- WORKER SHUTTING DOWN - \n\n{kwargs}')
42+
43+
3544

k8s/prod/celery-deployment.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ spec:
1414
labels:
1515
app: celery-deployment
1616
spec:
17-
terminationGracePeriodSeconds: 30
17+
terminationGracePeriodSeconds: 300
1818
imagePullSecrets:
1919
- name: regcred
2020
containers:
2121
- name: celery
22-
image: <IMAGE> # scanerr/server:facff34 #
22+
image: <IMAGE> # scanerr/server:99e2252 #
2323
imagePullPolicy: Always
2424
command: ["/entrypoint.sh", "celery"]
2525
envFrom:
@@ -33,11 +33,11 @@ spec:
3333
resources:
3434
limits:
3535
cpu: "1"
36-
memory: "5Gi"
36+
memory: "3.5Gi"
3737
requests:
3838
cpu: "1"
3939
memory: "2Gi"
4040
lifecycle:
4141
preStop:
4242
exec:
43-
command: ["python3", "manage.py", "check_celery_tasks"]
43+
command: ["python3", "manage.py", "terminate_worker"]

notes/Kubernetes.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ kubectl apply -f ./k8s/prod/kubeip-daemon.yaml
191191
- `for p in $(kubectl get pods | grep Terminating | awk '{print $1}'); do kubectl delete pod $p --grace-period=0 --force;done`
192192
- Stream Logs for all celery-deployments:
193193
- `kubectl logs -f --all-containers deployment/celery-deployment`
194-
- `kubectl logs -f --selector=app=celery-deployment --all-containers`
194+
- `kubectl logs -f --selector=app=celery-deployment --all-containers --max-log-requests=7`
195195

196196

197197

0 commit comments

Comments
 (0)