-
Notifications
You must be signed in to change notification settings - Fork 202
/
cloudwatch.py
73 lines (58 loc) · 2.56 KB
/
cloudwatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
CloudWatchWrapper extracted partially from
<https://github.com/awsdocs/aws-doc-sdk-examples/blob/54c3b82d8f9a12a862f9fcec44909829bda849af/python/example_code/cloudwatch/cloudwatch_basics.py>
The CloudWatchWrapper requires the AWS_CLOUDWATCH_CONN_ID, or the `aws_default`
connection, to be set in the Airflow Connections.
Modifying alarms can be skipped by setting `TOGGLE_CLOUDWATCH_ALARMS` to `False`
in the Airflow Variables; this is the desired behavior particularly when
running the Data Refresh DAGs locally or in a development environment.
"""
import logging
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from botocore.exceptions import ClientError
from common.constants import AWS_CLOUDWATCH_CONN_ID
logger = logging.getLogger(__name__)
class CloudWatchWrapper:
    """Wrap the Amazon CloudWatch calls used by this module."""

    def __init__(self, cloudwatch_resource):
        """
        Keep a reference to the resource that later calls operate on.

        :param cloudwatch_resource: A Boto3 CloudWatch resource.
        """
        self.cloudwatch_resource = cloudwatch_resource

    def enable_alarm_actions(self, alarm_name, enable):
        """
        Switch the actions of a single alarm on or off.

        Alarm actions can be used to send notifications or automate responses
        when an alarm enters a particular state.

        :param alarm_name: The name of the alarm.
        :param enable: When True, actions are enabled for the alarm. Otherwise,
            they are disabled.
        :raises ClientError: Logged with its traceback and then re-raised when
            the request fails.
        """
        # Pick the wording for the success and failure log lines up front.
        if enable:
            outcome, attempt = "Enabled", "enable"
        else:
            outcome, attempt = "Disabled", "disable"

        try:
            target = self.cloudwatch_resource.Alarm(alarm_name)
            switch = target.enable_actions if enable else target.disable_actions
            switch()
            logger.info("%s actions for alarm %s.", outcome, alarm_name)
        except ClientError:
            logger.exception("Couldn't %s actions alarm %s.", attempt, alarm_name)
            raise
def enable_or_disable_alarms(enable):
    """
    Enable or disable the actions of the hard-coded CloudWatch alarm.

    The alarm acted on is the one named
    "ES Production CPU utilization above 50%". The CloudWatch resource is
    obtained through an ``AwsBaseHook`` built with ``AWS_CLOUDWATCH_CONN_ID``.

    :param enable: When True, the alarm's actions are enabled. Otherwise, they
        are disabled.
    :raises AirflowSkipException: When the ``TOGGLE_CLOUDWATCH_ALARMS`` Airflow
        Variable holds a falsy value. The Variable defaults to True when unset.
    """
    # Bail out before touching AWS when alarm toggling is switched off.
    if not Variable.get("TOGGLE_CLOUDWATCH_ALARMS", True, deserialize_json=True):
        raise AirflowSkipException("TOGGLE_CLOUDWATCH_ALARMS is set to False.")

    hook = AwsBaseHook(aws_conn_id=AWS_CLOUDWATCH_CONN_ID, resource_type="cloudwatch")
    CloudWatchWrapper(hook.get_conn()).enable_alarm_actions(
        "ES Production CPU utilization above 50%", enable
    )