Skip to content
This repository was archived by the owner on Mar 27, 2023. It is now read-only.

Commit f158e69

Browse files
committed
added lambda function to call application endpoint that will updated cloudwatch metrics for celery autoscaling
1 parent 708de41 commit f158e69

File tree

12 files changed

+207
-126
lines changed

12 files changed

+207
-126
lines changed

awscdk/awscdk/awscdk.egg-info/requires.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ aws-cdk.aws_applicationautoscaling==1.42.0
55
aws-cdk.aws_certificatemanager==1.42.0
66
aws-cdk.aws_cloudwatch==1.42.0
77
aws-cdk.aws_logs==1.42.0
8+
aws-cdk.aws_lambda==1.42.0
89
aws-cdk.aws_events==1.42.0
910
aws-cdk.aws_events_targets==1.42.0
1011
aws-cdk.aws_secretsmanager==1.42.0

awscdk/awscdk/cdk_app_root.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from env_vars import Variables
1515
from static_site_bucket import StaticSiteStack
1616
from flower import FlowerServiceStack
17+
from celery_autoscaling import CeleryAutoscalingStack
1718

1819
from backend import BackendServiceStack
1920
from backend_tasks import BackendTasksStack
@@ -101,5 +102,11 @@ def __init__(
101102
self, "CeleryDefaultServiceStack"
102103
)
103104

105+
# define other celery queues here, or combine in a single construct
106+
107+
self.celery_autoscaling = CeleryAutoscalingStack(
108+
self, "CeleryAutoscalingStack"
109+
)
110+
104111
# migrate, collectstatic, createsuperuser
105112
self.backend_tasks = BackendTasksStack(self, "BackendTasksStack")

awscdk/awscdk/celery_autoscaling.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from aws_cdk import (
2+
core,
3+
aws_ec2 as ec2,
4+
aws_ecs as ecs,
5+
aws_events as events,
6+
aws_lambda,
7+
aws_events_targets as events_targets,
8+
aws_logs as logs,
9+
aws_cloudformation as cloudformation,
10+
)
11+
12+
13+
class CeleryAutoscalingStack(cloudformation.NestedStack):
14+
def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
15+
super().__init__(
16+
scope, id, **kwargs,
17+
)
18+
19+
self.lambda_function = aws_lambda.Function(
20+
self,
21+
"CeleryMetricsLambdaFunction",
22+
code=aws_lambda.Code.asset("awslambda"),
23+
handler="publish_celery_metrics.lambda_handler",
24+
runtime=aws_lambda.Runtime.PYTHON_3_7,
25+
environment=scope.variables.regular_variables,
26+
)
27+
28+
self.celery_default_cw_metric_schedule = events.Rule(
29+
self,
30+
"CeleryDefaultCWMetricSchedule",
31+
schedule=events.Schedule.rate(core.Duration.minutes(5)),
32+
targets=[
33+
events_targets.LambdaFunction(handler=self.lambda_function)
34+
],
35+
)
36+
37+
# TODO: refactor this to loop through CloudWatch metrics multiple celery queues
38+
scope.celery_default_service.default_celery_queue_cw_metric.grant_put_metric_data(
39+
scope.backend_service.backend_task.task_role
40+
)

awscdk/awscdk/celery_default.py

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -86,42 +86,42 @@ def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
8686
adjustment_type=aas.AdjustmentType.CHANGE_IN_CAPACITY,
8787
)
8888

89-
self.celery_default_cw_monitor_task = ecs.FargateTaskDefinition(
90-
self, "CeleryDefaultCWMonitoringTask"
91-
)
89+
# self.celery_default_cw_monitor_task = ecs.FargateTaskDefinition(
90+
# self, "CeleryDefaultCWMonitoringTask"
91+
# )
9292

