.travis.yml (2 changes: 1 addition & 1 deletion)
@@ -27,7 +27,7 @@ install:
       'pandas>=0.18' 'matplotlib>=1.1.0' 'scipy>0.13.0' 'numpy>=1.7' 'h5py>=2.3.1'
   - source activate qiita
   - pip install -U pip
-  - pip install sphinx sphinx-bootstrap-theme nose-timer codecov
+  - pip install sphinx sphinx-bootstrap-theme nose-timer codecov 'Click==6.7'
   - 'echo "backend: Agg" > matplotlibrc'
   - git clone https://github.com/nicolasff/webdis
   - pushd webdis
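The new 'Click==6.7' pin keeps the CI environment reproducible while the rest of the toolchain floats. As a quick illustration, a sanity check along these lines could confirm the pin took effect after install (a hypothetical snippet, not part of Qiita's CI; pkg_resources ships with setuptools):

# hypothetical post-install check that a pinned dependency was honored
import pkg_resources

def assert_pinned(package, version):
    # get_distribution is case-insensitive for distribution names
    installed = pkg_resources.get_distribution(package).version
    assert installed == version, '%s is %s, expected %s' % (
        package, installed, version)

assert_pinned('Click', '6.7')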
README.rst (38 changes: 24 additions & 14 deletions)
@@ -31,20 +31,30 @@ For more specific details about qiita visit `the Qiita main site tutorial <https
Current features
----------------

-* Create a study, add a sample information file with the description (metadata,
-  clinical, geographical, etc) of each sample and multiple preparation
-  information files, how the samples were prepared in the wet lab prior to data
-  generation.
-* Update sample information by modifying values in the existing columns, via
-  an upload and processing of a new information file. Updates include being able
-  to change values in existing columns or add new ones. It is also possible to
-  add samples to an existing sample information sheet. We currently don't
-  support deletions.
-* Change the status of a study: Sandboxed -> Private -> Public.
-* Long term sequence data deposition to the European Nucleotide Archive (ENA),
-  part of the European Bioinformatics Institute (EBI).
-* Search over existing studies (see known issues).
-* Generate basic visualizations with the available studies and datasets.
+* Full study management: create, delete, and update samples in the sample and
+  multiple preparation information files.
+* Upload files via direct drag & drop from the web interface or via scp
+  from any server that allows these connections.
+* Study privacy management: Sandboxed -> Private -> Public (see the sketch
+  below).
+* Easy long-term sequence data deposition to the European Nucleotide Archive
+  (ENA), part of the European Bioinformatics Institute (EBI), for private and
+  public studies.
+* Raw data processing for:
+
+  * Target gene data: we support deblur against GreenGenes (13_8) and
+    closed-reference picking against GreenGenes (13_8) and Silva.
+  * Metagenomic/shotgun data: we support Shogun processing. Note that this
+    data is suitable for download and further downstream analyses, but we
+    don't recommend analyzing it within Qiita.
+  * BIOM files can be added as new preparation templates for downstream
+    analyses; however, these cannot be made public.
+
+* Basic downstream analyses using Qiime2.
+* Basic study search in the study listing page.
+* Complex metadata search via redbiom.

For more detailed information visit the `Qiita tutorial <https://cmi-workshop.readthedocs.io/en/latest/>`__
and the `Qiita help <https://qiita.ucsd.edu/static/doc/html/index.html>`__.
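The study privacy item above describes a strictly forward flow. A minimal sketch of that progression (hypothetical names and structure, not Qiita's actual implementation):

# illustrative state machine for study status; names are hypothetical
TRANSITIONS = {'sandboxed': 'private', 'private': 'public'}

def promote(status):
    # statuses only move forward and 'public' is terminal
    if status not in TRANSITIONS:
        raise ValueError('cannot promote a study with status %r' % status)
    return TRANSITIONS[status]

print(promote('sandboxed'))  # private
print(promote('private'))    # public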

Accepted raw files
------------------
scripts/qiita-recover-jobs (113 changes: 77 additions & 36 deletions)
@@ -11,19 +11,31 @@ from subprocess import check_output
 from qiita_db.sql_connection import TRN
 from qiita_db.processing_job import ProcessingJob
 from time import sleep
+from math import ceil


-SLEEP_TIME = 2
+SLEEP_TIME = 6
+CHANCES = 3
+SQL = """SELECT processing_job_id
+         FROM qiita.processing_job
+         JOIN qiita.processing_job_status
+         USING (processing_job_status_id)
+         WHERE processing_job_status = %s"""


 def _submit_jobs(jids_to_recover, recover_type):
-    len_jids_to_recover = len(jids_to_recover) - 1
+    # split SLEEP_TIME into CHANCES shorter naps so the user can ctrl-c
+    # between them ... just in case
+    st = int(ceil(SLEEP_TIME/CHANCES))
+    len_jids_to_recover = len(jids_to_recover)
     for i, j in enumerate(jids_to_recover):
         print 'recovering %s: %d/%d' % (recover_type, len_jids_to_recover, i)
         job = ProcessingJob(j)
         job._set_status('in_construction')
         job.submit()
-        sleep(SLEEP_TIME)
+        for i in range(CHANCES):
+            print 'You can ctrl-c now, iteration %d' % i
+            sleep(st)
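The chunked sleep above is a small interruptible-wait pattern: one long pause becomes CHANCES short naps, so a ctrl-c lands between naps instead of after the full interval. A standalone sketch of the same idea (illustrative constants, independent of the script):

# interruptible wait: split one long sleep into short naps so a
# KeyboardInterrupt is honored within roughly SLEEP_TIME/CHANCES seconds
from math import ceil
from time import sleep

SLEEP_TIME = 6  # total seconds to wait
CHANCES = 3     # opportunities to interrupt

def interruptible_wait():
    st = int(ceil(SLEEP_TIME / float(CHANCES)))
    for i in range(CHANCES):
        print('You can ctrl-c now, iteration %d' % i)
        sleep(st)

if __name__ == '__main__':
    interruptible_wait()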


def _retrieve_queue_jobs():
@@ -47,51 +59,78 @@ def _retrieve_queue_jobs():
return set(qiita_jids)


+def _get_jids_to_recover(recover_type):
+    with TRN:
+        TRN.add(SQL, [recover_type])
+        jids = set(TRN.execute_fetchflatten())
+        jids_to_recover = list(jids - _retrieve_queue_jobs())
+        print 'Total %s: %d' % (recover_type, len(jids_to_recover))
+        return jids_to_recover
+
+
+def _flush_queues(recover_type):
+    # README: in theory we should be able to submit all recover_type jobs
+    # one after the other but in reality that's not possible. The issue
+    # is that a job is going to stay as running/waiting until it is completed.
+    # Thus, we need to run complete_job first, wait for everything to finish,
+    # then continue with validate, then release_validators, and
+    # finally everything else. Note that it is suggested to wait for the
+    # full recovery type to finish before moving to the next one
+
+    # first start with completing jobs that are not running
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    complete_job = [j for j in jids_to_recover
+                    if ProcessingJob(j).command.name == 'complete_job']
+    _submit_jobs(complete_job, recover_type + '/complete_job')
+
+    # then start validators that are not running
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    validate = [j for j in jids_to_recover
+                if ProcessingJob(j).command.name == 'Validate']
+    _submit_jobs(validate, recover_type + '/validate')
+
+    # then the release validators
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    release_validators = [
+        j for j in jids_to_recover
+        if ProcessingJob(j).command.name == 'release_validators']
+    _submit_jobs(release_validators, recover_type + '/release_validators')


def qiita_recover_jobs():
sql = """SELECT processing_job_id
FROM qiita.processing_job
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE processing_job_status = %s"""
# general full processing pipeline, as an example a deblur job as it yields
# two artifacts, each new line represents a new job, each idented block a
# waiting job
# -> deblur
# -> complete_job -> release_validator
# -> validate biom 1
# -> release_validator
# -> complete_job -> create artifact
# -> validate biom 2
# -> release_validator
# -> complete_job -> create artifact

# Step 1: recover jobs that are in queue status
with TRN:
recover_type = 'queued'
TRN.add(sql, [recover_type])
jids = set(TRN.execute_fetchflatten())
jids_to_recover = jids - _retrieve_queue_jobs()
recover_type = 'queued'
_flush_queues(recover_type)

_submit_jobs(jids_to_recover, recover_type)
# then we recover what's left
jids_to_recover = _get_jids_to_recover(recover_type)
_submit_jobs(jids_to_recover, recover_type)

     # Step 2: recover jobs that are running, note that there are several steps
     #         to recover this group: 2.1. check if they have validators,
     #         2.2. if so, recover validators, 2.3. recover failed jobs
-    with TRN:
-        recover_type = 'running'
-        TRN.add(sql, [recover_type])
-        jids = set(TRN.execute_fetchflatten())
-        qiita_jids = _retrieve_queue_jobs()
-        jids_to_recover = jids - qiita_jids
-
-        # first start validators that are not running
-        validate = [j for j in jids_to_recover
-                    if ProcessingJob(j).command.name == 'Validate']
-        _submit_jobs(validate, recover_type + '/validate')
-
-        # then the release validator
-        release_validators = [
-            j for j in jids_to_recover
-            if ProcessingJob(j).command.name == 'release_validators']
-        _submit_jobs(release_validators, recover_type + '/release_validators')
-
-        jids_to_recover = (jids_to_recover - set(validate) -
-                           set(release_validators))
+    recover_type = 'running'
+    _flush_queues(recover_type)
+    jids_to_recover = _get_jids_to_recover(recover_type)

    # 2.1 and 2.2: checking which jobs have validators, and recovering them
jobs_with_validators = []
for j in jids_to_recover:
job = ProcessingJob(j)
-        validators = job.validator_jobs
+        validators = list(job.validator_jobs)
if not validators:
jobs_with_validators.append(j)
continue
@@ -101,7 +140,7 @@ def qiita_recover_jobs():
for vj in validators:
jobs_with_validators.append(vj.id)
         status = set([v.status for v in validators
-                      if v.id not in qiita_jids])
+                      if v.id not in _retrieve_queue_jobs()])
# if there are no status, that means that the validators weren't
# created and we should rerun from scratch (Step 4)
if not bool(status):
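The validator bookkeeping in the hunks above reduces to one decision: collect the statuses of a job's validators, ignoring any already waiting in the queue, and treat an empty set as "the validators were never created", which means rerunning the job from scratch. A condensed sketch of that check (a hypothetical helper, not part of the script):

# hypothetical condensation of the validator-status decision
def classify_job(validators, queued_ids):
    # statuses of validators that are not already queued
    status = set(v.status for v in validators if v.id not in queued_ids)
    if not status:
        # no validators were ever created: rerun the job from scratch
        return 'rerun'
    return status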
@@ -135,7 +174,7 @@ def qiita_recover_jobs():
print ("Check the status of this job %s : %s and validators"
"%s." % (j, status, validators))

-    jids_to_recover = jids_to_recover - set(jobs_with_validators)
+    jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)

# Step 3: Finally, we recover all the leftover jobs
for i, j in enumerate(jids_to_recover):
@@ -151,4 +190,6 @@


if __name__ == '__main__':
-    qiita_recover_jobs()
+    raise ValueError('This script should never be called directly but should '
+                     'be used as a reference if we need to recover jobs, '
+                     'see: qiita_recover_jobs')
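Taken together, the recovery strategy is the ordering _flush_queues documents: drain complete_job, then Validate, then release_validators, re-querying the pending set between waves because finishing one wave changes what is left. A condensed sketch of that loop, where pending_jids, command_name, and submit_wave are hypothetical stand-ins for _get_jids_to_recover, ProcessingJob(...).command.name, and _submit_jobs:

# hypothetical condensation of the ordering enforced by _flush_queues
RECOVERY_ORDER = ('complete_job', 'Validate', 'release_validators')

def flush_in_order(recover_type, pending_jids, command_name, submit_wave):
    for wave in RECOVERY_ORDER:
        # re-query each time: completing one wave changes what is pending
        jids = [j for j in pending_jids(recover_type)
                if command_name(j) == wave]
        submit_wave(jids, '%s/%s' % (recover_type, wave))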