Skip to content

Commit 865f6ba

Browse files
authored
Merge pull request crflynn#5 from crflynn/broker_connection
Resolves crflynn#4
2 parents 046fb14 + 541a848 commit 865f6ba

14 files changed

+758
-365
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
# celery
55
celerybeat-schedule
6+
celerybeat-schedule.db
67

78
# pytest
89
.cache

Pipfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ name = "pypi"
77

88
[packages]
99

10-
celery = "*"
1110
requests = "*"
1211
ephem = "*"
12+
celery = "*"
1313

1414

1515
[dev-packages]

Pipfile.lock

Lines changed: 201 additions & 196 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

celery_slack/attachments.py

Lines changed: 119 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
import socket
66
import time
77

8+
from billiard.process import current_process
89
from celery import __version__ as CELERY_VERSION
910
from celery.schedules import crontab
1011

11-
if CELERY_VERSION >= '4.0.0':
12+
if CELERY_VERSION >= "4.0.0":
1213
from celery.schedules import solar
1314

1415

1516
STOPWATCH = {}
16-
BEAT_DELIMITER = ' -> '
17+
BEAT_DELIMITER = " -> "
1718

1819

1920
def add_task_to_stopwatch(task_id):
@@ -23,25 +24,25 @@ def add_task_to_stopwatch(task_id):
2324

2425
def get_task_prerun_attachment(task_id, task, args, kwargs, **cbkwargs):
2526
"""Create the slack message attachment for a task prerun."""
26-
message = 'Executing -- ' + task.name.rsplit('.', 1)[-1]
27+
message = "Executing -- " + task.name.rsplit(".", 1)[-1]
2728

28-
lines = ['Name: *' + task.name + '*']
29+
lines = ["Name: *" + task.name + "*"]
2930

3031
if cbkwargs["show_task_id"]:
31-
lines.append('Task ID: ' + task_id)
32+
lines.append("Task ID: " + task_id)
3233

3334
if cbkwargs["use_fixed_width"]:
3435
if cbkwargs["show_task_args"]:
35-
lines.append('args: ' + '`' + str(args) + '`')
36+
lines.append("args: " + "`" + str(args) + "`")
3637
if cbkwargs["show_task_kwargs"]:
37-
lines.append('kwargs: ' + '`' + str(kwargs) + '`')
38+
lines.append("kwargs: " + "`" + str(kwargs) + "`")
3839
else:
3940
if cbkwargs["show_task_args"]:
40-
lines.append('args: ' + str(args))
41+
lines.append("args: " + str(args))
4142
if cbkwargs["show_task_kwargs"]:
42-
lines.append('kwargs: ' + str(kwargs))
43+
lines.append("kwargs: " + str(kwargs))
4344

44-
executing = '\n'.join(lines)
45+
executing = "\n".join(lines)
4546

4647
attachment = {
4748
"attachments": [
@@ -53,13 +54,13 @@ def get_task_prerun_attachment(task_id, task, args, kwargs, **cbkwargs):
5354
"mrkdwn_in": ["text"]
5455
}
5556
],
56-
"text": ''
57+
"text": ""
5758
}
5859

5960
if cbkwargs["flower_base_url"]:
6061
attachment["attachments"][0]["title_link"] = (
6162
cbkwargs["flower_base_url"] +
62-
'/task/{tid}'.format(tid=task_id)
63+
"/task/{tid}".format(tid=task_id)
6364
)
6465

