Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions qiita_db/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,34 @@ def ebi_study_accession(self, value):
qdb.sql_connection.TRN.add(sql, [value, self.id])
qdb.sql_connection.TRN.execute()

def _ebi_submission_jobs(self):
"""Helper code to avoid duplication"""
plugin = qdb.software.Software.from_name_and_version(
'Qiita', 'alpha')
cmd = plugin.get_command('submit_to_EBI')

sql = """SELECT processing_job_id,
pj.command_parameters->>'artifact' as aid,
processing_job_status, can_be_submitted_to_ebi,
array_agg(ebi_run_accession)
FROM qiita.processing_job pj
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
LEFT JOIN qiita.artifact ON (
artifact_id = (
pj.command_parameters->>'artifact')::INT)
LEFT JOIN qiita.ebi_run_accession era USING (artifact_id)
LEFT JOIN qiita.artifact_type USING (artifact_type_id)
WHERE pj.command_parameters->>'artifact' IN (
SELECT artifact_id::text
FROM qiita.study_artifact WHERE study_id = {0})
AND pj.command_id = {1}
GROUP BY processing_job_id, aid, processing_job_status,
can_be_submitted_to_ebi""".format(self._id, cmd.id)
qdb.sql_connection.TRN.add(sql)

return qdb.sql_connection.TRN.execute_fetchindex()

@property
def ebi_submission_status(self):
"""The EBI submission status of this study
Expand All @@ -846,24 +874,11 @@ def ebi_submission_status(self):
if self.ebi_study_accession:
status = 'submitted'

plugin = qdb.software.Software.from_name_and_version(
'Qiita', 'alpha')
cmd = plugin.get_command('submit_to_EBI')

sql = """SELECT processing_job_id, command_parameters->>'artifact',
processing_job_status
FROM qiita.processing_job
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE command_parameters->>'artifact' IN (
SELECT artifact_id::text
FROM qiita.study_artifact
WHERE study_id = {0}) AND command_id = {1}""".format(
self._id, cmd.id)
qdb.sql_connection.TRN.add(sql)
jobs = defaultdict(dict)
for info in qdb.sql_connection.TRN.execute_fetchindex():
jid, aid, js = info
for info in self._ebi_submission_jobs():
jid, aid, js, cbste, era = info
if not cbste or era != [None]:
continue
jobs[js][aid] = jid

if 'queued' in jobs or 'running' in jobs:
Expand Down
21 changes: 21 additions & 0 deletions qiita_db/test/test_study.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,27 @@ def test_ebi_study_accession_setter(self):

def test_ebi_submission_status(self):
self.assertEqual(self.study.ebi_submission_status, 'submitted')

# let's test that even with a failed job nothing changes
# add a failed job for an artifact (2) that can be submitted
user = qdb.user.User('test@foo.bar')
qp = qdb.software.Software.from_name_and_version('Qiita', 'alpha')
cmd = qp.get_command('submit_to_EBI')
params = qdb.software.Parameters.load(cmd, values_dict={
'artifact': 2, 'submission_type': 'ADD'})
job = qdb.processing_job.ProcessingJob.create(user, params, True)
job._set_error('Killed by Admin')
# and just to be careful add a failed job for an artifact (1) that
# cannot be submitted
qp = qdb.software.Software.from_name_and_version('Qiita', 'alpha')
cmd = qp.get_command('submit_to_EBI')
params = qdb.software.Parameters.load(cmd, values_dict={
'artifact': 1, 'submission_type': 'ADD'})
job = qdb.processing_job.ProcessingJob.create(user, params, True)
job._set_error('Killed by Admin')
# should still return submited
self.assertEqual(self.study.ebi_submission_status, 'submitted')

new = qdb.study.Study.create(
qdb.user.User('test@foo.bar'),
'NOT Identification of the Microbiomes for Cannabis Soils 5',
Expand Down
7 changes: 0 additions & 7 deletions qiita_ware/ebi.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,6 @@ def __init__(self, artifact_id, action):
LogEntry.create('Runtime', error_msg)
raise EBISubmissionError(error_msg)

status = self.study.ebi_submission_status
if status in self.valid_ebi_submission_states:
error_msg = ("Cannot perform parallel EBI submission for the same "
"study. Current status of the study: %s" % status)
LogEntry.create('Runtime', error_msg)
raise EBISubmissionError(error_msg)

self.artifact_id = artifact_id
self.study_title = self.study.title
self.study_abstract = self.study.info['study_abstract']
Expand Down
9 changes: 9 additions & 0 deletions qiita_ware/private_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from qiita_ware.commands import submit_VAMPS, submit_EBI
from qiita_ware.metadata_pipeline import (
create_templates_from_qiime_mapping_file)
from qiita_ware.exceptions import EBISubmissionError


def build_analysis_files(job):
Expand Down Expand Up @@ -98,6 +99,14 @@ def submit_to_EBI(job):
param_vals = job.parameters.values
artifact_id = int(param_vals['artifact'])
submission_type = param_vals['submission_type']
artifact = qdb.artifact.Artifact(artifact_id)

for info in artifact.study._ebi_submission_jobs():
jid, aid, js, cbste, era = info
if js in ('running', 'queued') and jid != job.id:
error_msg = ("Cannot perform parallel EBI submission for "
"the same study. Current job running: %s" % js)
raise EBISubmissionError(error_msg)
submit_EBI(artifact_id, submission_type, True)
job._set_status('success')

Expand Down
11 changes: 8 additions & 3 deletions qiita_ware/test/test_private_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from unittest import TestCase, main
from os.path import join, dirname, abspath, exists, isdir
from os import close, remove
from os import close, remove, environ

from shutil import rmtree
from tempfile import mkstemp
from json import loads, dumps
Expand Down Expand Up @@ -418,11 +419,15 @@ def test_submit_to_EBI(self):
artifact.study.ebi_submission_status)
checked_submitting = False
# once it fails wait for a few to check status again
sleep(10)
sleep(5)
exp = 'Some artifact submissions failed: %d' % artifact.id
obs = artifact.study.ebi_submission_status
self.assertEqual(obs, exp)

# make sure that the error is correct, we have 2 options
if environ.get('ASPERA_SCP_PASS', '') != '':
self.assertIn('1.SKM2.640199', job.log.msg)
else:
self.assertIn('ASCP Error:', job.log.msg)
# wait for everything to finish to avoid DB deadlocks
sleep(5)

Expand Down