Skip to content

Commit

Permalink
fix reactor#1557 Removed greediness from FluxBufferTimeout
Browse files Browse the repository at this point in the history
FluxBufferTimeout now only allows for N * R outstanding requests
upstream, where N is the buffer size and R is the number of buffers
requested from downstream, even when a buffer is cut due to a timeout.

It tracks the number of outstanding unfulfilled requests to upstream,
and only requests further items to bring it up to a cap of requested *
batchSize.

Reviewed-In: reactor#1907
  • Loading branch information
EleanorRoseLegg authored and simonbasle committed Oct 30, 2019
1 parent f343f3c commit 74b9bec
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,6 +109,12 @@ final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
static final AtomicLongFieldUpdater<BufferTimeoutSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(BufferTimeoutSubscriber.class, "requested");

volatile long outstanding;

@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<BufferTimeoutSubscriber> OUTSTANDING =
AtomicLongFieldUpdater.newUpdater(BufferTimeoutSubscriber.class, "outstanding");

volatile int index = 0;

static final AtomicIntegerFieldUpdater<BufferTimeoutSubscriber> INDEX =
Expand Down Expand Up @@ -155,6 +161,15 @@ protected void doOnSubscribe() {

void nextCallback(T value) {
synchronized (this) {
if (OUTSTANDING.decrementAndGet(this) < 0)
{
actual.onError(Exceptions.failWithOverflow("Unrequested element received"));
Context ctx = actual.currentContext();
Operators.onDiscard(value, ctx);
Operators.onDiscardMultiple(values, ctx);
return;
}

C v = values;
if(v == null) {
v = Objects.requireNonNull(bufferSupplier.get(),
Expand Down Expand Up @@ -289,14 +304,16 @@ public void request(long n) {
requestMore(Long.MAX_VALUE);
}
else {
requestMore(Operators.multiplyCap(n, batchSize));
long requestLimit = Operators.multiplyCap(requested, batchSize);
requestMore(requestLimit - outstanding);
}
}
}

final void requestMore(long n) {
Subscription s = this.subscription;
if (s != null) {
Operators.addCap(OUTSTANDING, this, n);
s.request(n);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,8 @@
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.junit.After;
Expand Down Expand Up @@ -202,6 +204,62 @@ public void downstreamDemandShouldBeAbleToDecreaseOnTimeSpan() {
assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(0L);
}

@Test
public void requestedFromUpstreamShouldNotExceedDownstreamDemand() {
EmitterProcessor<String> emitter = EmitterProcessor.create(1);
FluxSink<String> sink = emitter.sink();

AtomicLong requestedOutstanding = new AtomicLong(0);

VirtualTimeScheduler scheduler = VirtualTimeScheduler.create();

Flux<List<String>> flux = emitter.doOnRequest(requestedOutstanding::addAndGet)
.bufferTimeout(5, Duration.ofMillis(100), scheduler)
.doOnNext(list -> requestedOutstanding.addAndGet(0 - list.size()));

StepVerifier.withVirtualTime(() -> flux, () -> scheduler, 0)
.expectSubscription()
.then(() -> assertThat(requestedOutstanding.get()).isEqualTo(0))
.thenRequest(2)
.then(() -> assertThat(requestedOutstanding.get()).isEqualTo(10))
.then(() -> sink.next("a"))
.thenAwait(Duration.ofMillis(100))
.assertNext(s -> assertThat(s).containsExactly("a"))
.then(() -> assertThat(requestedOutstanding.get()).isEqualTo(9))
.thenRequest(1)
.then(() -> assertThat(requestedOutstanding.get()).isEqualTo(10))
.thenCancel()
.verify();
}

@Test
public void exceedingUpstreamDemandResultsInError() {
Subscription[] subscriptionsHolder = new Subscription[1];

AtomicReference<Throwable> capturedException = new AtomicReference<>();

CoreSubscriber<List<String>> actual = new LambdaSubscriber<>(null, capturedException::set, null, s -> subscriptionsHolder[0] = s);

VirtualTimeScheduler scheduler = VirtualTimeScheduler.create();
FluxBufferTimeout.BufferTimeoutSubscriber<String, List<String>> test = new FluxBufferTimeout.BufferTimeoutSubscriber<String, List<String>>(
actual, 5, 1000, scheduler.createWorker(), ArrayList::new);

Subscription subscription = Operators.emptySubscription();
test.onSubscribe(subscription);
subscriptionsHolder[0].request(1);

for (int i = 0; i < 5; i++) {
test.onNext(String.valueOf(i));
}

assertThat(capturedException.get()).isNull();

test.onNext(String.valueOf(123));

assertThat(capturedException.get()).isInstanceOf(IllegalStateException.class)
.hasMessage("Unrequested element received");
}

@Test
public void scanSubscriberCancelled() {
CoreSubscriber<List<String>>
Expand Down

0 comments on commit 74b9bec

Please sign in to comment.