93-
self.celery_default_cw_monitor_task.add_container(
94-
"CeleryDefaultCWMonitoringTaskContainer",
95-
image=scope.image,
96-
logging=ecs.LogDrivers.aws_logs(
97-
stream_prefix="CeleryDefaultCWMonitoringContainerLogs",
98-
log_retention=logs.RetentionDays.ONE_DAY,
99-
),
100-
environment=scope.variables.regular_variables,
101-
secrets=scope.variables.secret_variables,
102-
command=["python3", "manage.py", "put_celery_cloudwatch_metrics"],
103-
)
93+
# self.celery_default_cw_monitor_task.add_container(
94+
# "CeleryDefaultCWMonitoringTaskContainer",
95+
# image=scope.image,
96+
# logging=ecs.LogDrivers.aws_logs(
97+
# stream_prefix="CeleryDefaultCWMonitoringContainerLogs",
98+
# log_retention=logs.RetentionDays.ONE_DAY,
99+
# ),
100+
# environment=scope.variables.regular_variables,
101+
# secrets=scope.variables.secret_variables,
102+
# command=["python3", "manage.py", "put_celery_cloudwatch_metrics"],
103+
# )
104104

105-
self.celery_default_cw_metric_schedule = events.Rule(
106-
self,
107-
"CeleryDefaultCWMetricSchedule",
108-
schedule=events.Schedule.rate(core.Duration.minutes(5)),
109-
targets=[
110-
events_targets.EcsTask(
111-
cluster=scope.cluster,
112-
task_definition=self.celery_default_cw_monitor_task,
113-
subnet_selection=ec2.SubnetSelection(
114-
subnet_type=ec2.SubnetType.PUBLIC
115-
),
116-
security_group=ec2.SecurityGroup.from_security_group_id(
117-
self,
118-
"CeleryDefaultCWMetricScheduleSG",
119-
security_group_id=scope.vpc.vpc_default_security_group,
120-
),
121-
)
122-
],
123-
)
105+
# self.celery_default_cw_metric_schedule = events.Rule(
106+
# self,
107+
# "CeleryDefaultCWMetricSchedule",
108+
# schedule=events.Schedule.rate(core.Duration.minutes(5)),
109+
# targets=[
110+
# events_targets.EcsTask(
111+
# cluster=scope.cluster,
112+
# task_definition=self.celery_default_cw_monitor_task,
113+
# subnet_selection=ec2.SubnetSelection(
114+
# subnet_type=ec2.SubnetType.PUBLIC
115+
# ),
116+
# security_group=ec2.SecurityGroup.from_security_group_id(
117+
# self,
118+
# "CeleryDefaultCWMetricScheduleSG",
119+
# security_group_id=scope.vpc.vpc_default_security_group,
120+
# ),
121+
# )
122+
# ],
123+
# )
124124

125-
self.default_celery_queue_cw_metric.grant_put_metric_data(
126-
self.celery_default_cw_monitor_task.task_role
127-
)
125+
# self.default_celery_queue_cw_metric.grant_put_metric_data(
126+
# self.celery_default_cw_monitor_task.task_role
127+
# )

