bufferTimeout(maxSize, timespan, fairBackpressure=true) can deadlock: residual items remain in the buffer with REQUESTED > 0 but no scheduled timer
Expected Behavior
When bufferTimeout is used with fairBackpressure=true, the operator must emit any buffered items within timespan regardless of how they got into the buffer. As long as downstream demand exists, items in the buffer should always be reachable — either by reaching maxSize (size-triggered flush), by the timer firing (time-triggered flush), or by a new downstream request (resumeDemand flush).
Actual Behavior
After a size-triggered flush, the inner subscriber can be left in a state where:
- The internal queue has N items (0 < N < maxSize),
- state.index = N,
- requested > 0 (downstream is waiting on a buffer),
- TIMEOUT_FLAG is clear, and
- no Disposable is scheduled on currentTimeoutTask.
In this state nothing can advance the operator:
- the timer was cancelled at the start of the size-triggered flush and is never rescheduled,
- new onNext calls only schedule a timer when the visible index was 0, so any further upstream items push the index up but do not arm a new timer, and
- no new request(n) arrives from downstream because requested is already > 0.
The buffered items sit in bufferTimeout indefinitely, and any post-processing that depends on them being processed downstream is stalled. Anecdotally, we hit this in production with a Kafka stream using Atleon, where the heap dump showed exactly this configuration: batchSize=100, INDEX=99, REQUESTED=1, no TIMEOUT_FLAG, queue.size=99.
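As a reading aid, the conditions listed above can be folded into a single predicate. This is only a sketch: StrandedStateCheck and isStranded are hypothetical names, the parameters simply mirror the values read from the heap dump, and nothing here is part of Reactor's API.

```java
/** Hypothetical diagnostic helper; the names mirror this report, not Reactor's API. */
public class StrandedStateCheck {

    /** Returns true when a partial buffer has demand but nothing left to trigger a flush. */
    public static boolean isStranded(int queueSize, int index, long requested,
                                     boolean timeoutFlag, boolean timerScheduled, int maxSize) {
        // 0 < N < maxSize items queued, and the state index agrees with the queue.
        boolean partialBuffer = queueSize > 0 && queueSize < maxSize && index == queueSize;
        // Downstream is waiting, so no further request(n) is going to arrive.
        boolean downstreamWaiting = requested > 0;
        // Neither a timeout already recorded nor a timer that could still fire.
        boolean nothingPending = !timeoutFlag && !timerScheduled;
        return partialBuffer && downstreamWaiting && nothingPending;
    }

    public static void main(String[] args) {
        // Values from the production heap dump: batchSize=100, INDEX=99, REQUESTED=1.
        System.out.println(isStranded(99, 99, 1L, false, false, 100));
    }
}
```

With the heap dump values the predicate holds. Setting timerScheduled to true, or filling the queue up to maxSize, makes it false, because in either case something can still trigger a flush.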
The race window is the gap between the inner flush() loop exiting in drain() and the subsequent forceUpdate(...) that decrements state.index by consumed. If new items arrive in that gap (or, in single-threaded land, via the requestMore -> subscription.request(...) call that sits between those two points), they:
- enter the queue via queue.offer(t) and increment state.index via forceAddWork, but
- do not schedule a timer in onNext because the visible previousState.index is still non-zero (the in-flight drain has not yet decremented it).
When the drain then runs forceUpdate to apply -consumed, the resulting state.index is positive (the newly-arrived count) but the timer that was cancelled at the start of the flush is never re-armed.
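The sequence above can be traced with a toy single-threaded model. This is a sketch under simplifying assumptions, not Reactor's implementation: PartialFlushRaceModel, insideWindow, timerArmed and draining are hypothetical stand-ins for the real state word, the currentTimeoutTask slot and the work-in-progress guard, and demand is counted in whole buffers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy single-threaded model of the partial-flush window. Not Reactor's code. */
public class PartialFlushRaceModel {
    private final int maxSize;
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final List<List<Integer>> emitted = new ArrayList<>();
    private int index;          // stands in for state.index
    private long requested;     // downstream demand, counted in buffers
    private boolean timerArmed; // stands in for a task held by currentTimeoutTask
    private boolean draining;   // stands in for the work-in-progress guard
    private Runnable insideWindow = () -> { }; // runs between flush loop and index decrement

    PartialFlushRaceModel(int maxSize, long requested) {
        this.maxSize = maxSize;
        this.requested = requested;
    }

    void onNext(int value) {
        queue.offer(value);
        int previousIndex = index++;
        if (previousIndex == 0) {
            timerArmed = true; // only the first item of a fresh buffer arms the timer
        }
        if (!draining && index >= maxSize && requested > 0) {
            drain();
        }
    }

    private void drain() {
        draining = true;
        timerArmed = false; // the size-triggered flush cancels the pending timer
        List<Integer> buffer = new ArrayList<>();
        while (buffer.size() < maxSize && !queue.isEmpty()) {
            buffer.add(queue.poll());
        }
        emitted.add(buffer);
        requested--;
        Runnable hook = insideWindow;
        insideWindow = () -> { }; // one-shot hook
        hook.run();               // items arriving here still see the old, non-zero index
        index -= buffer.size();   // applies -consumed; nothing re-arms the timer
        draining = false;
    }

    @Override
    public String toString() {
        return "emitted=" + emitted.size() + " queued=" + queue.size() + " index=" + index
                + " requested=" + requested + " timerArmed=" + timerArmed;
    }

    /** Runs the scenario and returns a snapshot of the final state. */
    public static String run() {
        PartialFlushRaceModel model = new PartialFlushRaceModel(10, 2);
        model.insideWindow = () -> {
            for (int i = 100; i < 105; i++) {
                model.onNext(i); // lands inside the window, as in the race
            }
        };
        for (int i = 0; i < 10; i++) {
            model.onNext(i); // the 10th item triggers the size-based flush
        }
        for (int i = 200; i < 204; i++) {
            model.onNext(i); // later items raise the index but never arm a timer
        }
        return model.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints emitted=1 queued=9 index=9 requested=1 timerArmed=false, which has the same shape as the production heap dump scaled down to maxSize=10: one item short of a size-triggered flush, demand outstanding, and no timer.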
Steps to Reproduce
The race is small but can be reproduced deterministically on a single thread by exploiting the requestMore(...) -> subscription.request(...) call that drain() makes between the inner flush loop and forceUpdate. The custom subscription below pushes items inside that call so they land in the queue in the same window the production race exploited.
@Test
void residualItemsAfterPartialFlushAreEmittedByRescheduledTimer() {
    VirtualTimeScheduler scheduler = VirtualTimeScheduler.create();
    List<List<Integer>> emittedBuffers = new ArrayList<>();

    // NOTE: `actual` is not defined in this snippet. The assertions below imply a
    // downstream CoreSubscriber that appends each emitted buffer to emittedBuffers
    // and has requested 4 buffers in total.
    FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Integer, List<Integer>> test =
            new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Integer, List<Integer>>(
                    actual, 10, 100, TimeUnit.MILLISECONDS,
                    scheduler.createWorker(), ArrayList::new, null);

    Subscription subscription = new Subscription() {
        int callCount = 0;

        @Override
        public void request(long n) {
            callCount++;
            if (callCount == 2) {
                // Second subscription.request comes from requestMore inside the
                // third flushing drain iteration: after the inner flush loop
                // exits but before forceUpdate runs. Items pushed here become
                // residuals with REQUESTED still > 0.
                for (int i = 100; i < 105; i++) {
                    test.onNext(i);
                }
            }
        }

        @Override
        public void cancel() {}
    };
    test.onSubscribe(subscription);

    // Three batches of 10 trigger three size-based flushes. After the third
    // flush, outstanding upstream demand drops below the replenish mark, so
    // drain calls requestMore, the custom subscription pushes 5 items, and
    // those items become residuals.
    for (int i = 0; i < 30; i++) {
        test.onNext(i);
    }
    assertThat(emittedBuffers).hasSize(3);
    assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(5);
    assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(1L);

    // Without the fix the timer was cancelled at the start of the third flush
    // and never re-armed, so this advance does nothing and the residuals are
    // stuck. With a reschedule, the timer fires and emits the residual buffer.
    scheduler.advanceTimeBy(Duration.ofMillis(100));
    assertThat(emittedBuffers).hasSize(4);
    assertThat(emittedBuffers.get(3)).containsExactly(100, 101, 102, 103, 104);
}
Against current main (3.8.6-SNAPSHOT) this test fails with Expected size: 4 but was: 3.
The same shape is reachable from real upstreams too: any time onNext from upstream is concurrent with the drain thread (e.g. an onNext lands between the inner flush loop's last queue.poll() returning null and the drain's forceUpdate(...) CAS), the queue and the state index can end up positive with no scheduled timer.
Possible Solution
In BufferTimeoutWithBackpressureSubscriber.drain(), after the forceUpdate(...) that applies -consumed to the state index, if the resulting state.index > 0 and the subscriber is neither cancelled nor terminated, schedule a fresh timer:
if (consumed > 0) {
    int toDecrement = -consumed;
    currentState = forceUpdate(this, state -> resetTimeout(incrementIndex(state, toDecrement)));
    previousState = resetTimeout(incrementIndex(previousState, toDecrement));
    if (getIndex(currentState) > 0 && !isCancelled(currentState) && !isTerminated(currentState)) {
        try {
            Disposable disposable = timer.schedule(this::bufferTimedOut, timeSpan, unit);
            currentTimeoutTask.update(disposable);
        } catch (RejectedExecutionException e) {
            this.error = Operators.onRejectedExecution(e, subscription, null, null, actual.currentContext());
            previousState = forceAddWork(this, BufferTimeoutWithBackpressureSubscriber::setTerminated);
        }
    }
}
This mirrors the onNext schedule path (same try/catch shape) and runs only when the drain actually consumed something, so it does not add any scheduling for the common case where the partial-flush race did not fire. currentTimeoutTask.update(...) disposes any previously-stored timer, so subsequent drain iterations that take the shouldFlush branch still cancel cleanly via currentTimeoutTask.update(null).
Semantically it preserves the operator's contract: items in the buffer are emitted within timespan. The residuals effectively get a fresh timespan window from the moment the partial flush completed.
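To make the effect of the re-arm concrete, here is a toy model with a hand-rolled virtual clock that runs the same scenario with and without it. It is a sketch only: RearmAfterDrainModel, rearmAfterDrain, insideWindow and the long-based deadline are hypothetical simplifications standing in for the real state word, currentTimeoutTask and the scheduler, not the actual patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of re-arming the timeout after a partial flush. Not Reactor's code. */
public class RearmAfterDrainModel {
    private static final long NO_DEADLINE = -1;

    private final int maxSize;
    private final long timespanMillis;
    private final boolean rearmAfterDrain; // true = apply the proposed re-arm
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final List<List<Integer>> emitted = new ArrayList<>();
    private int index;                     // stands in for state.index
    private long requested;                // downstream demand, counted in buffers
    private boolean draining;
    private long now;                      // virtual clock, in milliseconds
    private long deadline = NO_DEADLINE;   // stands in for currentTimeoutTask
    private Runnable insideWindow = () -> { };

    RearmAfterDrainModel(int maxSize, long timespanMillis, long requested, boolean rearmAfterDrain) {
        this.maxSize = maxSize;
        this.timespanMillis = timespanMillis;
        this.requested = requested;
        this.rearmAfterDrain = rearmAfterDrain;
    }

    void onNext(int value) {
        queue.offer(value);
        if (index++ == 0) {
            deadline = now + timespanMillis; // first item of a fresh buffer arms the timer
        }
        if (!draining && index >= maxSize && requested > 0) {
            deadline = NO_DEADLINE;          // size-triggered flush cancels the timer
            drain();
        }
    }

    void advanceTimeBy(long millis) {
        now += millis;
        if (deadline != NO_DEADLINE && now >= deadline && requested > 0) {
            deadline = NO_DEADLINE;
            drain();                         // time-triggered flush of whatever is buffered
        }
    }

    private void drain() {
        draining = true;
        List<Integer> buffer = new ArrayList<>();
        while (buffer.size() < maxSize && !queue.isEmpty()) {
            buffer.add(queue.poll());
        }
        emitted.add(buffer);
        requested--;
        Runnable hook = insideWindow;
        insideWindow = () -> { };            // one-shot hook
        hook.run();                          // items arriving inside the race window
        index -= buffer.size();              // applies -consumed
        if (rearmAfterDrain && index > 0) {
            deadline = now + timespanMillis; // residuals get a fresh timespan window
        }
        draining = false;
    }

    /** Runs the scenario and returns every buffer emitted after one timespan has elapsed. */
    public static List<List<Integer>> run(boolean rearmAfterDrain) {
        RearmAfterDrainModel model = new RearmAfterDrainModel(10, 100, 2, rearmAfterDrain);
        model.insideWindow = () -> {
            for (int i = 100; i < 105; i++) {
                model.onNext(i);
            }
        };
        for (int i = 0; i < 10; i++) {
            model.onNext(i);
        }
        model.advanceTimeBy(100);
        return model.emitted;
    }

    public static void main(String[] args) {
        System.out.println("without re-arm: " + run(false));
        System.out.println("with re-arm:    " + run(true));
    }
}
```

Without the re-arm the model emits only the first full buffer and the five residuals stay queued after the clock advances. With it, the residuals are emitted as a second buffer one timespan after the partial flush completed.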
All 246 FluxBuffer* tests pass with the fix in place, including the existing race-style tests (bufferTimeoutShouldNotRaceWithNext, processesLargeDataset, etc.).
Your Environment
- Reactor version(s) used: observed in production at 3.6.11; reproduced on main at 3.8.6-SNAPSHOT
- Other relevant libraries versions (eg. netty, ...): Atleon (Kafka receiver) sits upstream of bufferTimeout, with STRICT-mode AcknowledgementQueue and maxActiveInFlight=4096. Not strictly required to hit the bug (any upstream that can emit concurrently with the drain thread is enough), but it is what amplified the stall into a hard deadlock.
- JVM version (java -version): reproduced on openjdk 17.0.18 (Zulu17.64+17-CA)
- OS and version (eg uname -a): Darwin 25.4.0 arm64. Production runs on Linux.