
FluxBufferTimeout with fair back-pressure may hang due to race between flush and upstream emission #4255

@Sage-Pierce

Description


bufferTimeout(maxSize, timespan, fairBackpressure=true) can deadlock: residual items remain in the buffer with REQUESTED > 0 but no scheduled timer

Expected Behavior

When bufferTimeout is used with fairBackpressure=true, the operator must emit any buffered items within timespan regardless of how they got into the buffer. As long as downstream demand exists, items in the buffer should always be reachable — either by reaching maxSize (size-triggered flush), by the timer firing (time-triggered flush), or by a new downstream request (resumeDemand flush).

Actual Behavior

After a size-triggered flush, the inner subscriber can be left in a state where:

  • The internal queue has N items (0 < N < maxSize),
  • state.index = N,
  • requested > 0 (downstream is waiting on a buffer),
  • TIMEOUT_FLAG is clear, and
  • no Disposable is scheduled on currentTimeoutTask.

In this state nothing can advance the operator:

  • the timer was cancelled at the start of the size-triggered flush and is never rescheduled,
  • new onNext calls only schedule a timer when the visible index was 0, so any further upstream items push the index up but do not arm a new timer, and
  • no new request(n) arrives from downstream because requested is already > 0.

The buffered items sit in bufferTimeout indefinitely, and any processing that depends on them reaching downstream stalls. We hit this in production with a Kafka stream using Atleon, where the heap dump showed exactly this configuration: batchSize=100, INDEX=99, REQUESTED=1, no TIMEOUT_FLAG, queue.size=99.
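The stuck configuration can be written down as a predicate over the fields listed above. The sketch below is a hypothetical helper (StuckStateCheck and isStuck are illustrative names, not Reactor API) that takes those field values as plain arguments and reports whether none of the three flush paths can fire, assuming no further upstream items arrive. Fed the heap-dump values, it reports the stuck state.

```java
/**
 * Hypothetical helper, not part of Reactor: checks whether a snapshot of the
 * subscriber's fields matches the stuck state described in this issue,
 * assuming no further upstream items arrive.
 */
public final class StuckStateCheck {

    static boolean isStuck(int maxSize, int queueSize, int index, long requested,
                           boolean timeoutFlag, boolean timerScheduled) {
        // Residual items: something is buffered and the index agrees with the queue.
        boolean hasResiduals = queueSize > 0 && index == queueSize;
        // Size-triggered flush cannot fire until the buffer reaches maxSize.
        boolean sizeFlushBlocked = index < maxSize;
        // Time-triggered flush cannot fire: no timer pending and none has fired.
        boolean timeFlushBlocked = !timeoutFlag && !timerScheduled;
        // Downstream already has outstanding demand, so it waits for a buffer
        // instead of calling request(n) again (no resumeDemand flush).
        boolean requestFlushBlocked = requested > 0;
        return hasResiduals && sizeFlushBlocked && timeFlushBlocked && requestFlushBlocked;
    }

    public static void main(String[] args) {
        // Heap-dump values: batchSize=100, INDEX=99, REQUESTED=1,
        // no TIMEOUT_FLAG, queue.size=99, and no scheduled timer.
        System.out.println(isStuck(100, 99, 99, 1L, false, false)); // true
        // Same residuals with a timer armed: the timer will flush them.
        System.out.println(isStuck(100, 99, 99, 1L, false, true));  // false
    }
}
```

With REQUESTED=0 the predicate returns false: items waiting for downstream demand are normal back-pressure, not this bug.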

The race window is the gap between the inner flush() loop exiting in drain() and the subsequent forceUpdate(...) that decrements state.index by consumed. If new items arrive in that gap (or, in a single-threaded reproduction, via the requestMore -> subscription.request(...) call that sits between those two points), they:

  1. enter the queue via queue.offer(t) and increment state.index via forceAddWork, but
  2. do not schedule a timer in onNext because the visible previousState.index is still non-zero (the in-flight drain has not yet decremented it).

When the drain then runs forceUpdate to apply -consumed, the resulting state.index is positive (the newly-arrived count) but the timer that was cancelled at the start of the flush is never re-armed.
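The ordering can be replayed in a single-threaded toy model. This is a hypothetical sketch, not Reactor's code: LostTimerModel, timerArmed, draining and onRequestMore are illustrative stand-ins for the queue, state.index, currentTimeoutTask, the work-in-progress guard and the requestMore -> subscription.request(...) call, and the model ignores downstream demand, cancellation and termination. Items injected from the onRequestMore hook increment the index while it is still non-zero, so no timer is armed, and the later decrement leaves residuals behind.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded toy model of the lost-timer race (not Reactor code). */
public final class LostTimerModel {
    final int maxSize;
    final ArrayDeque<Integer> queue = new ArrayDeque<>();
    final List<List<Integer>> emitted = new ArrayList<>();
    int index;                           // stands in for state.index
    boolean timerArmed;                  // stands in for a task in currentTimeoutTask
    boolean draining;                    // crude stand-in for the work-in-progress guard
    Runnable onRequestMore = () -> { };  // stands in for subscription.request(...)

    LostTimerModel(int maxSize) {
        this.maxSize = maxSize;
    }

    void onNext(int item) {
        queue.offer(item);               // queue.offer(t)
        int previousIndex = index++;     // forceAddWork analogue
        if (previousIndex == 0) {
            timerArmed = true;           // armed only when the visible index was 0
        }
        if (index >= maxSize && !draining) {
            drain();                     // size-triggered flush
        }
    }

    void drain() {
        draining = true;
        try {
            while (index >= maxSize) {
                timerArmed = false;      // cancelled at the start of the flush
                List<Integer> buffer = new ArrayList<>();
                while (buffer.size() < maxSize && !queue.isEmpty()) {
                    buffer.add(queue.poll());
                }
                emitted.add(buffer);
                int consumed = buffer.size();
                onRequestMore.run();     // window: new items may arrive here
                index -= consumed;       // forceUpdate(-consumed) analogue
                // No re-arm here, so residuals that arrived in the window have no timer.
            }
        } finally {
            draining = false;
        }
    }

    public static void main(String[] args) {
        LostTimerModel model = new LostTimerModel(10);
        boolean[] injected = {false};
        model.onRequestMore = () -> {
            if (!injected[0]) {          // inject once, during the first flush
                injected[0] = true;
                for (int i = 100; i < 105; i++) {
                    model.onNext(i);
                }
            }
        };
        for (int i = 0; i < 10; i++) {
            model.onNext(i);
        }
        // emitted buffers, index, queued items, timer armed?
        System.out.println(model.emitted.size() + " " + model.index + " "
                + model.queue.size() + " " + model.timerArmed); // 1 5 5 false
    }
}
```

Running main prints `1 5 5 false`: one buffer emitted, five residual items counted and queued, and no timer armed to release them.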

Steps to Reproduce

The race is small but can be reproduced deterministically on a single thread by exploiting the requestMore(...) -> subscription.request(...) call that drain() makes between the inner flush loop and forceUpdate. The custom subscription below pushes items inside that call so they land in the queue in the same window the production race exploited.

@Test
void residualItemsAfterPartialFlushAreEmittedByRescheduledTimer() {
    VirtualTimeScheduler scheduler = VirtualTimeScheduler.create();
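    List<List<Integer>> emittedBuffers = new ArrayList<>();

    // `actual` records each emitted buffer. Requesting 4 buffers up front is
    // an assumption, chosen so that REQUESTED_FROM_DOWNSTREAM is 1 after the
    // three size-triggered flushes asserted below.
    CoreSubscriber<List<Integer>> actual = new BaseSubscriber<List<Integer>>() {
        @Override
        protected void hookOnSubscribe(Subscription s) {
            s.request(4);
        }

        @Override
        protected void hookOnNext(List<Integer> buffer) {
            emittedBuffers.add(buffer);
        }
    };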

    FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Integer, List<Integer>> test =
        new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Integer, List<Integer>>(
            actual, 10, 100, TimeUnit.MILLISECONDS,
            scheduler.createWorker(), ArrayList::new, null);

    Subscription subscription = new Subscription() {
        int callCount = 0;
        @Override
        public void request(long n) {
            callCount++;
            if (callCount == 2) {
                // Second subscription.request comes from requestMore inside the
                // third flushing drain iteration: after the inner flush loop
                // exits but before forceUpdate runs. Items pushed here become
                // residuals with REQUESTED still > 0.
                for (int i = 100; i < 105; i++) {
                    test.onNext(i);
                }
            }
        }
        @Override
        public void cancel() {}
    };

    test.onSubscribe(subscription);

    // Three batches of 10 trigger three size-based flushes. After the third
    // flush, outstanding upstream demand drops below the replenish mark, so
    // drain calls requestMore, the custom subscription pushes 5 items, and
    // those items become residuals.
    for (int i = 0; i < 30; i++) {
        test.onNext(i);
    }

    assertThat(emittedBuffers).hasSize(3);
    assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(5);
    assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(1L);

    // Without the fix the timer was cancelled at the start of the third flush
    // and never re-armed, so this advance does nothing and the residuals are
    // stuck. With a reschedule, the timer fires and emits the residual buffer.
    scheduler.advanceTimeBy(Duration.ofMillis(100));

    assertThat(emittedBuffers).hasSize(4);
    assertThat(emittedBuffers.get(3)).containsExactly(100, 101, 102, 103, 104);
}

Against current main (3.8.6-SNAPSHOT) this test fails with Expected size: 4 but was: 3.

The same shape is reachable from real upstreams too: any time onNext from upstream is concurrent with the drain thread (e.g. an onNext lands between the inner flush loop's last queue.poll() returning null and the drain's forceUpdate(...) CAS), the queue and the state index can end up positive with no scheduled timer.

Possible Solution

In BufferTimeoutWithBackpressureSubscriber.drain(), after the forceUpdate(...) that applies -consumed to the state index, if the resulting state.index > 0 and the subscriber is neither cancelled nor terminated, schedule a fresh timer:

if (consumed > 0) {
    int toDecrement = -consumed;
    currentState = forceUpdate(this, state -> resetTimeout(incrementIndex(state, toDecrement)));
    previousState = resetTimeout(incrementIndex(previousState, toDecrement));

    if (getIndex(currentState) > 0 && !isCancelled(currentState) && !isTerminated(currentState)) {
        try {
            Disposable disposable = timer.schedule(this::bufferTimedOut, timeSpan, unit);
            currentTimeoutTask.update(disposable);
        } catch (RejectedExecutionException e) {
            this.error = Operators.onRejectedExecution(e, subscription, null, null, actual.currentContext());
            previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
        }
    }
}

This mirrors the onNext schedule path (same try/catch shape) and runs only when the drain actually consumed something and items remain in the buffer afterwards, so it adds no scheduling when a flush leaves the buffer empty, which is the common case where the partial-flush race did not fire. currentTimeoutTask.update(...) disposes any previously stored timer, so subsequent drain iterations that take the shouldFlush branch still cancel cleanly via currentTimeoutTask.update(null).
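The update(...) behaviour the fix relies on can be illustrated with a small stand-in. SwapSlot is hypothetical and is not Reactor's implementation; it models a task as a Runnable whose run() means "dispose", and assumes only the semantics stated above: storing a new task disposes the previous one, so update(null) cancels the current timer and a later re-arm replaces it cleanly.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for currentTimeoutTask.update(...) semantics (not Reactor code). */
public final class SwapSlot {
    // A stored task is modeled as a Runnable whose run() means "dispose this task".
    private final AtomicReference<Runnable> current = new AtomicReference<>();

    /** Stores next and disposes the previously stored task, if any. */
    void update(Runnable next) {
        Runnable previous = current.getAndSet(next);
        if (previous != null) {
            previous.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger disposed = new AtomicInteger();
        SwapSlot slot = new SwapSlot();
        slot.update(disposed::incrementAndGet);  // timer armed by onNext
        slot.update(null);                       // size-triggered flush cancels it
        slot.update(disposed::incrementAndGet);  // the fix re-arms for the residuals
        slot.update(disposed::incrementAndGet);  // a later re-arm disposes the previous one
        System.out.println(disposed.get());      // 2
    }
}
```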

Semantically it preserves the operator's contract: items in the buffer are emitted within timespan. The residuals effectively get a fresh timespan window from the moment the partial flush completed.
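A single-threaded toy model with the proposed re-arm applied shows the residuals being released. This is again a hypothetical sketch, not Reactor's code: RearmedTimerModel and fireTimer() are illustrative, fireTimer() stands in for bufferTimedOut() followed by the time-triggered flush, and downstream demand is assumed to be available when the timer fires. The only change to the buggy ordering is the re-arm right after index -= consumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded toy model with the proposed fix applied (not Reactor code). */
public final class RearmedTimerModel {
    final int maxSize;
    final ArrayDeque<Integer> queue = new ArrayDeque<>();
    final List<List<Integer>> emitted = new ArrayList<>();
    int index;                           // stands in for state.index
    boolean timerArmed;                  // stands in for a task in currentTimeoutTask
    boolean draining;                    // crude stand-in for the work-in-progress guard
    Runnable onRequestMore = () -> { };  // stands in for subscription.request(...)

    RearmedTimerModel(int maxSize) {
        this.maxSize = maxSize;
    }

    void onNext(int item) {
        queue.offer(item);
        int previousIndex = index++;
        if (previousIndex == 0) {
            timerArmed = true;           // armed only on the 0 -> 1 transition
        }
        if (index >= maxSize && !draining) {
            drain();                     // size-triggered flush
        }
    }

    void drain() {
        draining = true;
        try {
            while (index >= maxSize) {
                timerArmed = false;      // cancelled at the start of the flush
                int consumed = flush(maxSize);
                onRequestMore.run();     // new items may land in this window
                index -= consumed;       // forceUpdate(-consumed) analogue
                if (consumed > 0 && index > 0) {
                    timerArmed = true;   // the fix: fresh timespan for residuals
                }
            }
        } finally {
            draining = false;
        }
    }

    /** Simulates the timer firing: flushes whatever is buffered, if armed. */
    void fireTimer() {
        if (!timerArmed) {
            return;                      // nothing scheduled, nothing happens
        }
        timerArmed = false;
        index -= flush(index);
    }

    private int flush(int limit) {
        List<Integer> buffer = new ArrayList<>();
        while (buffer.size() < limit && !queue.isEmpty()) {
            buffer.add(queue.poll());
        }
        if (!buffer.isEmpty()) {
            emitted.add(buffer);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        RearmedTimerModel model = new RearmedTimerModel(10);
        boolean[] injected = {false};
        model.onRequestMore = () -> {
            if (!injected[0]) {          // inject once, like callCount == 2 in the test
                injected[0] = true;
                for (int i = 100; i < 105; i++) {
                    model.onNext(i);
                }
            }
        };
        for (int i = 0; i < 10; i++) {
            model.onNext(i);
        }
        System.out.println(model.timerArmed); // true
        model.fireTimer();
        System.out.println(model.emitted);
        // [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [100, 101, 102, 103, 104]]
    }
}
```

Without the re-arm line, fireTimer() would return immediately and the second buffer would never appear, which matches the "Expected size: 4 but was: 3" failure in the real test.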

All 246 FluxBuffer* tests pass with the fix in place, including the existing race-style tests (bufferTimeoutShouldNotRaceWithNext, processesLargeDataset, etc.).

Your Environment

  • Reactor version(s) used: Observed in production at 3.6.11; Reproduced on main at 3.8.6-SNAPSHOT
  • Other relevant libraries versions (eg. netty, ...): Atleon (Kafka receiver) sits upstream of bufferTimeout, with STRICT-mode AcknowledgementQueue and maxActiveInFlight=4096. Not strictly required to hit the bug — any upstream that can emit concurrently with the drain thread is enough — but it is what amplified the stall into a hard deadlock.
  • JVM version (java -version): reproduced on openjdk 17.0.18 (Zulu17.64+17-CA)
  • OS and version (eg uname -a): Darwin 25.4.0 arm64. Production runs on Linux.
