Guided Example ‐ Nf‐core style workflow
Please read this guide first, and then try to apply the principles with nf-core/fetchngs and nf-core/rnaseq.
Start out by creating a module file with the contents of the `NEXTFLOW_RUN` process.
`modules/local/nextflow/run/main.nf`:
process NEXTFLOW_RUN {
    tag "$pipeline_name"

    input:
    val pipeline_name     // String
    val nextflow_opts     // String
    val params_file       // pipeline params-file
    val samplesheet       // pipeline samplesheet
    val additional_config // custom configs

    when:
    task.ext.when == null || task.ext.when

    exec:
    // def args = task.ext.args ?: ''
    def cache_dir = java.nio.file.Paths.get(workflow.workDir.resolve(pipeline_name).toUri())
    java.nio.file.Files.createDirectories(cache_dir)
    // construct nextflow command
    def nxf_cmd = [
        'nextflow run',
        pipeline_name,
        nextflow_opts,
        params_file ? "-params-file $params_file" : '',
        additional_config ? "-c $additional_config" : '',
        samplesheet ? "--input $samplesheet" : '',
        "--outdir $task.workDir/results",
    ]
    // Copy command to shell script in work dir for reference/debugging.
    file("$task.workDir/nf-cmd.sh").text = nxf_cmd.join(" ")
    // Run nextflow command locally
    def builder = new ProcessBuilder(nxf_cmd.join(" ").tokenize(" "))
    builder.directory(cache_dir.toFile())
    process = builder.start()
    assert process.waitFor() == 0: process.text
    // Copy nextflow log to work directory
    file("${cache_dir.toString()}/.nextflow.log").copyTo("$task.workDir/.nextflow.log")

    output:
    path "results"  , emit: output
    val process.text, emit: log
}
This module builds the command line instruction (for nf-core style workflows):
nextflow run $pipeline_name $nextflow_opts [-params-file $params_file] [-c $additional_config] [--input $samplesheet] --outdir $task.workDir/results
and then runs it. All the workflow outputs are put in the folder `results` in the work directory of the `NEXTFLOW_RUN` task (`$task.workDir`).
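The way the command is assembled can be tried in plain Groovy, outside Nextflow. The following is a minimal sketch, not the module itself: all input values are hypothetical placeholders, and `outdir` stands in for `$task.workDir/results`. It shows that omitted options become empty strings in the list, and that `tokenize(" ")` drops the resulting empty tokens before the command is handed to `ProcessBuilder`.

```groovy
// Hypothetical input values, chosen only for illustration
def pipeline_name     = 'nf-core/demo'
def nextflow_opts     = '-resume -profile docker,test'
def params_file       = ''                        // not supplied
def additional_config = '/path/to/custom.config'
def samplesheet       = []                        // empty list, as passed by Channel.value([])
def outdir            = '/path/to/work/results'   // stands in for "$task.workDir/results"

// Same construction as in the module: empty strings mark omitted options
def nxf_cmd = [
    'nextflow run',
    pipeline_name,
    nextflow_opts,
    params_file ? "-params-file $params_file" : '',
    additional_config ? "-c $additional_config" : '',
    samplesheet ? "--input $samplesheet" : '',
    "--outdir $outdir",
]

// Joining leaves repeated spaces where options were omitted;
// tokenize(' ') splits on spaces and skips the empty tokens.
def tokens = nxf_cmd.join(' ').tokenize(' ')
println tokens
```

Because the command is split on spaces, a path or option value that itself contains a space would be split into several arguments, so paths without spaces are the safer choice here.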
`main.nf`:
include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"
workflow {
    NFCORE_DEMO (
        'nf-core/demo',          // Select nf-core pipeline
        params.nfcore_demo_opts, // workflow opts supplied as params for flexibility
        params.nfcore_demo_params_file ? Channel.fromPath(params.nfcore_demo_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_samplesheet ? Channel.fromPath(params.nfcore_demo_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_add_config ? Channel.fromPath(params.nfcore_demo_add_config, checkIfExists: true) : Channel.value([]),
    )
}
Here we've selected the nf-core workflow `nf-core/demo` to include.
- Include the `NEXTFLOW_RUN` module using the `include` keyword, and say where it's located using `from`.
- The module is aliased (given another name) using `as` to allow for readability and extensibility (inclusion of other workflows later on).
- The module is then added to the `workflow`, using the name `NFCORE_DEMO`.
- The first channel input is `nf-core/demo`, the name of the pipeline we would like to run. This is implicitly converted to `Channel.value('nf-core/demo')`.
- The second channel input is a string supplied by `params.nfcore_demo_opts`, which supplies extra workflow options such as `-resume`, `-ansi-log false`, `-profile docker,test`, etc. You can include multiple Nextflow options here, e.g. `"-resume -profile docker,test"`.
- The third channel input uses a ternary operator (`<condition> ? <if true> : <if false>`) to return a channel (either a parameter file or an empty list). A params file can be generated with `nf-core launch` or written by hand as a YAML file. It supplies the pipeline with the pipeline parameters, i.e. the ones that start with `--` on the command line.
- The fourth channel input performs a similar function to supply the samplesheet (`--input`). Since this is the first process in the chain, the `<if false>` part returns an empty list. For subsequent pipelines in the chain, this would take a samplesheet generated as output from a previous pipeline, filtered out from the files in `results/`.
- The last channel is for additional configuration, for example if you would like to refine the resources a process in the workflow uses.
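The ternary inputs above rely on Groovy truthiness: an unset parameter (`null`) or an empty string counts as false, so the `<if false>` branch is taken. A minimal plain-Groovy sketch of that behaviour follows; the `choose` closure and its return strings are hypothetical and only stand in for the `Channel.fromPath(...)` and `Channel.value([])` branches.

```groovy
// Hypothetical helper mirroring `<condition> ? <if true> : <if false>`
def choose = { value ->
    value ? "file: ${value}".toString() : 'empty list'
}

println choose(null)                  // parameter not set
println choose('')                    // parameter set to an empty string
println choose('/path/to/input.csv')  // parameter set to a path
```

Only the last call takes the `<if true>` branch; the first two fall through to the `<if false>` branch.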
Lastly, create a `nextflow.config` and add `process.errorStrategy = 'finish'`. Without this, if a pipeline errors, any concurrently running workflows will be killed immediately, leaving a Nextflow lock file in place that prevents the workflow from resuming.
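As a minimal sketch, a `nextflow.config` containing only this setting could look like the following. The block form shown in the comment is an equivalent way of writing the same setting, not something this guide requires.

```groovy
// nextflow.config
// Let running tasks finish before the workflow exits on error
process.errorStrategy = 'finish'

// Equivalent block form:
// process {
//     errorStrategy = 'finish'
// }
```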
You can test the first module by doing:
nextflow run main.nf -params-file params.yml
where `params.yml` looks like:
nfcore_demo_opts: '-resume -profile docker'
nfcore_demo_params_file: '/path/to/nfcore/demo/params.yml' # Generate with `nf-core launch`
nfcore_demo_samplesheet: '/path/to/samplesheet/input.csv' # The samplesheet
nfcore_demo_add_config: '/path/to/nf-core/demo/custom.config' # Set configuration, e.g. resources, for nf-core/demo
The outputs are available to use as needed, and you can include other custom processes that supply input.
To add another pipeline in the chain, follow the same method to include the `NEXTFLOW_RUN` module, aliasing it to an appropriate name.
`main.nf`:
include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"
include { NEXTFLOW_RUN as NFCORE_NEXT } from "./modules/local/nextflow/run/main"
workflow {
    NFCORE_DEMO (
        'nf-core/demo',          // Select nf-core pipeline
        params.nfcore_demo_opts, // workflow opts supplied as params for flexibility
        params.nfcore_demo_params_file ? Channel.fromPath(params.nfcore_demo_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_samplesheet ? Channel.fromPath(params.nfcore_demo_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_add_config ? Channel.fromPath(params.nfcore_demo_add_config, checkIfExists: true) : Channel.value([]),
    )
    NFCORE_NEXT (                // Copy-pasted with appropriate amendments
        'nf-core/next',          // Select nf-core pipeline
        params.nfcore_next_opts, // workflow opts supplied as params for flexibility
        params.nfcore_next_params_file ? Channel.fromPath(params.nfcore_next_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_next_samplesheet ? Channel.fromPath(params.nfcore_next_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_next_add_config ? Channel.fromPath(params.nfcore_next_add_config, checkIfExists: true) : Channel.value([]),
    )
}
This next part is the potentially tricky part.
If the previous pipeline writes a samplesheet as output, then this needs to be extracted from the results folder.
This is done using the `.resolve` function inside a `.map` channel operation. The `file` function returns a `Path` object so Nextflow can find the file when it gets passed to the next module.
NFCORE_DEMO.out.output // The results folder
    .map { dir -> file( dir.resolve('path/to/samplesheet'), checkIfExists: true ) } // The relative path to the sample sheet from `results/`
    .set { nfcore_next_samplesheet } // Name the channel
Then supply that channel to the `<if false>` part of the samplesheet input of the next pipeline.
Your workflow should look something like:
`main.nf`:
include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"
include { NEXTFLOW_RUN as NFCORE_NEXT } from "./modules/local/nextflow/run/main"
workflow {
    NFCORE_DEMO (
        'nf-core/demo',          // Select nf-core pipeline
        params.nfcore_demo_opts, // workflow opts supplied as params for flexibility
        params.nfcore_demo_params_file ? Channel.fromPath(params.nfcore_demo_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_samplesheet ? Channel.fromPath(params.nfcore_demo_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_add_config ? Channel.fromPath(params.nfcore_demo_add_config, checkIfExists: true) : Channel.value([]),
    )
    NFCORE_DEMO.out.output // The results folder
        .map { dir -> file( dir.resolve('path/to/samplesheet'), checkIfExists: true ) } // The relative path to the sample sheet from `results/`
        .set { nfcore_next_samplesheet } // Name the channel
    NFCORE_NEXT (                // Copy-pasted with appropriate amendments
        'nf-core/next',          // Select nf-core pipeline
        params.nfcore_next_opts, // workflow opts supplied as params for flexibility
        params.nfcore_next_params_file ? Channel.fromPath(params.nfcore_next_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_next_samplesheet ? Channel.fromPath(params.nfcore_next_samplesheet, checkIfExists: true) : nfcore_next_samplesheet, // ADD SAMPLESHEET CHANNEL HERE!
        params.nfcore_next_add_config ? Channel.fromPath(params.nfcore_next_add_config, checkIfExists: true) : Channel.value([]),
    )
}
When the previous pipeline does not write a samplesheet, here's where your Groovy skills come into play. Use the function `files`, which produces a list of `Path` objects, and then convert that list into a samplesheet. A rough example looks like:
NFCORE_DEMO.out.output // The results folder
    .map { dir ->
        // No leading slash: an absolute path passed to resolve() would ignore `dir`
        files( dir.resolve('path/to/files/*.{ext1,ext2}'), checkIfExists: true ) // Extract files from results folder
            .collect { filename -> "${filename.simpleName},${filename}" } // Make a list of csv lines from each filename
    }
    .flatMap { listOfCsvLines -> [ "sample,filename" ] + listOfCsvLines } // Add a header, and emit each line into the channel
    .collectFile( name: 'next_samplesheet.csv', newLine: true, sort: false ) // Collect the lines into one file, unsorted, with a newline after each line
    .set { nfcore_next_samplesheet } // Name the channel
This is then added into the workflow like the previous example.
The workflow can be run as before, but this time some additional parameters are supplied.
nextflow run main.nf -params-file params.yml
where `params.yml` looks like:
nfcore_demo_opts: '-resume -profile docker'
nfcore_demo_params_file: '/path/to/nfcore/demo/params.yml' # Generate with `nf-core launch`
nfcore_demo_samplesheet: '/path/to/samplesheet/input.csv' # The samplesheet
nfcore_demo_add_config: '/path/to/nf-core/demo/custom.config' # Set configuration, e.g. resources, for nf-core/demo
nfcore_next_opts: '-resume -profile docker'
nfcore_next_params_file: '/path/to/nfcore/next/params.yml' # Generate with `nf-core launch`, leave out `--input`
# nfcore_next_samplesheet: '' # Comes from the previous pipeline
nfcore_next_add_config: '/path/to/nf-core/next/custom.config' # Set configuration, e.g. resources, for nf-core/next