Skip to content

Commit 88616b0

Browse files
committed
hook up broker connection failure
1 parent 046fb14 commit 88616b0

File tree

5 files changed

+90
-4
lines changed

5 files changed

+90
-4
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
# celery
55
celerybeat-schedule
6+
celerybeat_schedule.db
67

78
# pytest
89
.cache

celery_slack/attachments.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,30 @@ def get_beat_init_attachment(**kwargs):
302302
return attachment
303303

304304

305+
def get_broker_disconnection_attachment(**kwargs):
306+
"""Create the slack message attachment for broker disconnection."""
307+
if kwargs["show_celery_hostname"]:
308+
message = "*Celery could not connect to broker on {}.*".format(
309+
socket.gethostname()
310+
)
311+
else:
312+
message = "*Celery could not connect to broker.*"
313+
314+
attachment = {
315+
"attachments": [
316+
{
317+
"fallback": message,
318+
"color": kwargs["slack_broker_disconnect_color"],
319+
"text": message,
320+
"mrkdwn_in": ["text"]
321+
}
322+
],
323+
"text": ''
324+
}
325+
326+
return attachment
327+
328+
305329
def schedule_to_string(schedule):
306330
"""Transform a crontab, solar, or timedelta to a string."""
307331
if isinstance(schedule, crontab):

celery_slack/callbacks.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Celery state and task callbacks."""
22
from functools import wraps
3+
import time
34

45
from .attachments import get_beat_init_attachment
6+
from .attachments import get_broker_disconnection_attachment
57
from .attachments import get_celery_shutdown_attachment
68
from .attachments import get_celery_startup_attachment
79
from .attachments import get_task_failure_attachment
@@ -119,3 +121,48 @@ def slack_beat_init_callback(**kwargs):
119121
post_to_slack(cbkwargs["webhook"], ' ', attachment)
120122

121123
return slack_beat_init_callback
124+
125+
126+
# Prevent spam
127+
BROKER_FAILURE_COOLDOWN = 60
128+
BROKER_FAILURE_TIME = time.time() - BROKER_FAILURE_COOLDOWN
129+
130+
131+
def slack_broker_connection_failure(**cbkwargs):
132+
"""Wrap the kombu.connection.retry_over_time callback callable."""
133+
def slack_callback():
134+
global BROKER_FAILURE_TIME
135+
global BROKER_FAILURE_COOLDOWN
136+
137+
if time.time() - BROKER_FAILURE_TIME > BROKER_FAILURE_COOLDOWN:
138+
BROKER_FAILURE_TIME = time.time()
139+
attachment = get_broker_disconnection_attachment(**cbkwargs)
140+
post_to_slack(cbkwargs["webhook"], ' ', attachment)
141+
142+
def wrapper(func):
143+
144+
@wraps(func)
145+
def wrapped_func(fun, catch, args=[], kwargs={}, errback=None,
146+
max_retries=None, interval_start=2, interval_step=2,
147+
interval_max=30, callback=None):
148+
149+
def callback_wrapper(cb_func):
150+
@wraps(cb_func)
151+
def wrapped_cb_func(*args, **kwargs):
152+
slack_callback()
153+
return cb_func(*args, **kwargs)
154+
return wrapped_cb_func
155+
156+
if callback is not None:
157+
callback = callback_wrapper(callback)
158+
else:
159+
callback = slack_callback
160+
161+
func(fun=fun, catch=catch, args=args, kwargs=kwargs,
162+
errback=errback, max_retries=max_retries,
163+
interval_start=interval_start, interval_step=interval_step,
164+
interval_max=interval_max, callback=callback)
165+
166+
return wrapped_func
167+
168+
return wrapper

celery_slack/slackify.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
from celery.signals import celeryd_init
66
from celery.signals import task_prerun
77
from celery.signals import worker_shutdown
8+
import kombu
89

910
from .callbacks import slack_beat_init
11+
from .callbacks import slack_broker_connection_failure
1012
from .callbacks import slack_celery_shutdown
1113
from .callbacks import slack_celery_startup
1214
from .callbacks import slack_task_failure
@@ -24,6 +26,7 @@
2426
"slack_task_prerun_color": "#D3D3D3",
2527
"slack_task_success_color": "#36A64F",
2628
"slack_task_failure_color": "#D00001",
29+
"slack_broker_disconnect_color": "#D00001",
2730
"flower_base_url": None,
2831
"show_task_id": True,
2932
"show_task_execution_time": True,
@@ -36,6 +39,7 @@
3639
"show_startup": True,
3740
"show_shutdown": True,
3841
"show_beat": True,
42+
"show_broker": True,
3943
"use_fixed_width": True,
4044
"include_tasks": None,
4145
"exclude_tasks": None,
@@ -81,6 +85,7 @@ def __init__(self, app, webhook=None, beat_schedule=None, **options):
8185

8286
self._connect_signals()
8387
self._decorate_task_methods()
88+
self._decorate_kombu_retry()
8489

8590
def _connect_signals(self):
8691
"""Connect callbacks to celery signals.
@@ -119,3 +124,11 @@ def _decorate_task_methods(self):
119124
slack_task_success(**self.options)(self.app.Task.on_success)
120125
self.app.Task.on_failure = \
121126
slack_task_failure(**self.options)(self.app.Task.on_failure)
127+
128+
def _decorate_kombu_retry(self):
129+
"""Decorate the kombu.connection.retry_over_time function."""
130+
if self.options["show_broker"]:
131+
kombu.connection.retry_over_time = \
132+
slack_broker_connection_failure(**self.options)(
133+
kombu.connection.retry_over_time
134+
)

tests/celeryapp/celeryapp.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
"flower_base_url": "https://flower.example.com",
2626
"webhook": slack_webhook,
2727
"beat_schedule": schedule,
28-
"show_task_prerun": True,
28+
# "show_task_prerun": True,
2929
# "failures_only": True,
30-
# "show_startup": False,
31-
# "show_shutdown": False,
32-
# "show_beat": False,
30+
# "show_celery_hostname": True,
31+
"show_startup": False,
32+
"show_shutdown": False,
33+
"show_beat": False,
3334
}
3435

3536
# logging.info('Creating celery-slack object.')

0 commit comments

Comments
 (0)