36 changes: 36 additions & 0 deletions qiita_db/artifact.py
@@ -680,3 +680,39 @@ def study(self):
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.study.Study(qdb.sql_connection.TRN.execute_fetchlast())

def jobs(self, cmd=None, status=None):
"""Jobs that used this artifact as input

Parameters
----------
cmd : qiita_db.software.Command, optional
If provided, only jobs that executed this command will be returned
status : str, optional
If provided, only jobs in this status will be returned

Returns
-------
list of qiita_db.processing_job.ProcessingJob
The list of jobs that used this artifact as input
"""
with qdb.sql_connection.TRN:
sql = """SELECT processing_job_id
FROM qiita.artifact_processing_job
JOIN qiita.processing_job USING (processing_job_id)
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE artifact_id = %s"""
sql_args = [self.id]

if cmd:
sql = "{} AND command_id = %s".format(sql)
sql_args.append(cmd.id)

if status:
sql = "{} AND processing_job_status = %s".format(sql)
sql_args.append(status)

qdb.sql_connection.TRN.add(sql, sql_args)
return [qdb.processing_job.ProcessingJob(jid)
for jid in qdb.sql_connection.TRN.execute_fetchflatten()]
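
A minimal usage sketch of the new accessor, assuming a deployment with the Qiita test database loaded (the command id and status value here are illustrative, not prescribed by the PR):

import qiita_db as qdb

artifact = qdb.artifact.Artifact(1)

# All jobs that took this artifact as input
all_jobs = artifact.jobs()

# Narrow to a single command, and to jobs that finished successfully
cmd = qdb.software.Command(1)
ok_jobs = artifact.jobs(cmd=cmd, status='success')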
29 changes: 19 additions & 10 deletions qiita_db/processing_job.py
@@ -6,7 +6,7 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from uuid import uuid4, UUID
from uuid import UUID

import qiita_db as qdb

@@ -74,19 +74,28 @@ def create(cls, user, parameters):
The newly created job
"""
with qdb.sql_connection.TRN:
# Generate the job_id
job_id = str(uuid4())
while cls.exists(job_id):
job_id = str(uuid4())

command = parameters.command
sql = """INSERT INTO qiita.processing_job
(processing_job_id, email, command_id,
command_parameters, processing_job_status_id)
VALUES (%s, %s, %s, %s, %s)"""
(email, command_id, command_parameters,
processing_job_status_id)
VALUES (%s, %s, %s, %s)
RETURNING processing_job_id"""
status = qdb.util.convert_to_id("queued", "processing_job_status")
sql_args = [job_id, user.id, parameters.command.id,
sql_args = [user.id, command.id,
parameters.dump(), status]
qdb.sql_connection.TRN.add(sql, sql_args)
job_id = qdb.sql_connection.TRN.execute_fetchlast()

# Link the job with the input artifacts
sql = """INSERT INTO qiita.artifact_processing_job
(artifact_id, processing_job_id)
VALUES (%s, %s)"""
for pname, vals in command.parameters.items():
if vals[0] == 'artifact':
qdb.sql_connection.TRN.add(
sql, [parameters.values[pname], job_id])
qdb.sql_connection.TRN.execute()

return cls(job_id)

@property
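The create() change above replaces client-side uuid4 generation (and its exists() retry loop) with the uuid_generate_v4() column default added in patch 33, fetching the id back through INSERT ... RETURNING. A minimal psycopg2 sketch of the same pattern, using the table and column names from this PR (the connection parameters and literal values are assumptions):

import psycopg2

conn = psycopg2.connect(dbname='qiita_test')  # hypothetical DSN
with conn, conn.cursor() as cur:
    # The server assigns the UUID via the column default, so there is
    # no collision check and no extra round trip
    cur.execute(
        """INSERT INTO qiita.processing_job
                (email, command_id, command_parameters,
                 processing_job_status_id)
            VALUES (%s, %s, %s, %s)
            RETURNING processing_job_id""",
        ['test@foo.bar', 1, '{"input_data": 1}', 1])
    job_id = cur.fetchone()[0]
    # Link the job to its input artifact, as create() now does
    cur.execute(
        """INSERT INTO qiita.artifact_processing_job
                (artifact_id, processing_job_id)
            VALUES (%s, %s)""",
        [1, job_id])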
119 changes: 78 additions & 41 deletions qiita_db/support_files/patches/33.sql
@@ -2,6 +2,8 @@
-- Change the database structure to remove the RawData, PreprocessedData and
-- ProcessedData division to unify it into the Artifact object

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Rename the id columns from the parameters tables
ALTER TABLE qiita.processed_params_sortmerna RENAME COLUMN processed_params_id TO parameters_id;
ALTER TABLE qiita.processed_params_uclust RENAME COLUMN processed_params_id TO parameters_id;
@@ -345,6 +347,55 @@ BEGIN
END LOOP;
END $do$;

-- Create tables to keep track of the processing jobs
CREATE TABLE qiita.processing_job_status (
processing_job_status_id bigserial NOT NULL,
processing_job_status varchar NOT NULL,
processing_job_status_description varchar NOT NULL,
CONSTRAINT pk_processing_job_status PRIMARY KEY ( processing_job_status_id )
) ;

INSERT INTO qiita.processing_job_status
(processing_job_status, processing_job_status_description)
VALUES ('queued', 'The job is waiting to be run'),
('running', 'The job is running'),
('success', 'The job completed successfully'),
('error', 'The job failed');

CREATE TABLE qiita.processing_job (
processing_job_id UUID DEFAULT uuid_generate_v4(),
email varchar NOT NULL,
command_id bigint NOT NULL,
command_parameters json NOT NULL,
processing_job_status_id bigint NOT NULL,
logging_id bigint ,
heartbeat timestamp ,
step varchar ,
CONSTRAINT pk_processing_job PRIMARY KEY ( processing_job_id )
) ;
CREATE INDEX idx_processing_job_email ON qiita.processing_job ( email ) ;
CREATE INDEX idx_processing_job_command_id ON qiita.processing_job ( command_id ) ;
CREATE INDEX idx_processing_job_status_id ON qiita.processing_job ( processing_job_status_id ) ;
CREATE INDEX idx_processing_job_logging ON qiita.processing_job ( logging_id ) ;
COMMENT ON COLUMN qiita.processing_job.email IS 'The user that launched the job';
COMMENT ON COLUMN qiita.processing_job.command_id IS 'The command launched';
COMMENT ON COLUMN qiita.processing_job.command_parameters IS 'The parameters used in the command';
COMMENT ON COLUMN qiita.processing_job.logging_id IS 'In case of failure, point to the log entry that holds more information about the error';
COMMENT ON COLUMN qiita.processing_job.heartbeat IS 'The last heartbeat received by this job';
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job_qiita_user FOREIGN KEY ( email ) REFERENCES qiita.qiita_user( email ) ;
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job FOREIGN KEY ( command_id ) REFERENCES qiita.software_command( command_id ) ;
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job_status FOREIGN KEY ( processing_job_status_id ) REFERENCES qiita.processing_job_status( processing_job_status_id ) ;
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job_logging FOREIGN KEY ( logging_id ) REFERENCES qiita.logging( logging_id ) ;

CREATE TABLE qiita.artifact_processing_job (
artifact_id bigint NOT NULL,
processing_job_id UUID NOT NULL,
CONSTRAINT idx_artifact_processing_job PRIMARY KEY ( artifact_id, processing_job_id )
) ;
CREATE INDEX idx_artifact_processing_job_artifact ON qiita.artifact_processing_job ( artifact_id ) ;
CREATE INDEX idx_artifact_processing_job_job ON qiita.artifact_processing_job ( processing_job_id ) ;
ALTER TABLE qiita.artifact_processing_job ADD CONSTRAINT fk_artifact_processing_job FOREIGN KEY ( artifact_id ) REFERENCES qiita.artifact( artifact_id ) ;
ALTER TABLE qiita.artifact_processing_job ADD CONSTRAINT fk_artifact_processing_job_0 FOREIGN KEY ( processing_job_id ) REFERENCES qiita.processing_job( processing_job_id ) ;

-- Create a function to correctly choose the command id for the preprocessed
-- data
@@ -455,6 +506,7 @@ DECLARE
demux_type_id bigint;
biom_type_id bigint;
ppd_cmd_id bigint;
job_id UUID;
params json;
BEGIN
-- We need a new artifact type for representing demultiplexed data (the
@@ -474,10 +526,11 @@ BEGIN
-- intentional as the raw data sharing should be done at filepath level rather
-- than at raw data level. See issue #1459.
FOR pt_vals IN
SELECT prep_template_id, raw_data_id, filetype_id, study_id, data_type_id
SELECT prep_template_id, raw_data_id, filetype_id, study_id, data_type_id, email
FROM qiita.prep_template
JOIN qiita.raw_data USING (raw_data_id)
JOIN qiita.study_prep_template USING (prep_template_id)
JOIN qiita.study USING (study_id)
WHERE raw_data_id IS NOT NULL
LOOP
-- Move the raw_data
@@ -535,6 +588,18 @@ BEGIN
params, TRUE, TRUE)
RETURNING artifact_id INTO ppd_a_id;

-- Insert the job that created this preprocessed data
-- Magic number 3: success status - if we have an artifact,
-- it is because the job completed successfully
INSERT INTO qiita.processing_job (email, command_id, command_parameters,
processing_job_status_id)
VALUES (pt_vals.email, ppd_cmd_id, params, 3)
RETURNING processing_job_id INTO job_id;

-- Link the parent with the job
INSERT INTO qiita.artifact_processing_job (artifact_id, processing_job_id)
VALUES (rd_a_id, job_id);

-- Relate the artifact with the study
INSERT INTO qiita.study_artifact (study_id, artifact_id)
VALUES (pt_vals.study_id, ppd_a_id);
@@ -589,6 +654,18 @@ BEGIN
biom_type_id, ppd_vals.data_type_id, 3, params)
RETURNING artifact_id into pd_a_id;

-- Insert the job that created this processed data
-- Magic number 3: success status - if we have an artifact,
-- it is because the job completed successfully
INSERT INTO qiita.processing_job (email, command_id, command_parameters,
processing_job_status_id)
VALUES (pt_vals.email, 3, params, 3)
RETURNING processing_job_id INTO job_id;

-- Link the parent with the job
INSERT INTO qiita.artifact_processing_job (artifact_id, processing_job_id)
VALUES (ppd_a_id, job_id);

-- Relate the artifact with the study
INSERT INTO qiita.study_artifact (study_id, artifact_id)
VALUES (pt_vals.study_id, pd_a_id);
@@ -707,43 +784,3 @@ BEGIN
END IF;
END
$$ LANGUAGE plpgsql;

-- Create tables to keep track of the processing jobs
CREATE TABLE qiita.processing_job_status (
processing_job_status_id bigserial NOT NULL,
processing_job_status varchar NOT NULL,
processing_job_status_description varchar NOT NULL,
CONSTRAINT pk_processing_job_status PRIMARY KEY ( processing_job_status_id )
) ;

INSERT INTO qiita.processing_job_status
(processing_job_status, processing_job_status_description)
VALUES ('queued', 'The job is waiting to be run'),
('running', 'The job is running'),
('success', 'The job completed successfully'),
('error', 'The job failed');

CREATE TABLE qiita.processing_job (
processing_job_id UUID NOT NULL,
email varchar NOT NULL,
command_id bigint NOT NULL,
command_parameters json NOT NULL,
processing_job_status_id bigint NOT NULL,
logging_id bigint ,
heartbeat timestamp ,
step varchar ,
CONSTRAINT pk_processing_job PRIMARY KEY ( processing_job_id )
) ;
CREATE INDEX idx_processing_job_email ON qiita.processing_job ( email ) ;
CREATE INDEX idx_processing_job_command_id ON qiita.processing_job ( command_id ) ;
CREATE INDEX idx_processing_job_status_id ON qiita.processing_job ( processing_job_status_id ) ;
CREATE INDEX idx_processing_job_logging ON qiita.processing_job ( logging_id ) ;
COMMENT ON COLUMN qiita.processing_job.email IS 'The user that launched the job';
COMMENT ON COLUMN qiita.processing_job.command_id IS 'The command launched';
COMMENT ON COLUMN qiita.processing_job.command_parameters IS 'The parameters used in the command';
COMMENT ON COLUMN qiita.processing_job.logging_id IS 'In case of failure, point to the log entry that holds more information about the error';
COMMENT ON COLUMN qiita.processing_job.heartbeat IS 'The last heartbeat received by this job';
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job_qiita_user FOREIGN KEY ( email ) REFERENCES qiita.qiita_user( email ) ;
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job FOREIGN KEY ( command_id ) REFERENCES qiita.software_command( command_id ) ;
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job_status FOREIGN KEY ( processing_job_status_id ) REFERENCES qiita.processing_job_status( processing_job_status_id ) ;
ALTER TABLE qiita.processing_job ADD CONSTRAINT fk_processing_job_logging FOREIGN KEY ( logging_id ) REFERENCES qiita.logging( logging_id ) ;
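
On the Python side, status strings are resolved against the new lookup table; the convert_to_id("queued", "processing_job_status") call in create() above boils down to a query like the one below. A stand-alone sketch (the helper name is illustrative; cur is assumed to be an open psycopg2 cursor):

def processing_job_status_id(cur, status):
    # Mirrors what qdb.util.convert_to_id does for this lookup table
    cur.execute(
        """SELECT processing_job_status_id
             FROM qiita.processing_job_status
            WHERE processing_job_status = %s""", [status])
    row = cur.fetchone()
    if row is None:
        raise ValueError('Unknown processing job status: %s' % status)
    return row[0]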
18 changes: 18 additions & 0 deletions qiita_db/support_files/populate_test_db.sql
@@ -331,6 +331,18 @@ INSERT INTO qiita.parent_artifact (parent_id, artifact_id)
VALUES (1, 2), (1, 3),
(2, 4);

-- Insert the jobs that processed the previous artifacts
INSERT INTO qiita.processing_job (processing_job_id, email, command_id, command_parameters, processing_job_status_id)
VALUES ('6d368e16-2242-4cf8-87b4-a5dc40bb890b', 'test@foo.bar', 1, '{"max_bad_run_length":3,"min_per_read_length_fraction":0.75,"sequence_max_n":0,"rev_comp_barcode":false,"rev_comp_mapping_barcodes":false,"rev_comp":false,"phred_quality_threshold":3,"barcode_type":"golay_12","max_barcode_errors":1.5,"input_data":1}'::json, 3),
('4c7115e8-4c8e-424c-bf25-96c292ca1931', 'test@foo.bar', 1, '{"max_bad_run_length":3,"min_per_read_length_fraction":0.75,"sequence_max_n":0,"rev_comp_barcode":false,"rev_comp_mapping_barcodes":true,"rev_comp":false,"phred_quality_threshold":3,"barcode_type":"golay_12","max_barcode_errors":1.5,"input_data":1}'::json, 3),
('3c9991ab-6c14-4368-a48c-841e8837a79c', 'test@foo.bar', 3, '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1,"input_data":2}'::json, 3);

-- Relate the above jobs with the artifacts
INSERT INTO qiita.artifact_processing_job (artifact_id, processing_job_id)
VALUES (1, '6d368e16-2242-4cf8-87b4-a5dc40bb890b'),
(1, '4c7115e8-4c8e-424c-bf25-96c292ca1931'),
(2, '3c9991ab-6c14-4368-a48c-841e8837a79c');

-- Insert filepaths for the artifacts and reference
INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algorithm_id, data_directory_id)
VALUES ('1_s_G1_L001_sequences.fastq.gz', 1, '852952723', 1, 5),
@@ -487,6 +499,12 @@ INSERT INTO qiita.processing_job
('b72369f9-a886-4193-8d3d-f7b504168e75', 'shared@foo.bar', 1, '{"max_bad_run_length":3,"min_per_read_length_fraction":0.75,"sequence_max_n":0,"rev_comp_barcode":false,"rev_comp_mapping_barcodes":true,"rev_comp":false,"phred_quality_threshold":3,"barcode_type":"golay_12","max_barcode_errors":1.5,"input_data":1}'::json, 3, NULL, 'Sun Nov 22 21:15:00 2015', NULL),
('d19f76ee-274e-4c1b-b3a2-a12d73507c55', 'shared@foo.bar', 3, '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1,"input_data":2}'::json, 4, 1, 'Sun Nov 22 21:30:00 2015', 'generating demux file');

INSERT INTO qiita.artifact_processing_job (artifact_id, processing_job_id)
VALUES (1, '063e553b-327c-4818-ab4a-adfe58e49860'),
(1, 'bcc7ebcd-39c1-43e4-af2d-822e3589f14d'),
(1, 'b72369f9-a886-4193-8d3d-f7b504168e75'),
(2, 'd19f76ee-274e-4c1b-b3a2-a12d73507c55');

-- Add a default parameter set for sortmerna
INSERT INTO qiita.default_parameter_set (command_id, parameter_set_name, parameter_set)
VALUES (3, 'Defaults', '{"reference":1,"sortmerna_e_value":1,"sortmerna_max_pos":10000,"similarity":0.97,"sortmerna_coverage":0.97,"threads":1}'::json);
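
With these fixtures in place, the new Artifact.jobs accessor can be exercised directly; a sketch of what a test might assert, assuming the standard test database (a subset check, in case other fixtures link additional jobs):

import qiita_db as qdb

exp = {'6d368e16-2242-4cf8-87b4-a5dc40bb890b',
       '4c7115e8-4c8e-424c-bf25-96c292ca1931',
       '063e553b-327c-4818-ab4a-adfe58e49860',
       'bcc7ebcd-39c1-43e4-af2d-822e3589f14d',
       'b72369f9-a886-4193-8d3d-f7b504168e75'}
obs = {j.id for j in qdb.artifact.Artifact(1).jobs()}
assert exp <= obs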