Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 101 additions & 17 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def create(cls, user, parameters, force=False):
if vals[0] == 'artifact':
artifact_info = parameters.values[pname]
# If the artifact_info is a list, then the artifact
# still doesn't exists because the current job is part
# still doesn't exist because the current job is part
# of a workflow, so we can't link
if not isinstance(artifact_info, list):
TTRN.add(sql, [artifact_info, job_id])
Expand Down Expand Up @@ -703,6 +703,101 @@ def status(self):
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()

def _generate_notification_message(self, value, error_msg):
ignored_software = ('artifact definition',)
ignored_commands = ('Validate', 'complete_job', 'release_validators')

# abort early conditions (don't send an email notification)
# tentatively accept the overhead of a function-call, even when a
# notification isn't sent, just to keep the logic clean and
# centralized.

if value == 'waiting':
# notification not needed.
return None

if not self.user.info['receive_processing_job_emails']:
# notification not needed.
return None

if self.command.software.name in ignored_software:
# notification not needed.
return None

if self.command.name in ignored_commands:
# notification not needed.
return None

# generate subject line
subject = 'Job status change: %s (%s)' % (self.command.name, self.id)

# generate message line
message = ''

input_artifacts = self.input_artifacts
if input_artifacts is None:
# this is an admin job. display command name and parameters
message = (f'Admin Job {self.command.name} '
f'{self.command.parameters}')
else:
for artifact in input_artifacts:
if artifact.prep_templates is not None:
# this is a processing job. display the study id as link,
# prep ids, data_type, and command name.
study_ids = [x.study_id for x in artifact.prep_templates]
prep_ids = [x.id for x in artifact.prep_templates]
data_types = [x.data_type() for x in
artifact.prep_templates]

# there should only be one study id
study_ids = set(study_ids)
if len(study_ids) > 1:
raise qdb.exceptions.QiitaError("More than one Study "
"ID was found: "
f"{study_ids}")
study_id = study_ids.pop()

# there should be at least one prep_id and probably more.
prep_ids = set(prep_ids)
if len(prep_ids) == 0:
raise qdb.exceptions.QiitaError("No Prep IDs were "
"found")
# convert into a string for presentation.
prep_ids = [str(x) for x in prep_ids]
prep_ids = ', '.join(prep_ids)

# there should be only one data type.
data_types = set(data_types)
if len(data_types) > 1:
raise qdb.exceptions.QiitaError("More than one data "
"type was found: "
f"{data_types}")
data_type = data_types.pop()

message = f'Processing Job: {self.command.name}\n'
message += 'Study <A HREF="https://qiita.ucsd.edu/study/'
message += f'description/{study_id}">{study_id}'
message += '</A>\n'
message += f'Prep IDs: {prep_ids}\n'
message += f'Data Type: {data_type}\n'
elif artifact.analysis is not None:
# this is an analysis job. display analysis id as link and
# the command name.
message = f'Analysis Job {self.command.name} '
message += '<A HREF="https://qiita.ucsd.edu/analysis/'
message += f'description/{artifact.analysis.id}">'
message += f'{artifact.analysis.id}</A>\n'
else:
raise qdb.exceptions.QiitaError("Unknown Condition")

# append legacy message line
message += 'New status: %s' % (value)

if value == 'error' and error_msg is not None:
message += f'\n\nError:\n{error_msg}'

return {'subject': subject, 'message': message}

def _set_status(self, value, error_msg=None):
"""Sets the status of the job

Expand Down Expand Up @@ -734,22 +829,11 @@ def _set_status(self, value, error_msg=None):
new_status = qdb.util.convert_to_id(
value, "processing_job_status")

if value not in {'waiting'}:
if self.user.info['receive_processing_job_emails']:
# skip if software is artifact definition
ignore_software = ('artifact definition', )
if self.command.software.name not in ignore_software:
ignore_commands = ('Validate', 'complete_job',
'release_validators')
if self.command.name not in ignore_commands:
subject = 'Job status change: %s (%s)' % (
self.command.name, self.id)
message = 'New status: %s' % (value)

if value == 'error' and error_msg is not None:
message += f'\n\nError:\n{error_msg}'
qdb.util.send_email(
self.user.email, subject, message)
msg = self._generate_notification_message(value, error_msg)
if msg is not None:
# send email
qdb.util.send_email(self.user.email, msg['subject'],
msg['message'])

sql = """UPDATE qiita.processing_job
SET processing_job_status_id = %s
Expand Down
93 changes: 93 additions & 0 deletions qiita_db/test/test_processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def setUp(self):
"b72369f9-a886-4193-8d3d-f7b504168e75")
self.tester4 = qdb.processing_job.ProcessingJob(
"d19f76ee-274e-4c1b-b3a2-a12d73507c55")