6566
return attachment
@@ -84,36 +85,36 @@ def get_task_success_attachment(task_name, retval, task_id,
8485
else:
8586
retval = str(retval)
8687

87-
message = 'SUCCESS -- ' + task_name.rsplit('.', 1)[-1]
88+
message = "SUCCESS -- " + task_name.rsplit(".", 1)[-1]
8889

8990
elapsed = divmod(time.time() - STOPWATCH.pop(task_id), 60)
9091

91-
lines = ['Name: *' + task_name + '*']
92+
lines = ["Name: *" + task_name + "*"]
9293

9394
if cbkwargs["show_task_execution_time"]:
94-
lines.append('Execution time: {m} minutes {s} seconds'.format(
95+
lines.append("Execution time: {m} minutes {s} seconds".format(
9596
m=str(int(elapsed[0])),
9697
s=str(int(elapsed[1])),
9798
))
9899
if cbkwargs["show_task_id"]:
99-
lines.append('Task ID: ' + task_id)
100+
lines.append("Task ID: " + task_id)
100101

101102
if cbkwargs["use_fixed_width"]:
102103
if cbkwargs["show_task_args"]:
103-
lines.append('args: ' + '`' + str(args) + '`')
104+
lines.append("args: " + "`" + str(args) + "`")
104105
if cbkwargs["show_task_kwargs"]:
105-
lines.append('kwargs: ' + '`' + str(kwargs) + '`')
106+
lines.append("kwargs: " + "`" + str(kwargs) + "`")
106107
if cbkwargs["show_task_return_value"]:
107-
lines.append('Returned: ' + '```' + str(retval) + '```')
108+
lines.append("Returned: " + "```" + str(retval) + "```")
108109
else:
109110
if cbkwargs["show_task_args"]:
110-
lines.append('args: ' + str(args))
111+
lines.append("args: " + str(args))
111112
if cbkwargs["show_task_kwargs"]:
112-
lines.append('kwargs: ' + str(kwargs))
113+
lines.append("kwargs: " + str(kwargs))
113114
if cbkwargs["show_task_return_value"]:
114-
lines.append('Returned: ' + str(retval))
115+
lines.append("Returned: " + str(retval))
115116

116-
success = '\n'.join(lines)
117+
success = "\n".join(lines)
117118

118119
attachment = {
119120
"attachments": [
@@ -125,13 +126,13 @@ def get_task_success_attachment(task_name, retval, task_id,
125126
"mrkdwn_in": ["text"]
126127
}
127128
],
128-
"text": ''
129+
"text": ""
129130
}
130131

131132
if cbkwargs["flower_base_url"]:
132133
attachment["attachments"][0]["title_link"] = (
133134
cbkwargs["flower_base_url"] +
134-
'/task/{tid}'.format(tid=task_id)
135+
"/task/{tid}".format(tid=task_id)
135136
)
136137

137138
return attachment
@@ -151,38 +152,38 @@ def get_task_failure_attachment(task_name, exc, task_id, args,
151152
STOPWATCH.pop(task_id)
152153
return
153154

154-
message = 'FAILURE -- ' + task_name.rsplit('.', 1)[-1]
155+
message = "FAILURE -- " + task_name.rsplit(".", 1)[-1]
155156

156157
elapsed = divmod(time.time() - STOPWATCH.pop(task_id), 60)
157158

158-
lines = ['Name: *' + task_name + '*']
159+
lines = ["Name: *" + task_name + "*"]
159160

160161
if cbkwargs["show_task_execution_time"]:
161-
lines.append('Execution time: {m} minutes {s} seconds'.format(
162+
lines.append("Execution time: {m} minutes {s} seconds".format(
162163
m=str(int(elapsed[0])),
163164
s=str(int(elapsed[1])),
164165
))
165166
if cbkwargs["show_task_id"]:
166-
lines.append('Task ID: ' + task_id)
167+
lines.append("Task ID: " + task_id)
167168

168169
if cbkwargs["use_fixed_width"]:
169170
if cbkwargs["show_task_args"]:
170-
lines.append('args: ' + '`' + str(args) + '`')
171+
lines.append("args: " + "`" + str(args) + "`")
171172
if cbkwargs["show_task_kwargs"]:
172-
lines.append('kwargs: ' + '`' + str(kwargs) + '`')
173-
lines.append('Exception: ' + '`' + str(exc) + '`')
173+
lines.append("kwargs: " + "`" + str(kwargs) + "`")
174+
lines.append("Exception: " + "`" + str(exc) + "`")
174175
if cbkwargs["show_task_exception_info"]:
175-
lines.append('Info: ' + '```' + str(einfo) + '```')
176+
lines.append("Info: " + "```" + str(einfo) + "```")
176177
else:
177178
if cbkwargs["show_task_args"]:
178-
lines.append('args: ' + str(args))
179+
lines.append("args: " + str(args))
179180
if cbkwargs["show_task_kwargs"]:
180-
lines.append('kwargs: ' + str(kwargs))
181-
lines.append('Exception: ' + str(exc))
181+
lines.append("kwargs: " + str(kwargs))
182+
lines.append("Exception: " + str(exc))
182183
if cbkwargs["show_task_exception_info"]:
183-
lines.append('Info: ' + str(einfo))
184+
lines.append("Info: " + str(einfo))
184185

185-
failure = '\n'.join(lines)
186+
failure = "\n".join(lines)
186187

187188
attachment = {
188189
"attachments": [
@@ -194,13 +195,13 @@ def get_task_failure_attachment(task_name, exc, task_id, args,
194195
"mrkdwn_in": ["text"]
195196
}
196197
],
197-
"text": ''
198+
"text": ""
198199
}
199200

200201
if cbkwargs["flower_base_url"]:
201202
attachment["attachments"][0]["title_link"] = (
202203
cbkwargs["flower_base_url"] +
203-
'/task/{tid}'.format(tid=task_id)
204+
"/task/{tid}".format(tid=task_id)
204205
)
205206

206207
return attachment
@@ -224,7 +225,7 @@ def get_celery_startup_attachment(**kwargs):
224225
"mrkdwn_in": ["text"]
225226
}
226227
],
227-
"text": ''
228+
"text": ""
228229
}
229230

230231
return attachment
@@ -248,7 +249,7 @@ def get_celery_shutdown_attachment(**kwargs):
248249
"mrkdwn_in": ["text"]
249250
}
250251
],
251-
"text": ''
252+
"text": ""
252253
}
253254