awscdk/awscdk/env_vars.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
"DEBUG": "",
3333
"FULL_DOMAIN_NAME": full_domain_name,
3434
"FULL_APP_NAME": scope.full_app_name,
35+
"CELERY_METRICS_TOKEN": "my-secret-token",
3536
"AWS_STORAGE_BUCKET_NAME": bucket_name,
3637
"POSTGRES_SERVICE_HOST": postgres_host,
3738
"POSTGRES_PASSWORD": db_secret.secret_value_from_json(

awscdk/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"aws-cdk.aws_certificatemanager==1.42.0",
2323
"aws-cdk.aws_cloudwatch==1.42.0",
2424
"aws-cdk.aws_logs==1.42.0",
25+
"aws-cdk.aws_lambda==1.42.0",
2526
"aws-cdk.aws_events==1.42.0",
2627
"aws-cdk.aws_events_targets==1.42.0",
2728
"aws-cdk.aws_secretsmanager==1.42.0",

awslambda/publish_celery_metrics.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import json
2+
import os
3+
import urllib.request
4+
5+
FULL_DOMAIN_NAME = os.environ.get("FULL_DOMAIN_NAME")
6+
CELERY_METRICS_PATH = "api/celery-metrics/"
7+
8+
CELERY_METRICS_URL = f"https://{FULL_DOMAIN_NAME}/{CELERY_METRICS_PATH}"
9+
CELERY_METRICS_TOKEN = os.environ.get("CELERY_METRICS_TOKEN")
10+
11+
12+
def lambda_handler(event, context):
13+
data = {"celery_metrics_token": CELERY_METRICS_TOKEN}
14+
params = json.dumps(data).encode('utf8')
15+
req = urllib.request.Request(
16+
CELERY_METRICS_URL,
17+
data=params,
18+
headers={'content-type': 'application/json'},
19+
)
20+
response = urllib.request.urlopen(req)
21+
return response

backend/apps/core/management/commands/put_celery_cloudwatch_metrics.py

Lines changed: 0 additions & 88 deletions
This file was deleted.

backend/apps/core/urls.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
urlpatterns = [
66
path("health-check/", views.health_check, name="health-check"),
7+
path("celery-metrics/", views.celery_metrics, name="celery-metrics"),
78
path("celery/sleep-task/", views.sleep_task_view, name="sleep-task"),
89
path(
910
"debug/send-test-email/",
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import os
2+
3+
import redis
4+
5+
from django.conf import settings
6+
7+
from apps.core import celery_app
8+
import boto3
9+
10+
11+
def active_and_reserved_tasks_by_queue_name(queue_name):
12+
"""
13+
i.active() returns a dictionary where keys are worker names
14+
and values are lists of active tasks for the worker
15+
16+
"""
17+
print("inspecting celery queue")
18+
i = celery_app.control.inspect()
19+
20+
active = i.active()
21+
active_count = 0
22+
if active:
23+
for _, active_tasks in active.items():
24+
active_count += len(
25+
[
26+
task
27+
for task in active_tasks
28+
if task['delivery_info']['routing_key'] == queue_name
29+
]
30+
)
31+
32+
reserved = i.reserved()
33+
reserved_count = 0
34+
if reserved:
35+
for _, reserved_tasks in reserved.items():
36+
reserved_count += len(
37+
[
38+
task
39+
for task in reserved_tasks
40+
if task['delivery_info']['routing_key'] == queue_name
41+
]
42+
)
43+
44+
print("connecting to redis")
45+
r = redis.Redis(
46+
host=settings.REDIS_SERVICE_HOST,
47+
port=6379,
48+
db=1,
49+
charset="utf-8",
50+
decode_responses=True,
51+
)
52+
53+
queue_length = r.llen("default")
54+
total = active_count + reserved_count + queue_length
55+
print(f"Active count: {active_count}")
56+
print(f"Reserved count: {reserved_count}")
57+
print(f"Queue length: {queue_length}")
58+
print(f"Total: {total}")
59+
return total
60+
61+
62+
def publish_queue_metrics(queue_names):
63+
print("gathering queue data")
64+
metric_data = {
65+
queue_name: active_and_reserved_tasks_by_queue_name(queue_name)
66+
for queue_name in queue_names
67+
}
68+
print("sending cloudwatch data")
69+
if not settings.DEBUG:
70+
print("connecting aws api")
71+
client = boto3.client('cloudwatch')
72+
client.put_metric_data(
73+
Namespace=os.environ.get("FULL_APP_NAME", "FULL_APP_NAME"),
74+
MetricData=[
75+
{"MetricName": metric_name, "Value": value}
76+
for metric_name, value in metric_data.items()
77+
],
78+
)
79+
return metric_data
80+
81+
82+
def publish_celery_metrics():
83+
print("starting task")
84+
queue_metrics = publish_queue_metrics(["default"])
85+
return queue_metrics

backend/apps/core/views.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from rest_framework import viewsets
77
from rest_framework.decorators import api_view
88

9+
from .utils.celery_utils import publish_celery_metrics
910
from apps.core.tasks import debug_task, send_test_email_task, sleep_task
1011

1112
r = settings.REDIS
@@ -48,6 +49,17 @@ def sleep_task_view(request):
4849
)
4950

5051

52+
@api_view(["POST"])
53+
def celery_metrics(request):
54+
if request.data.get("celery_metrics_token") == os.environ.get(
55+
"CELERY_METRICS_TOKEN"
56+
):
57+
published_celery_metrics = publish_celery_metrics()
58+
return JsonResponse(published_celery_metrics)
59+
else:
60+
return JsonResponse({"message": "Unauthorized"}, status=401)
61+
62+
5163
def send_test_email(request):
5264
send_test_email_task.delay()
5365
return JsonResponse({"message": "Success"})

0 commit comments

Comments
 (0)