self._clean_up_files = []

def _get_all_job_ids(self):
Expand Down Expand Up @@ -844,6 +845,98 @@ def _set_allocation(memory):
self.assertEqual(job_changed.get_resource_allocation_info(),
'-p qiita --mem 2G')

def test_notification_mail_generation(self):
# Almost all processing-jobs in testing are owned by test@foo.bar
# and are of type 'Split libraries FASTQ'.

# as 'test@foo.bar' is not set to receive notifications, let's
# first manually set their configuration to 'true'.
sql = ("UPDATE qiita.qiita_user SET receive_processing_job_emails"
" = true WHERE email = 'test@foo.bar'")

with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)

# with or w/out an error message, a status of 'waiting' should
# immediately return with a 'None' message.
obs = self.tester1._generate_notification_message('waiting', None)
self.assertEqual(obs, None)
obs = self.tester1._generate_notification_message('waiting',
'Hello World')
self.assertEqual(obs, None)

# An error message in the parameter should show a difference for
# messages of type 'error'.
obs = self.tester1._generate_notification_message('error', None)

exp = {'subject': ('Job status change: Split libraries FASTQ '
'(063e553b-327c-4818-ab4a-adfe58e49860)'),
'message': ('Processing Job: Split libraries FASTQ\nStudy '
'<A HREF="https://qiita.ucsd.edu/study/description'
'/1">1</A>\nPrep IDs: 1\nData Type: 18S\nNew '
'status: error')}

self.assertDictEqual(obs, exp)

obs = self.tester1._generate_notification_message('error',
'An Error Message')

exp = {'subject': ('Job status change: Split libraries FASTQ '
'(063e553b-327c-4818-ab4a-adfe58e49860)'),
'message': ('Processing Job: Split libraries FASTQ\nStudy '
'<A HREF="https://qiita.ucsd.edu/study/description'
'/1">1</A>\nPrep IDs: 1\nData Type: 18S\nNew status'
': error\n\nError:\nAn Error Message')}

self.assertDictEqual(obs, exp)

# The inclusion of an error message has no effect on other valid
# status types e.g. 'running'.
obs = self.tester1._generate_notification_message('running', None)

exp = {'subject': ('Job status change: Split libraries FASTQ '
'(063e553b-327c-4818-ab4a-adfe58e49860)'),
'message': ('Processing Job: Split libraries FASTQ\nStudy '
'<A HREF="https://qiita.ucsd.edu/study/description'
'/1">1</A>\nPrep IDs: 1\nData Type: 18S\nNew status'
': running')}

self.assertDictEqual(obs, exp)

obs = self.tester1._generate_notification_message('running', 'Yahoo!')

exp = {'subject': ('Job status change: Split libraries FASTQ '
'(063e553b-327c-4818-ab4a-adfe58e49860)'),
'message': ('Processing Job: Split libraries FASTQ\nStudy '
'<A HREF="https://qiita.ucsd.edu/study/description'
'/1">1</A>\nPrep IDs: 1\nData Type: 18S\nNew status'
': running')}

self.assertDictEqual(obs, exp)

# as 'test@foo.bar' is not set to receive notifications, let's
# first manually set their configuration to 'true'.
# reset test@foo.bar to 'false' to test expectations for a non-
# privileged user.
sql = ("UPDATE qiita.qiita_user SET receive_processing_job_emails"
" = false WHERE email = 'test@foo.bar'")

with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)

# waiting should still return w/out a message.
obs = self.tester1._generate_notification_message('waiting', None)
self.assertEqual(obs, None)

# an error status should now return nothing.
obs = self.tester1._generate_notification_message('error',
'An Error Message')
self.assertEqual(obs, None)

# other status messages should also return nothing.
obs = self.tester1._generate_notification_message('running', None)
self.assertEqual(obs, None)


@qiita_test_checker()
class ProcessingWorkflowTests(TestCase):
Expand Down