254255
return attachment
@@ -265,27 +266,27 @@ def get_beat_init_attachment(**kwargs):
265266

266267
beat_schedule = kwargs["beat_schedule"]
267268
if beat_schedule:
268-
message += ' *with schedule:*'
269+
message += " *with schedule:*"
269270

270271
sched = []
271272
for task in sorted(beat_schedule):
272273
if kwargs["beat_show_full_task_path"]:
273274
sched.append(
274275
task + BEAT_DELIMITER +
275-
schedule_to_string(beat_schedule[task]['schedule']))
276+
schedule_to_string(beat_schedule[task]["schedule"]))
276277
else:
277278
sched.append(
278-
task.split('.', 2)[-1] + BEAT_DELIMITER +
279-
schedule_to_string(beat_schedule[task]['schedule']))
279+
task.split(".", 2)[-1] + BEAT_DELIMITER +
280+
schedule_to_string(beat_schedule[task]["schedule"]))
280281
schedule = (
281-
'\n*Task{}crontab(m/h/d/dM/MY) OR solar OR interval:*\n'.format(
282+
"\n*Task{}crontab(m/h/d/dM/MY) OR solar OR interval:*\n".format(
282283
BEAT_DELIMITER
283284
) +
284-
'```' + '\n'.join(sched) + '```'
285+
"```" + "\n".join(sched) + "```"
285286
)
286287
else:
287-
message = message[:-1] + '.' + message[-1:]
288-
schedule = ''
288+
message = message[:-1] + "." + message[-1:]
289+
schedule = ""
289290

290291
attachment = {
291292
"attachments": [
@@ -296,7 +297,71 @@ def get_beat_init_attachment(**kwargs):
296297
"mrkdwn_in": ["text"]
297298
}
298299
],
299-
"text": ''
300+
"text": ""
301+
}
302+
303+
return attachment
304+
305+
306+
processes = {
307+
"MainProcess": "Celery",
308+
"Beat": "Beat"
309+
}
310+
311+
312+
def get_broker_disconnect_attachment(**kwargs):
313+
"""Create the slack message attachment for broker disconnection."""
314+
if kwargs["show_celery_hostname"]:
315+
message = "*{process} could not connect to broker on {host}.*".format(
316+
process=processes.get(
317+
current_process()._name, current_process()._name),
318+
host=socket.gethostname()
319+
)
320+
else:
321+
message = "*{process} could not connect to broker.*".format(
322+
process=processes.get(
323+
current_process()._name, current_process()._name),
324+
)
325+
326+
attachment = {
327+
"attachments": [
328+
{
329+
"fallback": message,
330+
"color": kwargs["slack_broker_disconnect_color"],
331+
"text": message,
332+
"mrkdwn_in": ["text"]
333+
}
334+
],
335+
"text": ""
336+
}
337+
338+
return attachment
339+
340+
341+
def get_broker_connect_attachment(**kwargs):
342+
"""Create the slack message attachment for broker connection."""
343+
if kwargs["show_celery_hostname"]:
344+
message = "*{process} (re)connected to broker on {host}.*".format(
345+
process=processes.get(
346+
current_process()._name, current_process()._name),
347+
host=socket.gethostname()
348+
)
349+
else:
350+
message = "*{process} (re)connected to broker.*".format(
351+
process=processes.get(
352+
current_process()._name, current_process()._name),
353+
)
354+
355+
attachment = {
356+
"attachments": [
357+
{
358+
"fallback": message,
359+
"color": kwargs["slack_broker_connect_color"],
360+
"text": message,
361+
"mrkdwn_in": ["text"]
362+
}
363+
],
364+
"text": ""
300365
}
301366

302367
return attachment
@@ -306,9 +371,9 @@ def schedule_to_string(schedule):
306371
"""Transform a crontab, solar, or timedelta to a string."""
307372
if isinstance(schedule, crontab):
308373
return str(schedule)[10:-15]
309-
elif CELERY_VERSION >= '4.0.0' and isinstance(schedule, solar):
374+
elif CELERY_VERSION >= "4.0.0" and isinstance(schedule, solar):
310375
return str(schedule)[8:-1]
311376
elif isinstance(schedule, timedelta):
312-
return 'every ' + str(schedule)
377+
return "every " + str(schedule)
313378
else:
314-
return 'every ' + str(schedule) + ' seconds'
379+
return "every " + str(schedule) + " seconds"

0 commit comments

Comments
 (0)