Skip to content

Commit edd3eed

Browse files
authored
Improve complete job (qiita-spots#3443)
* Update CHANGELOG.md * improve complete_job * CREATE pg_trgm * EXTENSION IF NOT EXISTS * fix tests * SELECT change * = -> ILIKE
1 parent 35a051e commit edd3eed

File tree

4 files changed

+100
-39
lines changed

4 files changed

+100
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Deployed on October 14th, 2024
1616
* `Woltka v0.1.7, paired-end` superseded `Woltka v0.1.6` in `qp-woltka`; [more information](https://qiita.ucsd.edu/static/doc/html/processingdata/woltka_pairedend.html). Thank you to @qiyunzhu for the benchmarks!
1717
* Other general fixes, like [#3424](https://github.com/qiita-spots/qiita/pull/3424), [#3425](https://github.com/qiita-spots/qiita/pull/3425), [#3439](https://github.com/qiita-spots/qiita/pull/3439), [#3440](https://github.com/qiita-spots/qiita/pull/3440).
1818
* General SPP improvements, like: [NuQC modified to preserve metadata in fastq files](https://github.com/biocore/mg-scripts/pull/155), [use squeue instead of sacct](https://github.com/biocore/mg-scripts/pull/152), [job aborts if Qiita study contains sample metadata columns reserved for prep-infos](https://github.com/biocore/mg-scripts/pull/151), [metapool generates OverrideCycles value](https://github.com/biocore/metagenomics_pooling_notebook/pull/225).
19+
* We updated the available parameters for `Filter features against reference [filter_features]`, `Non V4 16S sequence assessment [non_v4_16s]` and all the phylogenetic analytical commands so they can use `Greengenes2 2024.09`.
1920

2021

2122

qiita_db/handlers/processing_job.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ def post(self, job_id):
146146
cmd, values_dict={'job_id': job_id,
147147
'payload': self.request.body.decode(
148148
'ascii')})
149-
job = qdb.processing_job.ProcessingJob.create(job.user, params)
149+
# complete_job are unique so it is fine to force them to be created
150+
job = qdb.processing_job.ProcessingJob.create(
151+
job.user, params, force=True)
150152
job.submit()
151153

152154
self.finish()

qiita_db/processing_job.py

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -582,10 +582,10 @@ def create(cls, user, parameters, force=False):
582582
TTRN = qdb.sql_connection.TRN
583583
with TTRN:
584584
command = parameters.command
585-
586-
# check if a job with the same parameters already exists
587-
sql = """SELECT processing_job_id, email, processing_job_status,
588-
COUNT(aopj.artifact_id)
585+
if not force:
586+
# check if a job with the same parameters already exists
587+
sql = """SELECT processing_job_id, email,
588+
processing_job_status, COUNT(aopj.artifact_id)
589589
FROM qiita.processing_job
590590
LEFT JOIN qiita.processing_job_status
591591
USING (processing_job_status_id)
@@ -596,41 +596,42 @@ def create(cls, user, parameters, force=False):
596596
GROUP BY processing_job_id, email,
597597
processing_job_status"""
598598

599-
# we need to use ILIKE because of booleans as they can be
600-
# false or False
601-
params = []
602-
for k, v in parameters.values.items():
603-
# this is necessary in case we have an Iterable as a value
604-
# but that is string
605-
if isinstance(v, Iterable) and not isinstance(v, str):
606-
for vv in v:
607-
params.extend([k, str(vv)])
599+
# we need to use ILIKE because of booleans as they can be
600+
# false or False
601+
params = []
602+
for k, v in parameters.values.items():
603+
# this is necessary in case we have an Iterable as a value
604+
# but that is string
605+
if isinstance(v, Iterable) and not isinstance(v, str):
606+
for vv in v:
607+
params.extend([k, str(vv)])
608+
else:
609+
params.extend([k, str(v)])
610+
611+
if params:
612+
# divided by 2 as we have key-value pairs
613+
len_params = int(len(params)/2)
614+
sql = sql.format(' AND ' + ' AND '.join(
615+
["command_parameters->>%s ILIKE %s"] * len_params))
616+
params = [command.id] + params
617+
TTRN.add(sql, params)
608618
else:
609-
params.extend([k, str(v)])
610-
611-
if params:
612-
# divided by 2 as we have key-value pairs
613-
len_params = int(len(params)/2)
614-
sql = sql.format(' AND ' + ' AND '.join(
615-
["command_parameters->>%s ILIKE %s"] * len_params))
616-
params = [command.id] + params
617-
TTRN.add(sql, params)
618-
else:
619-
# the sql variable expects the list of parameters but if there
620-
# is no param we need to replace the {0} with an empty string
621-
TTRN.add(sql.format(""), [command.id])
622-
623-
# checking that if the job status is success, it has children
624-
# [2] status, [3] children count
625-
existing_jobs = [r for r in TTRN.execute_fetchindex()
626-
if r[2] != 'success' or r[3] > 0]
627-
if existing_jobs and not force:
628-
raise ValueError(
629-
'Cannot create job because the parameters are the same as '
630-
'jobs that are queued, running or already have '
631-
'succeeded:\n%s' % '\n'.join(
632-
["%s: %s" % (jid, status)
633-
for jid, _, status, _ in existing_jobs]))
619+
# the sql variable expects the list of parameters but if
620+
# there is no param we need to replace the {0} with an
621+
# empty string
622+
TTRN.add(sql.format(""), [command.id])
623+
624+
# checking that if the job status is success, it has children
625+
# [2] status, [3] children count
626+
existing_jobs = [r for r in TTRN.execute_fetchindex()
627+
if r[2] != 'success' or r[3] > 0]
628+
if existing_jobs:
629+
raise ValueError(
630+
'Cannot create job because the parameters are the '
631+
'same as jobs that are queued, running or already '
632+
'have succeeded:\n%s' % '\n'.join(
633+
["%s: %s" % (jid, status)
634+
for jid, _, status, _ in existing_jobs]))
634635

635636
sql = """INSERT INTO qiita.processing_job
636637
(email, command_id, command_parameters,

qiita_db/support_files/patches/93.sql

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
-- Oct 18, 2024
2+
-- ProcessingJob.create can take up to 52 seconds if creating a complete_job; mainly
3+
-- due to the number of jobs of this command and using json. The solution in the database
4+
-- is to convert to jsonb and index the values of the database
5+
6+
-- ### These are the stats before the change in a single example
7+
-- GroupAggregate (cost=67081.81..67081.83 rows=1 width=77) (actual time=51859.962..51862.637 rows=1 loops=1)
8+
-- Group Key: processing_job.processing_job_id, processing_job_status.processing_job_status
9+
-- -> Sort (cost=67081.81..67081.81 rows=1 width=77) (actual time=51859.952..51862.627 rows=1 loops=1)
10+
-- Sort Key: processing_job.processing_job_id, processing_job_status.processing_job_status
11+
-- Sort Method: quicksort Memory: 25kB
12+
-- -> Nested Loop Left Join (cost=4241.74..67081.80 rows=1 width=77) (actual time=51859.926..51862.604 rows=1 loops=1)
13+
-- -> Nested Loop (cost=4237.30..67069.64 rows=1 width=69) (actual time=51859.889..51862.566 rows=1 loops=1)
14+
-- Join Filter: (processing_job.processing_job_status_id = processing_job_status.processing_job_status_id)
15+
-- Rows Removed by Join Filter: 1
16+
-- -> Gather (cost=4237.30..67068.50 rows=1 width=45) (actual time=51859.846..51862.522 rows=1 loops=1)
17+
-- Workers Planned: 2
18+
-- Workers Launched: 2
19+
-- -> Parallel Bitmap Heap Scan on processing_job (cost=3237.30..66068.40 rows=1 width=45) (actual time=51785.317..51785.446 rows=0 loops=3)
20+
-- Recheck Cond: (command_id = 83)
21+
-- Filter: (((command_parameters ->> 'job_id'::text) ~~* '3432a908-f7b8-4e36-89fc-88f3310b84d5'::text) AND ((command_parameters ->> '
22+
-- payload'::text) ~~* '{"success": true, "error": "", "artifacts": {"alpha_diversity": {"artifact_type": "alpha_vector", "filepaths": [["/qmounts/qiita_test_data/tes
23+
-- tlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity/alpha-diversity.tsv", "plain_text"], ["/qmounts/qiita_test_data/testloca
24+
-- l/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity.qza", "qza"]], "archive": {}}}}'::text))
25+
-- Rows Removed by Filter: 97315
26+
-- Heap Blocks: exact=20133
27+
-- -> Bitmap Index Scan on idx_processing_job_command_id (cost=0.00..3237.30 rows=294517 width=0) (actual time=41.569..41.569 rows=
28+
-- 293054 loops=1)
29+
-- Index Cond: (command_id = 83)
30+
-- -> Seq Scan on processing_job_status (cost=0.00..1.09 rows=4 width=40) (actual time=0.035..0.035 rows=2 loops=1)
31+
-- Filter: ((processing_job_status)::text = ANY ('{success,waiting,running,in_construction}'::text[]))
32+
-- Rows Removed by Filter: 1
33+
-- -> Bitmap Heap Scan on artifact_output_processing_job aopj (cost=4.43..12.14 rows=2 width=24) (actual time=0.031..0.031 rows=0 loops=1)
34+
-- Recheck Cond: (processing_job.processing_job_id = processing_job_id)
35+
-- -> Bitmap Index Scan on idx_artifact_output_processing_job_job (cost=0.00..4.43 rows=2 width=0) (actual time=0.026..0.026 rows=0 loops=1)
36+
-- Index Cond: (processing_job_id = processing_job.processing_job_id)
37+
-- Planning Time: 1.173 ms
38+
-- Execution Time: 51862.756 ms
39+
40+
-- Note: for this to work you need to have created as admin the extension
41+
-- CREATE EXTENSION pg_trgm;
42+
CREATE EXTENSION IF NOT EXISTS "pg_trgm" WITH SCHEMA public;
43+
44+
-- This alter table will take close to 11 min
45+
ALTER TABLE qiita.processing_job
46+
ALTER COLUMN command_parameters TYPE JSONB USING command_parameters::jsonb;
47+
48+
-- This indexing will take like 5 min
49+
CREATE INDEX IF NOT EXISTS processing_job_command_parameters_job_id ON qiita.processing_job
50+
USING GIN((command_parameters->>'job_id') gin_trgm_ops);
51+
52+
-- This indexing will take like an hour
53+
CREATE INDEX IF NOT EXISTS processing_job_command_parameters_payload ON qiita.processing_job
54+
USING GIN((command_parameters->>'payload') gin_trgm_ops);
55+
56+
-- After the changes
57+
-- 18710.404 ms

0 commit comments

Comments
 (0)