
maxForks 1 + fair true do not guarantee FIFO task submission order when a channel emits all items simultaneously #7117

@thealanjason


Bug report

When a channel emits all items simultaneously (via toSortedList + flatMap), processes with maxForks 1 and fair true do not submit tasks in channel order. Instead, tasks appear to be submitted in an arbitrary order determined by thread scheduling at task-creation time.

This causes a critical correctness problem when two separate processes (with separate environments) need to coordinate via IPC on the same experiment key simultaneously. If they pick different keys, they deadlock waiting on each other's IPC signals.

Expected behavior and actual behavior

When two processes (SERVER and CLIENT) each have maxForks 1 and fair true, and each receives a sorted channel of N keys, both processes should submit their tasks in channel order, so that SERVER(exp_01) and CLIENT(exp_01) run concurrently, followed by the exp_02 pair, and so on.

Instead, the first submitted tasks are chosen arbitrarily from the middle of the sorted list (e.g. SERVER starts on exp_05, CLIENT on exp_04). The two processes deadlock immediately: SERVER is waiting for exp_05/DONE, which only CLIENT(exp_05) would write, while CLIENT is waiting for exp_04/READY, which only SERVER(exp_04) would write. Because of maxForks 1, neither of those tasks can ever start.

Steps to reproduce the problem

Save the following as main.nf and launch it with nextflow run main.nf:

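#!/usr/bin/env nextflow

def N_EXPERIMENTS = 6

workflow {

    // Emit the keys exp_01 ... exp_06 one at a time
    keys = Channel.of( (1..N_EXPERIMENTS).collect { sprintf('exp_%02d', it) } ).flatten()

    // Pair each key with a per-key directory used for the IPC handshake
    comm_dirs = keys.map { key ->
        def comm = "/tmp/mwe_ipc_${workflow.sessionId}/${key}"
        [key, comm]
    }

    // Send identical copies of every (key, comm_dir) pair to SERVER and CLIENT
    comm_dirs.multiMap { key, comm ->
        server: [key, comm]
        client: [key, comm]
    }.set { _split }

    // Sort each branch by key, then re-emit the items individually;
    // flattening the sorted list makes all items available at once
    server_inputs = _split.server
        .toSortedList { a, b -> a[0] <=> b[0] }
        .flatMap { it }

    client_inputs = _split.client
        .toSortedList { a, b -> a[0] <=> b[0] }
        .flatMap { it }

    SERVER(server_inputs)
    CLIENT(client_inputs)
}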

process SERVER {
    maxForks 1
    fair true

    input:
    tuple val(key), val(comm_dir)

    script:
    """
    # Announce readiness, then block until CLIENT signals completion for the same key
    mkdir -p "${comm_dir}"
    echo "[SERVER] key=${key}  writing READY to ${comm_dir}"
    touch "${comm_dir}/READY"
    echo "[SERVER] key=${key}  waiting for DONE..."
    until [ -e "${comm_dir}/DONE" ]; do sleep 0.2; done
    echo "[SERVER] key=${key}  got DONE, exiting"
    rm -f "${comm_dir}/READY" "${comm_dir}/DONE"
    """
}
                                                                                                                                        
process CLIENT {
    maxForks 1
    fair true

    input:
    tuple val(key), val(comm_dir)

    output:
    val key

    script:
    """
    echo "[CLIENT] key=${key}  waiting for READY in ${comm_dir}..."                                                                     
    until [ -e "${comm_dir}/READY" ]; do sleep 0.2; done
    echo "[CLIENT] key=${key}  found READY — doing work"                                                                                
    sleep 1                                                                                                                             
    echo "[CLIENT] key=${key}  done, writing DONE"
    touch "${comm_dir}/DONE"                                                                                                            
    """         
}

The workflow hangs immediately. Checking the work directories for the two submitted tasks confirms SERVER and CLIENT are processing different keys.

Program output

executor >  local (2)
[51/0c3aab] SERVER (5) [  0%] 0 of 6
[ef/e721c1] CLIENT (4) [  0%] 0 of 6

Checking the work directories: SERVER (5) is running exp_05 and CLIENT (4) is running exp_04, a mismatch that causes the deadlock.

The 5-minute task dump in .nextflow.log shows that all 12 tasks were created at once and queued in scrambled order: the exp_04 and exp_05 tasks (ids 7 and 9) were submitted before the exp_01 tasks (ids 1 and 2), which are still waiting in the queue:

[Task submitter] Submitted process > CLIENT (4) ← exp_04, task id=7
[Task submitter] Submitted process > SERVER (5) ← exp_05, task id=9

%% executor local > tasks in the submission queue:
~> id: 5; CLIENT (3)
~> id: 4; CLIENT (2)
~> id: 3; SERVER (2)
~> id: 6; SERVER (3)
~> id: 1; SERVER (1) ← exp_01, never reached
~> id: 10; CLIENT (5)
~> id: 12; CLIENT (6)
~> id: 8; SERVER (4)
~> id: 2; CLIENT (1) ← exp_01, never reached
~> id: 11; SERVER (6)

.nextflow.log attached.

Environment

  • Nextflow version: 25.04.6 (build 5954, 01-07-2025)
  • Java version: OpenJDK 64-Bit Server VM 17.0.7+4-jvmci-23.0-b10
  • Operating system: Linux 6.12.85+deb13-amd64
  • Bash version: GNU bash 5.2.37(1)-release (x86_64-pc-linux-gnu)

Additional context

toSortedList + flatMap makes all N items available in the channel at once. Nextflow then appears to create all the tasks concurrently, and the order in which they enter the executor's submission queue depends on thread scheduling. The task IDs themselves follow channel order in the dump above (exp_01 got ids 1 and 2, exp_04 and exp_05 got ids 7 to 10), but the queue does not: ids 7 and 9 were submitted while ids 1 and 2 are still waiting. fair true does not prevent this because, according to the Nextflow documentation, it only guarantees that process outputs are emitted in the order in which their inputs were received; it does not control the order in which tasks are submitted.
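The queue behaviour described above can be illustrated with a small toy model. The sketch below is written in Java (the JVM language Nextflow runs on) but is not Nextflow's actual task-processor or executor code; the class QueueRaceModel, the Task record and both methods are hypothetical. Task ids are assigned in channel order, as in the log dump, but each task is appended to a shared queue by its own worker thread, so the queue order can vary between runs. Draining the same tasks through a priority queue keyed on id always yields channel order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class QueueRaceModel {

    // Hypothetical stand-in for a task: a global id plus the channel key it consumes.
    public record Task(int id, String key) {}

    // Ids follow channel order, but each task is appended to the shared queue
    // by its own worker thread, so the resulting queue order depends on scheduling.
    public static List<Task> enqueueConcurrently(List<String> keys, int threads) {
        ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < keys.size(); i++) {
            Task task = new Task(i + 1, keys.get(i));
            pool.submit(() -> {
                start.await(); // release all workers together to provoke the race
                queue.add(task);
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(queue);
    }

    // Draining through a priority queue keyed on id restores channel order,
    // regardless of the order in which the tasks were enqueued.
    public static List<Task> drainById(List<Task> queued) {
        PriorityQueue<Task> byId = new PriorityQueue<>(Comparator.comparingInt(Task::id));
        byId.addAll(queued);
        List<Task> ordered = new ArrayList<>();
        while (!byId.isEmpty()) {
            ordered.add(byId.poll());
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            keys.add(String.format("exp_%02d", i));
        }
        List<Task> queued = enqueueConcurrently(keys, 4);
        System.out.println("queue order: " + queued);           // may differ between runs
        System.out.println("id order:    " + drainById(queued)); // always exp_01 to exp_06
    }
}
```

Running main a few times may show the first line changing while the second stays fixed. Ordering by id is only one way such a guarantee could be provided; the model says nothing about how Nextflow's executor actually selects tasks from its queue.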

The use case driving this report is two processes that must coordinate via IPC (one acts as an HTTP model server, the other as a client/sampler) and therefore must run concurrently on the same key. They cannot be merged into a single process because they require different execution environments (different conda/container environments). Is there a supported Nextflow idiom to guarantee that two processes consuming paired channels always submit their Nth tasks concurrently?
