adding tgz #1718
@@ -0,0 +1,3 @@
-- Mar 28, 2016

INSERT INTO qiita.filepath_type (filepath_type) VALUES ('tgz')
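Not part of the patch itself, but once this row exists the new type can be resolved programmatically; the migration script below relies on exactly this lookup. A minimal sketch, assuming a configured qiita_db environment:

from qiita_db.util import convert_to_id

# Resolve the id of the newly registered 'tgz' filepath type; this raises
# if the INSERT above has not been applied to the database yet.
tgz_id = convert_to_id("tgz", "filepath_type")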
@@ -0,0 +1,109 @@
from subprocess import Popen, PIPE
from os.path import exists, join

from qiita_db.sql_connection import TRN
from qiita_db.artifact import Artifact
from qiita_db.util import (insert_filepaths, convert_to_id, get_mountpoint,
                           get_mountpoint_path_by_id)


tgz_id = convert_to_id("tgz", "filepath_type")
_, analysis_mp = get_mountpoint('analysis')[0]

with TRN:
    #
    # Generating compressed files for picking failures -- artifact_type = BIOM
    #
    sql = """SELECT artifact_id FROM qiita.artifact
                JOIN qiita.artifact_type USING (artifact_type_id)
             WHERE artifact_type = 'BIOM'"""
    TRN.add(sql)

    for r in TRN.execute_fetchindex():
        to_tgz = None
        a = Artifact(r[0])
        for _, fp, fp_type in a.filepaths:
            if fp_type == 'directory':
                # remove the trailing / from the path, if present
                to_tgz = fp[:-1] if fp[-1] == '/' else fp
                break

        if to_tgz is None:
            continue

        tgz = to_tgz + '.tgz'
        if not exists(tgz):
            cmd = 'tar zcf %s %s' % (tgz, to_tgz)
            proc = Popen(cmd, universal_newlines=True, shell=True, stdout=PIPE,
                         stderr=PIPE)
            stdout, stderr = proc.communicate()
            return_value = proc.returncode
            if return_value != 0:
                raise ValueError(
                    "There was an error:\nstdout:\n%s\n\nstderr:\n%s"
                    % (stdout, stderr))

        a_id = a.id
        # Add the new tgz file to the artifact.
        fp_ids = insert_filepaths([(tgz, tgz_id)], a_id, a.artifact_type,
                                  "filepath", move_files=False)
        sql = """INSERT INTO qiita.artifact_filepath
                    (artifact_id, filepath_id)
                 VALUES (%s, %s)"""
        sql_args = [[a_id, fp_id] for fp_id in fp_ids]
        TRN.add(sql, sql_args, many=True)
        TRN.execute()

    #
    # Generating compressed files for analysis
    #
    TRN.add("SELECT analysis_id FROM qiita.analysis")
    for result in TRN.execute_fetchindex():
        analysis_id = result[0]
        # Retrieve all analysis filepaths. We could have used
        # Analysis.all_associated_filepath_ids, but the analysis might not
        # belong to the current portal, so plain SQL is used instead.

        sql = """SELECT filepath, data_directory_id
                 FROM qiita.filepath
                    JOIN qiita.analysis_filepath USING (filepath_id)
                 WHERE analysis_id = %s"""
        TRN.add(sql, [analysis_id])
        fps = set([tuple(r) for r in TRN.execute_fetchindex()])
        sql = """SELECT filepath, data_directory_id
                 FROM qiita.analysis_job
                    JOIN qiita.job USING (job_id)
                    JOIN qiita.job_results_filepath USING (job_id)
                    JOIN qiita.filepath USING (filepath_id)
                 WHERE analysis_id = %s"""
        TRN.add(sql, [analysis_id])
        fps = fps.union([tuple(r) for r in TRN.execute_fetchindex()])

        # no filepaths in the analysis
        if not fps:
            continue

        tgz = join(analysis_mp, '%d_files.tgz' % analysis_id)
        if not exists(tgz):
            full_fps = [join(get_mountpoint_path_by_id(mid), f)
                        for f, mid in fps]
            cmd = 'tar zcf %s %s' % (tgz, ' '.join(full_fps))
            proc = Popen(cmd, universal_newlines=True, shell=True, stdout=PIPE,
                         stderr=PIPE)
            stdout, stderr = proc.communicate()
            return_value = proc.returncode
            if return_value != 0:
                raise ValueError(
                    "There was an error:\nstdout:\n%s\n\nstderr:\n%s"
                    % (stdout, stderr))

        # Add the new tgz file to the analysis.
        fp_ids = insert_filepaths([(tgz, tgz_id)], analysis_id, 'analysis',
                                  "filepath", move_files=False)
        sql = """INSERT INTO qiita.analysis_filepath
                    (analysis_id, filepath_id)
                 VALUES (%s, %s)"""
        sql_args = [[analysis_id, fp_id] for fp_id in fp_ids]
        TRN.add(sql, sql_args, many=True)
        TRN.execute()
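A quick way to sanity-check this migration (not part of the diff) is to count how many tgz filepaths ended up attached to artifacts and analyses. This sketch assumes qiita.filepath references qiita.filepath_type via filepath_type_id, as the joins above suggest:

from qiita_db.sql_connection import TRN

with TRN:
    # assumes qiita.filepath -> qiita.filepath_type via filepath_type_id
    sql = """SELECT COUNT(*)
             FROM qiita.artifact_filepath
                JOIN qiita.filepath USING (filepath_id)
                JOIN qiita.filepath_type USING (filepath_type_id)
             WHERE filepath_type = 'tgz'"""
    TRN.add(sql)
    # number of artifacts that gained a tgz filepath
    print(TRN.execute_fetchindex()[0][0])

    sql = """SELECT COUNT(*)
             FROM qiita.analysis_filepath
                JOIN qiita.filepath USING (filepath_id)
                JOIN qiita.filepath_type USING (filepath_type_id)
             WHERE filepath_type = 'tgz'"""
    TRN.add(sql)
    # number of analyses that gained a tgz filepath
    print(TRN.execute_fetchindex()[0][0])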
@@ -88,6 +88,26 @@ def generate_pick_closed_reference_otus_cmd(filepaths, out_dir, parameters,
    return cmd, output_dir


def generate_sortmerna_tgz(out_dir):
    """Generates the sortmerna failures tgz command

    Parameters
    ----------
    out_dir : str
        The job output directory

    Returns
    -------
    str
        The sortmerna failures tgz command
    """
    to_tgz = join(out_dir, 'sortmerna_picked_otus')
    tgz = to_tgz + '.tgz'
    cmd = 'tar zcf %s %s' % (tgz, to_tgz)

    return cmd
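For illustration only: the helper just assembles a shell string, which the caller then executes through the plugin's existing system_call wrapper (see the pick_closed_reference_otus changes below). The path here is made up:

cmd = generate_sortmerna_tgz('/path/to/job_output')
# cmd is now:
#   'tar zcf /path/to/job_output/sortmerna_picked_otus.tgz '
#   '/path/to/job_output/sortmerna_picked_otus'
std_out, std_err, return_value = system_call(cmd)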
def generate_artifact_info(pick_out):
    """Creates the artifact information to attach to the payload

@@ -104,6 +124,7 @@ def generate_artifact_info(pick_out):
    path_builder = partial(join, pick_out)
    filepaths = [(path_builder('otu_table.biom'), 'biom'),
                 (path_builder('sortmerna_picked_otus'), 'directory'),
                 (path_builder('sortmerna_picked_otus.tgz'), 'tgz'),
                 (glob(path_builder('log_*.txt'))[0], 'log')]
    return [['OTU table', 'BIOM', filepaths]]
@@ -132,7 +153,7 @@ def pick_closed_reference_otus(qclient, job_id, parameters, out_dir):
    ValueError
        If there is any error gathering the information from the server
    """
    qclient.update_job_step(job_id, "Step 1 of 4: Collecting information")  # was "Step 1 of 3"
    artifact_id = parameters['input_data']
    fps_info = qclient.get("/qiita_db/artifacts/%s/filepaths/" % artifact_id)
    if not fps_info or not fps_info['success']:

@@ -155,17 +176,25 @@ def pick_closed_reference_otus(qclient, job_id, parameters, out_dir):
        raise ValueError(error_msg)
    reference_fps = ref_info['filepaths']

    qclient.update_job_step(job_id, "Step 2 of 4: Generating command")  # was "Step 2 of 3"
    command, pick_out = generate_pick_closed_reference_otus_cmd(
        fps, out_dir, parameters, reference_fps)

    qclient.update_job_step(job_id, "Step 3 of 4: Executing OTU picking")  # was "Step 3 of 3"
    std_out, std_err, return_value = system_call(command)
    if return_value != 0:
        error_msg = ("Error running OTU picking:\nStd out: %s\nStd err: %s"
                     % (std_out, std_err))
        return format_payload(False, error_msg=error_msg)

    qclient.update_job_step(job_id, "Step 4 of 4: tgz sortmerna folder")
    command = generate_sortmerna_tgz(pick_out)
    std_out, std_err, return_value = system_call(command)
    if return_value != 0:
        error_msg = ("Error while tgz failures:\nStd out: %s\nStd err: %s"
                     % (std_out, std_err))
        return format_payload(False, error_msg=error_msg)

    artifacts_info = generate_artifact_info(pick_out)

    return format_payload(True, artifacts_info=artifacts_info)
@@ -13,7 +13,8 @@
from tempfile import mkstemp, mkdtemp

from tgp.pick_otus import (write_parameters_file, generate_artifact_info,
                           generate_pick_closed_reference_otus_cmd,
                           generate_sortmerna_tgz)


class PickOTUsTests(TestCase):

@@ -66,6 +67,14 @@ def test_generate_pick_closed_reference_otus_cmd(self):
        self.assertEqual(obs, exp)
        self.assertEqual(obs_dir, join(output_dir, 'cr_otus'))

    def test_generate_sortmerna_tgz(self):
        outdir = mkdtemp()
        self._clean_up_files.append(outdir)
        obs = generate_sortmerna_tgz(outdir)
        exp = ("tar zcf %s/sortmerna_picked_otus.tgz "
               "%s/sortmerna_picked_otus" % (outdir, outdir))
        self.assertEqual(obs, exp)

    def test_generate_pick_closed_reference_otus_cmd_valueerror(self):
        filepaths = [('/directory/seqs.log', 'log'),
                     ('/directory/seqs.demux', 'preprocessed_demux')]

@@ -93,6 +102,7 @@ def test_generate_artifact_info(self):
        obs = generate_artifact_info(outdir)
        fps = [(join(outdir, "otu_table.biom"), "biom"),
               (join(outdir, "sortmerna_picked_otus"), "directory"),
               (join(outdir, "sortmerna_picked_otus.tgz"), "tgz"),
               (log_fp, "log")]
        exp = [['OTU table', 'BIOM', fps]]
        self.assertEqual(obs, exp)
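To exercise just the new test, something along these lines should work; the test module path is a guess (the diff only shows the imports) and may differ from the actual repository layout:

import unittest

# Assumed module path for the test file above; adjust to its real location.
from tgp.tests.test_pick_otus import PickOTUsTests

suite = unittest.TestSuite([PickOTUsTests('test_generate_sortmerna_tgz')])
unittest.TextTestRunner(verbosity=2).run(suite)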
Review comment: Can you use the function retrieve_filepaths from here? This way it will look up the mountpoint correctly; the current approach will fail, for example, in the test environment.
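A rough sketch of the reviewer's suggestion, assuming qiita_db.util.retrieve_filepaths takes the link table, the id column and the object id, and returns (filepath_id, filepath, filepath_type) tuples with the mountpoint already resolved; the exact signature is not shown in this diff, so treat it as an assumption:

from qiita_db.util import retrieve_filepaths

# Hypothetical rewrite of the analysis loop in the patch script above:
# let retrieve_filepaths resolve the mountpoint for each analysis filepath
# instead of joining filepath and data_directory_id by hand.
full_fps = [fp for _, fp, _ in retrieve_filepaths(
    'analysis_filepath', 'analysis_id', analysis_id)]
cmd = 'tar zcf %s %s' % (tgz, ' '.join(full_fps))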