Merged
Changes from 18 commits
129 changes: 113 additions & 16 deletions qiita_db/analysis.py
@@ -30,6 +30,10 @@
from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
import qiita_db as qdb
from qiita_db.archive import Archive
from json import loads, dump

from subprocess import Popen, PIPE


class Analysis(qdb.base.QiitaObject):
@@ -827,33 +831,41 @@ def build_files(self, merge_duplicated_sample_ids):
# make testing much harder as we will need to have analyses at
# different stages and possible errors.
samples = self.samples
# gettin the info of all the artifacts to save SQL time
# retrieving all info on artifacts to save SQL time
bioms_info = qdb.util.get_artifacts_information(samples.keys())

# figuring out if we are going to have duplicated samples, again
# doing it here cause it's computational cheaper
# doing it here cause it's computationally cheaper
# 1. merge samples per: data_type, reference used and
# the command id
# Note that grouped_samples is basically how many biom tables we
# are going to create
rename_dup_samples = False
grouped_samples = {}

addtl_processing_cmd = {}
# post_processing_cmds is a list of dictionaries, each describing
# an operation to be performed on the final merged BIOM. The order
# of operations will be list-order. Thus, in the case that
# multiple post_processing_cmds are implemented, ensure proper
# order before passing off to _build_biom_tables().
post_processing_cmds = []
for aid, asamples in viewitems(samples):
# find the artifat info, [0] there should be only 1 info
# find the artifact info, [0] there should be only one info
ainfo = [bi for bi in bioms_info
if bi['artifact_id'] == aid][0]

data_type = ainfo['data_type']

# ainfo['algorithm'] is the original merging scheme
label = "%s || %s" % (data_type, ainfo['algorithm'])
if label not in grouped_samples:
aparams = qdb.artifact.Artifact(aid).processing_parameters
if aparams is not None:
fam = aparams.command.addtl_processing_cmd
if fam is not None:
addtl_processing_cmd[label] = fam
cmd = aparams.command.post_processing_cmd
if cmd is not None:
# preserve label, in case it's needed.
cmd['label'] = label
post_processing_cmds.append(cmd)
grouped_samples[label] = []
grouped_samples[label].append((aid, asamples))
# 2. if rename_dup_samples is still False, make sure that we don't
@@ -876,23 +888,36 @@
dup_samples = dup_samples | s

self._build_mapping_file(samples, rename_dup_samples)
biom_files = self._build_biom_tables(
grouped_samples, rename_dup_samples)

# if values for addtl_processing_cmd were found, return them as
# well, if only to allow for testing.
if addtl_processing_cmd:
return (biom_files, addtl_processing_cmd)

if post_processing_cmds:
biom_files = self._build_biom_tables(
grouped_samples,
rename_dup_samples,
post_processing_cmds=post_processing_cmds,
archive_merging_scheme=ainfo['algorithm'])
else:
# preserve the legacy path
biom_files = self._build_biom_tables(
grouped_samples,
rename_dup_samples)

# if post_processing_cmds exists, biom_files will be a triplet,
# instead of a pair; the final element in the tuple will be a
# file path to the new phylogenetic tree.
return biom_files
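
For orientation: each entry appended to post_processing_cmds above is the dict parsed from Command.post_processing_cmd, plus the grouping label. A minimal sketch of one entry, with illustrative values (only the key names are taken from this diff):

post_processing_cmds = [{
    # shell prefix that activates the script's environment
    'script_env': 'source deactivate; source activate qiita',
    # script to run against the final merged BIOM
    'script_path': 'qiita_db/test/support_files/worker.py',
    # free-form parameters, passed to the script as "k=v" pairs
    'script_params': {'a': 'A', 'b': 'B'},
    # added in build_files so the originating group is recoverable
    'label': '16S || Pick closed-reference OTUs',
}]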

def _build_biom_tables(self, grouped_samples, rename_dup_samples=False):
def _build_biom_tables(self,
grouped_samples,
rename_dup_samples=False,
post_processing_cmds=None,
archive_merging_scheme=None):
"""Build tables and add them to the analysis"""
with qdb.sql_connection.TRN:
base_fp = qdb.util.get_work_base_dir()

biom_files = []
for label, tables in viewitems(grouped_samples):

data_type, algorithm = [
l.strip() for l in label.split('||')]

@@ -958,7 +983,79 @@ def _build_biom_tables(self, grouped_samples, rename_dup_samples=False):
new_table.to_hdf5(
f, "Generated by Qiita, analysis id: %d, info: %s" % (
self._id, label))
biom_files.append((data_type, biom_fp))

# post_processing_cmds is a list of commands to run on the
# final BIOM. The order of operations is list-order. Each
# element of the list is a dictionary containing the Conda env
# to use, the script to run, and a dictionary of parameters.
if post_processing_cmds:
# assuming all commands require archives, obtain
# archives once, instead of for every cmd.
features = load_table(biom_fp).ids(axis='observation')
features = list(features)
archives = Archive.retrieve_feature_values(
archive_merging_scheme=archive_merging_scheme,
features=features)

# remove archives that SEPP could not match
archives = {f: loads(archives[f])
for f, plc
in archives.items()
if plc != ''}

# since biom_fp uses base_fp as its location, assume it's
# suitable for other files as well.
output_dir = base_fp

fp_archive = join(output_dir,
'archive_%d.json' % (self._id))

with open(fp_archive, 'w') as out_file:
dump(archives, out_file)

for cmd in post_processing_cmds:
# assume archives file is passed as:
# --fp_archive=<path_to_archives_file>
# assume output dir is passed as:
# --output_dir=<path_to_output_dir>

# concatenate any other parameters into a string
params = ' '.join(["%s=%s" % (k, v) for k, v in
cmd['script_params'].items()])

# append archives file and output dir parameters
params = "%s --fp_archive=%s --output_dir=%s" %\
(params, fp_archive, output_dir)

# if environment is successfully activated,
# run script with parameters
# script_env e.g.: 'deactivate; source activate qiita'
# script_path e.g.:
# python 'qiita_db/test/support_files/worker.py'
p = Popen(["%s; %s %s" %
(cmd['script_env'],
cmd['script_path'],
params)], shell=True, stdout=PIPE)

# p.communicate() waits on child to finish.
p_out, p_err = p.communicate()
p_out = p_out.decode("utf-8").rstrip()

# p_out will contain either an error message or
# the file path to the new tree, depending on p's
# return code.
if p.returncode != 0:
raise IncompetentQiitaDeveloperError(p_out)

biom_files.append((
data_type,
biom_fp,
# instead of returning post-processing
# metadata (post_processing_cmds),
# return the fp to the phylogenetic tree.
p_out))
else:
biom_files.append((data_type, biom_fp))

# return the biom files, either with or without the needed tree,
# to the user.
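
To see the subprocess plumbing in isolation, here is a self-contained sketch of the invocation logic above as a hypothetical helper (the dict keys and the --fp_archive/--output_dir flags match this diff; the generic RuntimeError stands in for Qiita's exception):

from subprocess import Popen, PIPE

def run_post_processing_cmd(cmd, fp_archive, output_dir):
    # build "k=v" pairs from script_params, then append the two
    # arguments every post-processing script is expected to accept
    params = ' '.join("%s=%s" % (k, v)
                      for k, v in cmd['script_params'].items())
    params = "%s --fp_archive=%s --output_dir=%s" % (
        params, fp_archive, output_dir)
    # activate the script's environment, then run it; stdout carries
    # either an error message or the path to the new tree
    p = Popen(["%s; %s %s" % (cmd['script_env'], cmd['script_path'],
                              params)], shell=True, stdout=PIPE)
    p_out, _ = p.communicate()
    p_out = p_out.decode("utf-8").rstrip()
    if p.returncode != 0:
        raise RuntimeError(p_out)
    return p_out  # on success: file path to the new phylogenetic tree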
15 changes: 11 additions & 4 deletions qiita_db/software.py
@@ -28,7 +28,7 @@ class Command(qdb.base.QiitaObject):
Attributes
----------
active
addtl_processing_cmd
post_processing_cmd
analysis_only
default_parameter_sets
description
@@ -467,7 +467,7 @@ def name(self):
return qdb.sql_connection.TRN.execute_fetchlast()

@property
def addtl_processing_cmd(self):
def post_processing_cmd(self):
"""Additional processing commands required for merging

Returns
@@ -476,11 +476,18 @@ def addtl_processing_cmd(self):
Returns the additional processing command for merging
"""
with qdb.sql_connection.TRN:
sql = """SELECT addtl_processing_cmd
sql = """SELECT post_processing_cmd
FROM qiita.software_command
WHERE command_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()

cmd = qdb.sql_connection.TRN.execute_fetchlast()
if cmd:
# the column stores a JSON string; parse it and return a
# dict rather than re-running the fetch
return loads(cmd)

return None
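
For reference, the stored column value is the JSON serialization of a dict like the one below; this property returns it already parsed (a sketch, values illustrative):

from json import dumps

# what a caller would store in qiita.software_command.post_processing_cmd
post_processing_cmd = dumps({
    'script_env': 'source deactivate; source activate qiita',
    'script_path': 'qiita_db/test/support_files/worker.py',
    'script_params': {'a': 'A', 'b': 'B'},
})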

@property
def description(self):
8 changes: 4 additions & 4 deletions qiita_db/support_files/patches/67.sql
@@ -1,11 +1,11 @@
-- October 6, 2018
-- add addtl_processing_cmd column to record additional information required to merge some BIOMS.
-- add post_processing_cmd column to record additional information required to merge some BIOMs.

ALTER TABLE qiita.software_command ADD addtl_processing_cmd varchar;

COMMENT ON COLUMN qiita.software_command.addtl_processing_cmd IS 'Store information on additional processes to merge BIOMs, if any.';
ALTER TABLE qiita.software_command ADD post_processing_cmd varchar;
COMMENT ON COLUMN qiita.software_command.post_processing_cmd IS 'Store information on additional post-processing steps for merged BIOMs, if any.';

-- October 25, 2018
-- add public_raw_download to study

ALTER TABLE qiita.study ADD public_raw_download bool default False;

6 changes: 3 additions & 3 deletions qiita_db/support_files/qiita-db.dbs
@@ -1307,9 +1307,9 @@
<column name="ignore_parent_command" type="bool" jt="-7" mandatory="y" >
<defo>false</defo>
</column>
<column name="addtl_processing_cmd" type="varchar" jt="12" >
<comment><![CDATA[Store information on additional processes to merge BIOMs, if any.]]></comment>
</column>
<column name="post_processing_cmd" type="varchar" jt="12">
<comment><![CDATA[Store information on additional post-processing steps for merged BIOMs, if any.]]></comment>
</column>
<index name="pk_soft_command" unique="PRIMARY_KEY" >
<column name="command_id" />
</index>
10 changes: 5 additions & 5 deletions qiita_db/support_files/qiita-db.html
@@ -1911,8 +1911,8 @@
<use id='nn' x='2162' y='1332' xlink:href='#nn'/><a xlink:href='#software_command.active'><text x='2178' y='1342'>active</text><title>active bool not null default &#039;True&#039;</title></a>
<use id='nn' x='2162' y='1347' xlink:href='#nn'/><a xlink:href='#software_command.is_analysis'><text x='2178' y='1357'>is_analysis</text><title>is_analysis bool not null default &#039;False&#039;</title></a>
<use id='nn' x='2162' y='1362' xlink:href='#nn'/><a xlink:href='#software_command.ignore_parent_command'><text x='2178' y='1372'>ignore_parent_command</text><title>ignore_parent_command bool not null default false</title></a>
<a xlink:href='#software_command.addtl_processing_cmd'><text x='2178' y='1387'>addtl_processing_cmd</text><title>addtl_processing_cmd varchar
Store information on additional processes to merge BIOMs&#044; if any&#046;</title></a>
<use id='nn' x='2162' y='1377' xlink:href='#nn'/><a xlink:href='#software_command.post_processing_cmd'><text x='2178' y='1387'>post_processing_cmd</text><title>post_processing_cmd varchar
Store information on additional post-processing steps for merged BIOMs, if any.</title></a>

<!-- ============= Table 'processing_job_validator' ============= -->
<rect class='table' x='2160' y='1403' width='150' height='90' rx='7' ry='7' />
@@ -5726,10 +5726,10 @@
<td> </td>
</tr>
<tr>
<td>&nbsp;</td>
<td><a name='software_command.addtl_processing_cmd'>addtl&#095;processing&#095;cmd</a></td>
<td>*</td>
<td><a name='software_command.post_processing_cmd'>post&#095;processing&#095;cmd</a></td>
<td> varchar </td>
<td> Store information on additional processes to merge BIOMs&#044; if any&#046; </td>
<td>Store information on additional post-processing steps for merged BIOMs, if any.</td>
</tr>
<tr><th colspan='4'><b>Indexes</b></th></tr>
<tr> <td>Pk</td><td>pk&#095;soft&#095;command</td>
8 changes: 8 additions & 0 deletions qiita_db/test/support_files/worker.py
@@ -0,0 +1,8 @@
#!/usr/bin/env python
import sys

# Sample worker process: prints the version information for the
# environment it is running in. Useful for confirming that Popen
# spawns processes in the correct Python environment.
sys.stdout.write("Worker running Python %s\n" % str(sys.version_info))
sys.stdout.write(">>%s<<" % (sys.argv))
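
A quick way to exercise this worker by hand, mirroring how _build_biom_tables() spawns it (the activation prefix and arguments are illustrative):

from subprocess import Popen, PIPE

# first stdout line reports the interpreter; the second echoes sys.argv
cmd = ("source activate qiita; "
       "python qiita_db/test/support_files/worker.py a=A --output_dir=/tmp")
p = Popen([cmd], shell=True, stdout=PIPE)
out, _ = p.communicate()
print(out.decode("utf-8"))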
57 changes: 43 additions & 14 deletions qiita_db/test/test_analysis.py
@@ -12,6 +12,8 @@
from qiita_core.testing import wait_for_processing_job
from qiita_core.qiita_settings import qiita_config
import qiita_db as qdb
from json import dumps
import ast

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
@@ -515,36 +517,63 @@ def test_build_files(self):
'1.SKB8.640193', '1.SKB7.640196']
self.assertItemsEqual(biom_ids, exp)

def test_build_files_addtl_processing_cmd(self):
def test_build_files_post_processing_cmd(self):
tmp = qdb.artifact.Artifact(4).processing_parameters.command
cmd_id = tmp.id

# set a known artifact's post-processing command to a known
# value, then test for it.
# qiita_db/test/support_files/worker.py works w/py2.7 & 3.6 envs.
results = {}
results['script_env'] = 'source deactivate; source activate qiita'
results['script_path'] = 'qiita_db/test/support_files/worker.py'
results['script_params'] = {'a': 'A', 'b': 'B'}

# convert to json representation and store in PostgreSQL
results = dumps(results)

with qdb.sql_connection.TRN:
sql = """UPDATE qiita.software_command
SET addtl_processing_cmd = 'ls'
WHERE command_id = %s""" % cmd_id
qdb.sql_connection.TRN.add(sql, [cmd_id])
SET post_processing_cmd = %s
WHERE command_id = %s"""
qdb.sql_connection.TRN.add(sql, [results, cmd_id])
qdb.sql_connection.TRN.execute()

# create a sample analysis and run build_files on it.
analysis = self._create_analyses_with_samples()
results = npt.assert_warns(
qdb.exceptions.QiitaDBWarning, analysis.build_files, False)
post_processing_cmds = analysis.build_files(False)

# if build_files used additional processing commands, it will
# return a tuple, where the second element is a dictionary of
# the commands used.
# biom_tables = results[0]
addtl_processing_cmds = results[1]
for k in addtl_processing_cmds:
self.assertItemsEqual('ls', addtl_processing_cmds[k])
# return a tuple, where the third element contains output metadata.
for cmd in post_processing_cmds:
# each cmd in the list is a tuple
ppc, params = cmd[2].split('\n')

# since we are using the qiita env as our test env, assume major
# and minor info will remain constant at 2 and 7, respectively.
s = 'Worker running Python sys.version_info(major=2, minor=7,'
self.assertItemsEqual(ppc[:56], s)

# cleanup the second line of output from worker.py, containing
# the parameters passed to it.
params = params.lstrip('>>')
params = params.rstrip('<<')
params = ast.literal_eval(params)

self.assertItemsEqual(params[0],
'qiita_db/test/support_files/worker.py')
self.assertItemsEqual(params[1], 'a=A')
self.assertItemsEqual(params[2], 'b=B')

# for now, just compare the option names
self.assertItemsEqual(params[3][:13], '--fp_archive=')
self.assertItemsEqual(params[4][:13], '--output_dir=')

# cleanup (assume command was NULL previously)
with qdb.sql_connection.TRN:
sql = """UPDATE qiita.software_command
SET addtl_processing_cmd = NULL
WHERE command_id = %s""" % cmd_id
SET post_processing_cmd = NULL
WHERE command_id = %s"""
qdb.sql_connection.TRN.add(sql, [cmd_id])
qdb.sql_connection.TRN.execute()
