
Guided Example ‐ Nf‐core style workflow

Mahesh Binzer-Panchal edited this page Jan 14, 2025 · 4 revisions

A Guided Example

Read through this example first, then try to apply the same principles to nf-core/fetchngs and nf-core/rnaseq.

Copy the module

Start by creating a module file with the contents of the NEXTFLOW_RUN process:

modules/local/nextflow/run/main.nf:

process NEXTFLOW_RUN {
    tag "$pipeline_name"

    input:
    val pipeline_name     // String
    val nextflow_opts     // String
    val params_file       // pipeline params-file
    val samplesheet       // pipeline samplesheet
    val additional_config // custom configs

    when:
    task.ext.when == null || task.ext.when

    exec:
    // def args = task.ext.args ?: ''
    def cache_dir = java.nio.file.Paths.get(workflow.workDir.resolve(pipeline_name).toUri())
    java.nio.file.Files.createDirectories(cache_dir)
    // construct nextflow command
    def nxf_cmd = [
        'nextflow run',
            pipeline_name,
            nextflow_opts,
            params_file ? "-params-file $params_file" : '',
            additional_config ? "-c $additional_config" : '',
            samplesheet ? "--input $samplesheet" : '',
            "--outdir $task.workDir/results",
    ]
    // Copy command to shell script in work dir for reference/debugging.
    file("$task.workDir/nf-cmd.sh").text = nxf_cmd.join(" ")
    // Run nextflow command locally
    def builder = new ProcessBuilder(nxf_cmd.join(" ").tokenize(" "))
    builder.directory(cache_dir.toFile())
    process = builder.start()
    assert process.waitFor() == 0: process.text
    // Copy nextflow log to work directory
    file("${cache_dir.toString()}/.nextflow.log").copyTo("$task.workDir/.nextflow.log")

    output:
    path "results"  , emit: output
    val process.text, emit: log
}

This module builds the command line instruction (for nf-core style workflows):

nextflow run $pipeline_name $nextflow_opts [-params-file $params_file] [-c $additional_config] [--input $samplesheet] --outdir $task.workDir/results

and then runs it. All the pipeline outputs are written to the folder results inside the task work directory ($task.workDir).
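The way the module assembles this command can be tried out in plain Groovy. The following is a minimal sketch, not part of the module: all values are hypothetical stand-ins for the process inputs, and `work_dir` stands in for `task.workDir`. It shows that inputs left empty (an empty string here, an empty list in the workflow) evaluate to false in the ternaries, and that `tokenize(' ')` drops the blank entries they leave behind.

```groovy
// Hypothetical stand-ins for the process inputs (not taken from a real run)
def pipeline_name     = 'nf-core/demo'
def nextflow_opts     = '-resume -profile docker,test'
def params_file       = ''                       // not supplied
def samplesheet       = '/path/to/samplesheet/input.csv'
def additional_config = ''                       // not supplied
def work_dir          = '/path/to/work/dir'      // stands in for task.workDir

// Same construction as in the module: optional parts collapse to ''
def nxf_cmd = [
    'nextflow run',
    pipeline_name,
    nextflow_opts,
    params_file ? "-params-file $params_file" : '',
    additional_config ? "-c $additional_config" : '',
    samplesheet ? "--input $samplesheet" : '',
    "--outdir $work_dir/results",
]

// join(' ') leaves extra spaces where inputs were empty; tokenize(' ') skips them
def tokens = nxf_cmd.join(' ').tokenize(' ')
assert tokens == [
    'nextflow', 'run', 'nf-core/demo',
    '-resume', '-profile', 'docker,test',
    '--input', '/path/to/samplesheet/input.csv',
    '--outdir', '/path/to/work/dir/results',
]
```

Because the command is split on spaces, any path or option value that itself contains a space would be broken into separate arguments, so it is safest to keep paths free of spaces.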

Include the module in your workflow

main.nf:

include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"

workflow {
    NFCORE_DEMO (
        'nf-core/demo',            // Select nf-core pipeline
        params.nfcore_demo_opts,   // workflow opts supplied as params for flexibility
        params.nfcore_demo_params_file ? Channel.fromPath(params.nfcore_demo_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_samplesheet ? Channel.fromPath(params.nfcore_demo_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_add_config ? Channel.fromPath(params.nfcore_demo_add_config, checkIfExists: true) : Channel.value([]),
    )
}

Here we've selected the nf-core workflow nf-core/demo to include.

  1. Include the NEXTFLOW_RUN module using the include keyword, and say where it's located using from.
  2. The module is aliased (given another name) using as, for readability and extensibility (other workflows can be included later under their own names).
  3. The module is then added to the workflow, using the name NFCORE_DEMO.
  4. The first channel input is nf-core/demo, the name of the pipeline we would like to run. This is implicitly converted to Channel.value('nf-core/demo').
  5. The second channel input is a string supplied by params.nfcore_demo_opts, which passes extra Nextflow options such as -resume, -ansi-log false, or -profile docker,test. You can include multiple Nextflow options here, e.g. "-resume -profile docker,test".
  6. The third channel input uses a ternary operator (<condition> ? <if true> : <if false>) to return a channel (either a parameter file or an empty list). A params file can be generated with nf-core launch or written by hand as a YAML file. It supplies the pipeline parameters, i.e. the ones that start with -- on the command line.
  7. The fourth channel input works the same way to supply the samplesheet (--input). Since this is the first process in the chain, the <if false> part returns an empty list. For subsequent pipelines in the chain, this would instead take a samplesheet generated by a previous pipeline, picked out from the files in results/.
  8. The last channel input is for additional configuration, for example if you would like to adjust the resources used by a process in the pipeline.
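As an illustration of item 8, an additional config file might look like the sketch below. The process name `FASTQC` and the resource values are assumptions chosen for illustration; check the process names of the pipeline you are actually running.

```groovy
// custom.config: hypothetical resource overrides passed to the pipeline with -c
process {
    // 'FASTQC' is an assumed process name; replace it with one from your pipeline
    withName: 'FASTQC' {
        cpus   = 2
        memory = 4.GB
        time   = 2.h
    }
}
```

The path to this file is what you would supply as params.nfcore_demo_add_config.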

Lastly, create a nextflow.config and add process.errorStrategy = 'finish'. Without this, if one pipeline fails, any concurrently running pipelines are killed immediately, leaving a Nextflow lock file in place, which prevents the workflow from resuming.
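A minimal nextflow.config for the wrapper workflow, containing only the setting described above, could look like this sketch:

```groovy
// nextflow.config (for the wrapper workflow, not for the nf-core pipelines themselves)
// 'finish' lets already-submitted tasks complete before the run shuts down on error
process.errorStrategy = 'finish'
```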

Running the first module

You can test the first module by running:

nextflow run main.nf -params-file params.yml

where params.yml looks like:

nfcore_demo_opts: '-resume -profile docker'
nfcore_demo_params_file: '/path/to/nfcore/demo/params.yml' # Generate with `nf-core launch`
nfcore_demo_samplesheet: '/path/to/samplesheet/input.csv' # The samplesheet
nfcore_demo_add_config: '/path/to/nf-core/demo/custom.config' # Set configuration, e.g. resources, for nf-core/demo

Include the next module

The outputs of the first pipeline are now available to use as needed. You can also include other custom processes that supply input to the next pipeline.

To add another pipeline in the chain, follow the same method to include the NEXTFLOW_RUN module, aliasing it to an appropriate name.

main.nf:

include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"
include { NEXTFLOW_RUN as NFCORE_NEXT } from "./modules/local/nextflow/run/main"

workflow {
    NFCORE_DEMO (
        'nf-core/demo',            // Select nf-core pipeline
        params.nfcore_demo_opts,   // workflow opts supplied as params for flexibility
        params.nfcore_demo_params_file ? Channel.fromPath(params.nfcore_demo_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_samplesheet ? Channel.fromPath(params.nfcore_demo_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_add_config ? Channel.fromPath(params.nfcore_demo_add_config, checkIfExists: true) : Channel.value([]),
    )
    NFCORE_NEXT ( // Copy-pasted with appropriate amendments
        'nf-core/next',            // Select nf-core pipeline
        params.nfcore_next_opts,   // workflow opts supplied as params for flexibility
        params.nfcore_next_params_file ? Channel.fromPath(params.nfcore_next_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_next_samplesheet ? Channel.fromPath(params.nfcore_next_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_next_add_config ? Channel.fromPath(params.nfcore_next_add_config, checkIfExists: true) : Channel.value([]),
    )
}

The next part is potentially the tricky one.

Previous pipeline writes a samplesheet

If the previous pipeline writes a samplesheet as output, then this needs to be extracted from the results folder. This is done using the .resolve function inside a .map channel operation. The file function returns a Path object so Nextflow can find the file when it gets passed to the next module.

NFCORE_DEMO.out.output // The results folder
    .map { dir -> file( dir.resolve('path/to/samplesheet'), checkIfExists: true ) } // The relative path to the sample sheet from `results/`
    .set { nfcore_next_samplesheet } // Name the channel

Then supply that channel as the <if false> part of the samplesheet input of the next pipeline. Your workflow should look something like:

main.nf:

include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"
include { NEXTFLOW_RUN as NFCORE_NEXT } from "./modules/local/nextflow/run/main"

workflow {
    NFCORE_DEMO (
        'nf-core/demo',            // Select nf-core pipeline
        params.nfcore_demo_opts,   // workflow opts supplied as params for flexibility
        params.nfcore_demo_params_file ? Channel.fromPath(params.nfcore_demo_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_samplesheet ? Channel.fromPath(params.nfcore_demo_samplesheet, checkIfExists: true) : Channel.value([]),
        params.nfcore_demo_add_config ? Channel.fromPath(params.nfcore_demo_add_config, checkIfExists: true) : Channel.value([]),
    )
    NFCORE_DEMO.out.output // The results folder
        .map { dir -> file( dir.resolve('path/to/samplesheet'), checkIfExists: true ) } // The relative path to the sample sheet from `results/`
        .set { nfcore_next_samplesheet } // Name the channel
    NFCORE_NEXT ( // Copy-pasted with appropriate amendments
        'nf-core/next',            // Select nf-core pipeline
        params.nfcore_next_opts,   // workflow opts supplied as params for flexibility
        params.nfcore_next_params_file ? Channel.fromPath(params.nfcore_next_params_file, checkIfExists: true) : Channel.value([]),
        params.nfcore_next_samplesheet ? Channel.fromPath(params.nfcore_next_samplesheet, checkIfExists: true) : nfcore_next_samplesheet, // ADD SAMPLESHEET CHANNEL HERE!
        params.nfcore_next_add_config ? Channel.fromPath(params.nfcore_next_add_config, checkIfExists: true) : Channel.value([]),
    )
}

Previous pipeline does not write a samplesheet

When the previous pipeline does not write a samplesheet, your Groovy skills come into play. Use the files function, which returns a list of Path objects, and then convert that list into a samplesheet. A rough example looks like:

NFCORE_DEMO.out.output // The results folder
    .map { dir ->
        files( dir.resolve('path/to/files/*.{ext1,ext2}'), checkIfExists: true ) // Extract files from results folder (path relative to `results/`, so no leading slash)
            .collect { filename -> "${filename.simpleName},${filename}" }  // Make a list of csv lines from each filename
    }
    .flatMap { listOfCsvLines -> [ "sample,filename" ] + listOfCsvLines } // Add a header, and emit each line into the channel
    .collectFile( name: 'next_samplesheet.csv', newLine: true, sort: false ) // Collect the lines into one file, unsorted, with a newline after each line
    .set { nfcore_next_samplesheet } // Name the channel

This is then added into the workflow like the previous example.
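Both samplesheet snippets depend on `dir.resolve(...)` being given a path relative to results/. The plain-Groovy sketch below uses hypothetical paths on a Unix-like system to show the standard `java.nio.file.Path.resolve` behaviour: a relative argument is appended to the results folder, whereas an absolute argument (leading slash) replaces it entirely.

```groovy
import java.nio.file.Paths

// Hypothetical results folder, standing in for the path emitted by NFCORE_DEMO.out.output
def results = Paths.get('/work/ab/cdef/results')

// A relative path is appended to the results folder
def rel = results.resolve('pipeline_info/samplesheet.csv')
assert rel.toString() == '/work/ab/cdef/results/pipeline_info/samplesheet.csv'

// An absolute path is returned unchanged, ignoring the results folder
def abs = results.resolve('/pipeline_info/samplesheet.csv')
assert abs.toString() == '/pipeline_info/samplesheet.csv'
```

If `checkIfExists: true` reports a missing file, checking for an accidental leading slash in the resolved path is a good first step.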

Running the connected workflow

The workflow can be run as before, but this time some additional parameters are supplied.

nextflow run main.nf -params-file params.yml

where params.yml looks like:

nfcore_demo_opts: '-resume -profile docker'
nfcore_demo_params_file: '/path/to/nfcore/demo/params.yml' # Generate with `nf-core launch`
nfcore_demo_samplesheet: '/path/to/samplesheet/input.csv' # The samplesheet
nfcore_demo_add_config: '/path/to/nf-core/demo/custom.config' # Set configuration, e.g. resources, for nf-core/demo
nfcore_next_opts: '-resume -profile docker'
nfcore_next_params_file: '/path/to/nfcore/next/params.yml' # Generate with `nf-core launch`, leave out `--input`
# nfcore_next_samplesheet: '' # Comes from the previous pipeline
nfcore_next_add_config: '/path/to/nf-core/next/custom.config' # Set configuration, e.g. resources, for nf-core/next