Merge pull request #73 from naobservatory/dev
v2.4.0
willbradshaw authored Oct 18, 2024
2 parents b7ceacc + 6726bb2 commit 8667e9f
Showing 27 changed files with 606 additions and 141 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,13 @@
# v2.4.0
- Created a new output directory, `logging`, for log files.
- Added the Nextflow trace file to the `logging` directory, which can be used to understand CPU usage, memory usage, and other information such as runtime. After running the pipeline, `plot-timeline-script.R` can be used to generate a summary plot of the runtime for each process in the pipeline.
- Removed CONCAT_GZIPPED.
- Replaced the sample input format with one more similar to nf-core's, called `samplesheet.csv`. This new input file can be generated using the script `generate_samplesheet.sh`.
- Deduplication is now run on paired-end reads using Clumpify in the taxonomic workflow.
- Added fragment-length and deduplication analysis (see the sketch after this list):
  - BBTools: Extract the fragment length as well as the number of duplicates from the taxonomic workflow and add them to `hv_hits_putative_collapsed.tsv.gz`.
  - Bowtie2: Conduct a duplication analysis on the aligned reads, then add the number of duplicates and fragment length to `hv_hits_putative_collapsed.tsv.gz`.
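
A minimal sketch of how the new fragment-length information might be inspected downstream, assuming Python with pandas and a local copy of `hv_hits_putative_collapsed.tsv.gz` (the column name is taken from the Bowtie2 output TSV header and assumed to carry over to the collapsed table):

```python
# Sketch only: skim the fragment-length information added to the collapsed HV hits table.
import pandas as pd

hits = pd.read_csv("hv_hits_putative_collapsed.tsv.gz", sep="\t")
print(hits.columns.tolist())                # inspect which columns are present
print(hits["fragment_length"].describe())   # "fragment_length" assumed to match the Bowtie2 TSV header
```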

# v2.3.2
- Fixed subsetReads to run on all reads when the number of reads per sample is below the set threshold.

34 changes: 17 additions & 17 deletions README.md
@@ -10,7 +10,7 @@ The pipeline currently consists of two workflows, an **index** workflow and a **

The run workflow then consists of five phases:

1. A **preprocessing phase**, in which input files are concatenated according to sample identity and undergo adapter & quality trimming.
1. A **preprocessing phase**, in which input files undergo adapter & quality trimming.
2. A **viral identification phase**, in which a custom multi-step pipeline based around BBDuk, Bowtie2 and Kraken2 is used to sensitively and specifically identify human-infecting virus (HV) reads in the input data for downstream analysis.
3. A **taxonomic profiling phase**, in which a random subset of reads (default 1M/sample) undergo ribosomal classification with BBDuk, followed by broader taxonomic classification with Kraken2.
4. An optional **BLAST validation phase**, in which putative HV reads from phase 2 are checked against nt to evaluate the sensitivity and specificity of the HV identification process.
@@ -46,9 +46,7 @@ Run the index workflow by setting `mode = "index"` in the relevant config file.

#### Preprocessing phase

The run workflow begins by concatenating all libraries assigned to the same sample ID together[^concat]. The reads then undergo cleaning by [FASTP](https://github.com/OpenGene/fastp), which both screens for adapters and trims low-quality and low-complexity sequences. The output of FASTP is then passed on to the other parts of the run workflow.

[^concat]: This is controlled by the library file specified in the config file; any libraries with the same entry in the `sample` column are concatenated. This is primarily useful in cases where the same library is sequenced multiple times, e.g. to reach some total target depth.
The run workflow begins by cleaning the raw reads with [FASTP](https://github.com/OpenGene/fastp), which both screens for adapters and trims low-quality and low-complexity sequences. The output of FASTP is then passed on to the other parts of the run workflow.
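
As a rough illustration of this phase, the sketch below shows the kind of FASTP invocation involved for one pair of read files, wrapped in Python's `subprocess`; the exact flags the pipeline passes are defined in its FASTP module and may differ from those shown here.

```python
# Sketch of a FASTP cleaning step for one paired-end sample (flags are illustrative).
import subprocess

subprocess.run([
    "fastp",
    "--in1", "sample1_1.fastq.gz", "--in2", "sample1_2.fastq.gz",
    "--out1", "sample1_cleaned_1.fastq.gz", "--out2", "sample1_cleaned_2.fastq.gz",
    "--adapter_fasta", "ref/adapters.fasta",   # adapter screening
    "--cut_tail",                              # quality trimming from the read tail
    "--low_complexity_filter",                 # drop low-complexity sequences
    "--json", "sample1_fastp.json", "--html", "sample1_fastp.html",
], check=True)
```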

#### Viral identification phase

@@ -126,10 +124,7 @@ If the pipeline runs to completion, the following output files are expected.
1. `adapters.fasta`: FASTA file of adapter sequences used for adapter screening.
2. `params.json`: JSON file giving all the parameters passed to the pipeline.
3. A CSV file giving sample metadata (filename specified by `params.sample_tab`).
4. `time.txt`: Start time of run workflow.
5. `pipeline-version.txt`: Version of pipeline used for run.
6. `pipeline-version-index.txt`: Version of pipeline used to generate index directory (`params.ref_dir`).
7. `index-params.json`: JSON file giving parameters used to generate index directory (`params.ref_dir`).
4. `index-params.json`: JSON file giving parameters used to generate index directory (`params.ref_dir`).
2. `output/intermediates`: Intermediate files produced by key stages in the run workflow, saved for nonstandard downstream analysis.
1. `reads/cleaned`: Directory containing paired FASTQ files for cleaned reads (i.e. the output of the preprocessing phase described above).
3. `output/results`: Directory containing processed results files for standard downstream analysis.
@@ -150,6 +145,11 @@ If the pipeline runs to completion, the following output files are expected.
7. `qc/qc_quality_sequence_stats.tsv.gz`: Per-sequence read-quality statistics calculated by FASTQC for each sample and preprocessing stage, given as the number of reads (`n_sequences`) with a given mean Phred score (`mean_phred_score`) for each read in the read pair (`read_pair`).
8. `taxonomy/kraken_reports_merged.tsv.gz`: Kraken output reports in TSV format, labeled by sample and ribosomal status.
9. `taxonomy/bracken_reports_merged.tsv.gz`: Bracken output reports in TSV format, labeled by sample and ribosomal status.
4. `output/logging`: Directory containing the log files generated by the pipeline.
1. `trace-${timestamp}.txt`: Tab-delimited log of information for each task run in the pipeline, including runtime, memory usage, exit status, etc. Can be used to create an execution timeline with the script `bin/plot-timeline-script.R` after the pipeline has finished running (see the sketch below). More information on the trace file format can be found [here](https://www.nextflow.io/docs/latest/reports.html#trace-file).
2. `time.txt`: Start time of run workflow.
3. `pipeline-version.txt`: Version of pipeline used for run.
4. `pipeline-version-index.txt`: Version of pipeline used to generate index directory (`params.ref_dir`).
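
A minimal sketch (assuming Python with pandas) of skimming the trace log for the longest-running tasks; the interactive timeline itself is produced by `bin/plot-timeline-script.R`:

```python
# Sketch: list the tasks with the longest wall-clock time in a Nextflow trace log.
# Replace <timestamp> with the value in the actual filename.
import pandas as pd

trace = pd.read_csv("output/logging/trace-<timestamp>.txt", sep="\t")

# realtime/duration are human-readable strings ("1h 2m 3s"), so derive wall-clock
# time from the submit and complete timestamps instead.
trace["submit"] = pd.to_datetime(trace["submit"])
trace["complete"] = pd.to_datetime(trace["complete"])
trace["wall_clock"] = trace["complete"] - trace["submit"]

print(trace.sort_values("wall_clock", ascending=False)
           .loc[:, ["name", "wall_clock", "peak_rss", "status"]]
           .head(10))
```

The HTML timeline can then be generated with `Rscript bin/plot-timeline-script.R -i output/logging/trace-<timestamp>.txt -o timeline.html`.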

[^bitscore]: If only one read aligns to the target, these two fields will be identical. If not, they will give the higher and lower of the best bitscores for the two reads in the pair.

@@ -273,22 +273,22 @@ Finally, you can run the test dataset through the pipeline on AWS Batch. To do t
To run the workflow on another dataset, you need:

1. Accessible raw data files in Gzipped FASTQ format, named appropriately.
2. A library file specifying the relationship between the names of these files (specifically, substrings uniquely identifying each pair of FASTQ files) and the original samples.
3. A sample sheet giving metadata for each sample (can be identical with the library file).
4. A config file in a clean launch directory, pointing to:
2. A sample sheet file specifying the samples, along with paths to the forward and reverse read files for each sample.
3. A config file in a clean launch directory, pointing to:
- The directory containing the raw data (`params.raw_dir`).
- The base directory in which to put the working and output directories (`params.base_dir`).
- The directory containing the outputs of the reference workflow (`params.ref_dir`).
- The library file (`params.library_tab`) and sample sheet (`params.sample_tab`).
- The sample sheet (`params.sample_sheet`).
- Various other parameter values.

> [!NOTE]
> Currently, the pipeline requires the following of raw data files:
> - They must be contained in a single directory
> - Each pair of read files must be uniquely identifiable by a filename substring (specified in the `library` column of `params.library_tab`)
> The samplesheet must have the following format for each row:
> - First column: Sample ID
> - Second column: Path to FASTQ file 1, which should be the forward read for this sample
> - Third column: Path to FASTQ file 2, which should be the reverse read for this sample
>
> The easiest way to get this file is by using the `generate_samplesheet.sh` script. As input, this script takes a path to the raw FASTQ files (`dir_path`) plus forward (`forward_suffix`) and reverse (`reverse_suffix`) read suffixes, both of which support regex, and an optional output path (`output_path`). Those using data from S3 should make sure to pass the `s3` parameter. As output, the script generates a CSV file (named `samplesheet.csv` by default), which can be used as input for the pipeline (see the sketch below).
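
A minimal sketch of writing such a samplesheet by hand in Python for two local samples; the header names below are assumptions, and `generate_samplesheet.sh` remains the recommended way to produce the file:

```python
# Sketch: hand-write a samplesheet for two local paired-end samples.
# Column order follows the note above (sample ID, forward read, reverse read);
# the header names are assumptions rather than taken from the pipeline source.
import csv

rows = [
    ("sample1", "raw/sample1_1.fastq.gz", "raw/sample1_2.fastq.gz"),
    ("sample2", "raw/sample2_1.fastq.gz", "raw/sample2_2.fastq.gz"),
]
with open("samplesheet.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["sample", "fastq_1", "fastq_2"])  # assumed header names
    writer.writerows(rows)
```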
> [!NOTE]
> The library file should specify the mapping between library read files and sample IDs. It must be a CSV file with `library` and `sample` columns, as well as any other metadata columns you feel are appropriate. See `test/libraries.csv` for a minimal example.

If running on Batch, a good process for starting the pipeline on a new dataset is as follows:

81 changes: 81 additions & 0 deletions bin/plot-timeline-script.R
@@ -0,0 +1,81 @@
# plot-timeline-script.R
# Build an interactive HTML timeline of per-task runtimes from a Nextflow trace file.
# Usage: Rscript bin/plot-timeline-script.R -i <trace file> -o <output html>
library(pacman)
p_load(tidyverse, lubridate, plotly, htmlwidgets, rmarkdown, optparse)

# Convert a Nextflow duration string (e.g. "350ms" or "1h 2m 3.5s") to seconds.
convert_date_to_seconds <- function(date_str) {
if (grepl("ms", date_str, ignore.case = TRUE)) {
numeric_value <- as.numeric(gsub("[^0-9.]", "", date_str))
return(numeric_value / 1000)

} else if (grepl("^(\\d+h\\s*)?(\\d+m\\s*)?(\\d+(\\.\\d+)?s)?$", date_str)) {
hours <- suppressWarnings(as.numeric(sub("^(\\d+)h.*$", "\\1", date_str, perl = TRUE)))
minutes <- suppressWarnings(as.numeric(sub("^(?:\\d+h\\s*)?(\\d+)m.*$", "\\1", date_str, perl = TRUE)))
seconds <- suppressWarnings(as.numeric(sub("^(?:\\d+h\\s*)?(?:\\d+m\\s*)?(\\d+(?:\\.\\d+)?)s$", "\\1", date_str, perl = TRUE)))

hours <- ifelse(is.na(hours), 0, hours)
minutes <- ifelse(is.na(minutes), 0, minutes)
seconds <- ifelse(is.na(seconds), 0, seconds)
return(hours * 3600 + minutes * 60 + seconds)
} else {
return(NULL)
}
}

option_list <- list(
make_option(c("-i", "--input"), type = "character", default = NULL,
help = "Input file path to trace file", metavar = "character"),
make_option(c("-o", "--output"), type = "character", default = NULL,
help = "Output file path to html file", metavar="character")
)

opt_parser <- OptionParser(option_list = option_list)
opt <- parse_args(opt_parser)

if (is.null(opt$input) || is.null(opt$output)) {
stop("Input and output file paths must be provided. Use -h for help.")
}

data <- read_tsv(opt$input)

selective_data <- data %>%
select(task_id, name, realtime, submit, start, complete, duration, realtime, rss) %>%
mutate(
ending = as_datetime(start) + seconds(lapply(realtime, convert_date_to_seconds)),
midpoint = start + (ending - start) / 2
)

selective_data <- selective_data %>%
arrange(desc(task_id)) %>%
mutate(name = factor(name, levels = unique(name)))


g <- ggplot(selective_data) +
geom_segment(aes(y = name, yend = name, x = submit, xend = start, color = "Time from submission to task starting"),
size = 8) +
geom_segment(aes(y = name, yend = name, x = start, xend = ending, color = "Time from task starting to task finishing"),
size = 8) +
geom_segment(aes(y = name, yend = name, x = ending, xend = complete, color = "Time spent staging and cleaning/unstaging input and output data"),
size = 8) +
geom_text(aes(x = midpoint, y = name, label = sprintf("%s/%s", duration, rss)), size = 3) +
scale_color_manual(values = c("Time from submission to task starting" = "#fc8d59",
"Time from task starting to task finishing" = "#ffffbf",
"Time spent staging and cleaning/unstaging input and output data" = "#91bfdb")) +
theme_light() +
labs(x = "", y = "", color = "Task Status") +
theme(
axis.text.y = element_text(size = 10, color = "black"),
axis.text.x = element_text(size = 10, color = "black", face = "bold"),
axis.title.x = element_text(size = 10, color = "black", face = "bold"),
axis.title.y = element_text(size = 10, color = "black", face = "bold"),
panel.grid.major.x = element_line(size = 1, color = "black", linetype = "dotted"),
panel.grid.minor.x = element_line(color = "black", linetype = "dotted"),
legend.position = "top",
legend.title = element_text(size = 12, face = "bold"),
legend.text = element_text(size = 10)
)

g

interactive_plot <- ggplotly(g, height = nrow(selective_data) * 25)
saveWidget(interactive_plot, opt$output)

cat("Plot saved to:", opt$output, "\n")
14 changes: 9 additions & 5 deletions bin/process_bowtie2_sam.py
@@ -112,12 +112,12 @@ def join_line(fields):
def get_line(query_name, genome_id, taxid, fragment_length, best_alignment_score_fwd, best_alignment_score_ref,
next_alignment_score_fwd, next_alignment_score_rev, edit_distance_fwd, edit_distance_rev,
ref_start_fwd, ref_start_rev, map_qual_fwd, map_qual_rev, cigar_fwd, cigar_rev, query_len_fwd, query_len_rev,
query_seq_fwd, query_seq_rev, pair_status):
query_seq_fwd, query_seq_rev, query_qual_fwd, query_qual_rev, pair_status):
"""Convert a list of arguments into an output line."""
fields = [query_name, genome_id, taxid, fragment_length, best_alignment_score_fwd, best_alignment_score_ref,
next_alignment_score_fwd, next_alignment_score_rev, edit_distance_fwd, edit_distance_rev,
ref_start_fwd, ref_start_rev, map_qual_fwd, map_qual_rev, cigar_fwd, cigar_rev, query_len_fwd, query_len_rev,
query_seq_fwd, query_seq_rev, pair_status]
query_seq_fwd, query_seq_rev, query_qual_fwd, query_qual_rev, pair_status]
fields_joined = join_line(fields)
return(fields_joined)

@@ -133,6 +133,7 @@ def process_sam_unpaired_pair(read_dict):
cigar_fwd, cigar_rev = read_dict["cigar"], "NA"
query_len_fwd, query_len_rev = read_dict["query_len"], "NA"
query_seq_fwd, query_seq_rev = read_dict["query_seq"], "NA"
query_qual_fwd, query_qual_rev = read_dict["query_qual"], "NA"
else:
best_alignment_score_fwd, best_alignment_score_rev = "NA", read_dict["alignment_score"]
next_alignment_score_fwd, next_alignment_score_rev = "NA", read_dict["next_best_alignment"]
@@ -142,10 +143,11 @@ def process_sam_unpaired_pair(read_dict):
cigar_fwd, cigar_rev = "NA", read_dict["cigar"]
query_len_fwd, query_len_rev = "NA", read_dict["query_len"]
query_seq_fwd, query_seq_rev = "NA", read_dict["query_seq"]
query_qual_fwd, query_qual_rev = "NA", read_dict["query_qual"]
return get_line(read_dict["query_name"], read_dict["genome_id"], read_dict["taxid"], read_dict["fragment_length"],
best_alignment_score_fwd, best_alignment_score_rev, next_alignment_score_fwd, next_alignment_score_rev,
edit_distance_fwd, edit_distance_rev, ref_start_fwd, ref_start_rev, map_qual_fwd, map_qual_rev,
cigar_fwd, cigar_rev, query_len_fwd, query_len_rev, query_seq_fwd, query_seq_rev, read_dict["pair_status"])
cigar_fwd, cigar_rev, query_len_fwd, query_len_rev, query_seq_fwd, query_seq_rev, query_qual_fwd, query_qual_rev, read_dict["pair_status"])

def line_from_valid_pair(fwd_dict, rev_dict):
"""Generate an output line from a validated (concordant or discordant) pair of SAM alignments."""
@@ -169,11 +171,13 @@ def line_from_valid_pair(fwd_dict, rev_dict):
query_len_rev = rev_dict["query_len"]
query_seq_fwd = fwd_dict["query_seq"]
query_seq_rev = rev_dict["query_seq"]
query_qual_fwd = fwd_dict["query_qual"]
query_qual_rev = rev_dict["query_qual"]
pair_status = fwd_dict["pair_status"]
return get_line(query_name, genome_id, taxid, fragment_length,
best_alignment_score_fwd, best_alignment_score_rev, next_alignment_score_fwd, next_alignment_score_rev,
edit_distance_fwd, edit_distance_rev, ref_start_fwd, ref_start_rev, map_qual_fwd, map_qual_rev,
cigar_fwd, cigar_rev, query_len_fwd, query_len_rev, query_seq_fwd, query_seq_rev, pair_status)
cigar_fwd, cigar_rev, query_len_fwd, query_len_rev, query_seq_fwd, query_seq_rev, query_qual_fwd, query_qual_rev, pair_status)

def process_sam_concordant_pair(fwd_dict, rev_dict):
"""Process a concordant pair of SAM alignments."""
@@ -217,7 +221,7 @@ def process_sam_discordant_pair(fwd_dict, rev_dict):
# File-level functions
def write_sam_headers_paired(out_file):
"""Write header line to new TSV."""
headers = ["query_name", "genome_id", "taxid", "fragment_length", "best_alignment_score_fwd", "best_alignment_score_rev", "next_alignment_score_fwd", "next_alignment_score_rev", "edit_distance_fwd", "edit_distance_rev", "ref_start_fwd", "ref_start_rev", "map_qual_fwd", "map_qual_rev", "cigar_fwd", "cigar_rev", "query_len_fwd", "query_len_rev", "query_seq_fwd", "query_seq_rev", "pair_status"]
headers = ["query_name", "genome_id", "taxid", "fragment_length", "best_alignment_score_fwd", "best_alignment_score_rev", "next_alignment_score_fwd", "next_alignment_score_rev", "edit_distance_fwd", "edit_distance_rev", "ref_start_fwd", "ref_start_rev", "map_qual_fwd", "map_qual_rev", "cigar_fwd", "cigar_rev", "query_len_fwd", "query_len_rev", "query_seq_fwd", "query_seq_rev", "query_qual_fwd", "query_qual_rev", "pair_status"]
header_line = join_line(headers)
out_file.write(header_line)
return None
7 changes: 7 additions & 0 deletions configs/logging.config
@@ -0,0 +1,7 @@
def trace_timestamp = new java.util.Date().format( 'yyyy-MM-dd_HH-mm-ss')
trace {
enabled = true
overwrite = true
file = "${params.base_dir}/output/logging/trace-${trace_timestamp}.txt"
fields = 'task_id, hash, native_id, process, tag, name, status, exit, module, container, cpus, time, disk, memory, attempt, submit, start, complete, duration, realtime, queue, %cpu, %mem, rss, vmem, peak_rss, peak_vmem, rchar, wchar, syscr, syscw, read_bytes, write_bytes, vol_ctxt, inv_ctxt, env, workdir, scratch, error_action'
}
1 change: 1 addition & 0 deletions configs/profiles.config
@@ -5,6 +5,7 @@ aws.client.maxConnections = 1000
aws.client.maxErrorRetry = 10
aws.client.connectionTimeout = 0
aws.client.socketTimeout = 0
nextflow.enable.moduleBinaries = true

// Workflow run profiles
profiles {
12 changes: 4 additions & 8 deletions configs/run.config
@@ -6,13 +6,11 @@ params {
mode = "run"

// Directories
base_dir = "s3://nao-mgs-wb/test-remote" // Parent for working and output directories (can be S3)
raw_dir = "${launchDir}/raw" // Raw data directory
base_dir = "s3://nao-mgs-wb/test-batch" // Parent for working and output directories (can be S3)
ref_dir = "s3://nao-mgs-wb/index-20240714/output" // Reference/index directory (generated by index workflow)

// Files
library_tab = "${launchDir}/libraries.csv" // Path to library TSV
sample_tab = "${params.library_tab}" // Path to sample TSV (can be same as library tab if 1:1 mapping)
sample_sheet = "${launchDir}/samplesheet.csv" // Path to sample sheet CSV
adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming

// Numerical
@@ -22,12 +20,10 @@ params {
blast_hv_fraction = 0 // Fraction of putative HV reads to BLAST vs nt (0 = don't run BLAST)
kraken_memory = "128 GB" // Memory needed to safely load Kraken DB
quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64)

// Raw read file suffixes
r1_suffix = "_1"
r2_suffix = "_2"
fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2)
}

includeConfig "${projectDir}/configs/logging.config"
includeConfig "${projectDir}/configs/containers.config"
includeConfig "${projectDir}/configs/resources.config"
includeConfig "${projectDir}/configs/profiles.config"