Skip to content

Commit

Permalink
fix reactor#1033 Remove final empty window from windowUntil/While
Browse files Browse the repository at this point in the history
Offer and drain in all situations, even those where no emission.

This results in subsequent separators or sequences that start with a
separator to emit an empty window, but avoids a remainder window on
source completion, when said window is empty.
  • Loading branch information
simonbasle authored and smaldini committed May 8, 2018
1 parent ed6a34b commit edfdd9e
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 66 deletions.
43 changes: 29 additions & 14 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -8128,6 +8128,11 @@ public final Flux<Flux<T>> windowTimeout(int maxSize, Duration timespan, Schedul
* Split this {@link Flux} sequence into multiple {@link Flux} windows delimited by the
* given predicate. A new window is opened each time the predicate returns true, at which
* point the previous window will receive the triggering element then onComplete.
* <p>
* Windows are lazily made available downstream at the point where they receive their
* first event (an element is pushed, the window errors). This variant shouldn't
* expose empty windows, as the separators are emitted into
* the windows they close.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowuntil.png" alt="">
Expand All @@ -8144,15 +8149,18 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger) {
* Split this {@link Flux} sequence into multiple {@link Flux} windows delimited by the
* given predicate. A new window is opened each time the predicate returns true.
* <p>
* Windows are lazily made available downstream at the point where they receive their
* first event (an element is pushed, the window completes or errors).
* <p>
* If {@code cutBefore} is true, the old window will onComplete and the triggering
* element will be emitted in the new window. Note it can mean that an empty window is
* sometimes emitted, eg. if the first element in the sequence immediately matches the
* predicate.
* element will be emitted in the new window, which becomes immediately available.
* This variant can emit an empty window if the sequence starts with a separator.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowuntilcutbefore.png" alt="">
* <p>
* Otherwise, the triggering element will be emitted in the old window before it does
* onComplete, similar to {@link #windowUntil(Predicate)}.
* onComplete, similar to {@link #windowUntil(Predicate)}. This variant shouldn't
* expose empty windows, as the separators are emitted into the windows they close.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowuntilcutafter.png" alt="">
*
Expand All @@ -8170,15 +8178,18 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cut
* predicate and using a prefetch. A new window is opened each time the predicate
* returns true.
* <p>
* Windows are lazily made available downstream at the point where they receive their
* first event (an element is pushed, the window completes or errors).
* <p>
* If {@code cutBefore} is true, the old window will onComplete and the triggering
* element will be emitted in the new window. Note it can mean that an empty window is
* sometimes emitted, eg. if the first element in the sequence immediately matches the
* predicate.
* element will be emitted in the new window. This variant can emit an empty window
* if the sequence starts with a separator.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowuntilcutbefore.png" alt="">
* <p>
* Otherwise, the triggering element will be emitted in the old window before it does
* onComplete, similar to {@link #windowUntil(Predicate)}.
* onComplete, similar to {@link #windowUntil(Predicate)}. This variant shouldn't
* expose empty windows, as the separators are emitted into the windows they close.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowuntilcutafter.png" alt="">
*
Expand All @@ -8202,9 +8213,11 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cut
* while a given predicate matches the source elements. Once the predicate returns
* false, the window closes with an onComplete and the triggering element is discarded.
* <p>
* Note that for a sequence starting with a separator, or having several subsequent
* separators anywhere in the sequence, each occurrence will lead to an empty window.
*
* Windows are lazily made available downstream at the point where they receive their
* first event (an element is pushed, the window completes or errors). Empty windows
* can happen when a sequence starts with a separator or contains multiple separators,
* but a sequence that finishes with a separator won't cause a remainder empty window
* to be emitted.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowwhile.png" alt="">
*
Expand All @@ -8221,9 +8234,11 @@ public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate) {
* while a given predicate matches the source elements. Once the predicate returns
* false, the window closes with an onComplete and the triggering element is discarded.
* <p>
* Note that for a sequence starting with a separator, or having several subsequent
* separators anywhere in the sequence, each occurrence will lead to an empty window.
*
* Windows are lazily made available downstream at the point where they receive their
* first event (an element is pushed, the window completes or errors). Empty windows
* can happen when a sequence starts with a separator or contains multiple separators,
* but a sequence that finishes with a separator won't cause a remainder empty window
* to be emitted.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/windowwhile.png" alt="">
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,9 +39,11 @@
* a {@link Predicate} on the values. The predicate can be used in several modes:
* <ul>
* <li>{@code Until}: A new window starts when the predicate returns true. The
* element that just matched the predicate is the last in the previous window.</li>
* element that just matched the predicate is the last in the previous window, and the
* windows are not emitted before an inner element is pushed.</li>
* <li>{@code UntilOther}: A new window starts when the predicate returns true. The
* element that just matched the predicate is the first in the new window.</li>
* element that just matched the predicate is the first in the new window, which is
* emitted immediately.</li>
* <li>{@code While}: A new window starts when the predicate stops matching. The
* non-matching elements that delimit each window are simply discarded, and the
* windows are not emitted before an inner element is pushed</li>
Expand Down Expand Up @@ -181,28 +183,18 @@ void initializeWindow() {
groupQueueSupplier.get(),
this);
window = g;
queue.offer(g);
}

void offerNewWindow(@Nullable T emitInNewWindow) {
@Nullable WindowFlux<T> newWindowDeferred() {
// if the main is cancelled, don't create new groups
if (cancelled == 0) {
WINDOW_COUNT.getAndIncrement(this);

WindowFlux<T> g = new WindowFlux<>(
groupQueueSupplier.get(), this);
if (emitInNewWindow != null) {
g.onNext(emitInNewWindow);
}
WindowFlux<T> g = new WindowFlux<>(groupQueueSupplier.get(), this);
window = g;

if (!queue.offer(g)) {
onError(Operators.onOperatorError(this, Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), emitInNewWindow,
actual.currentContext()));
return;
}
drain();
return g;
}
return null;
}

@Override
Expand All @@ -222,18 +214,29 @@ public void onNext(T t) {
return;
}


if (!handleDeferredWindow(g, t)) {
return;
}
drain();

if (mode == Mode.UNTIL && match) {
g.onNext(t);
g.onComplete();
offerNewWindow(null);
newWindowDeferred();
}
else if (mode == Mode.UNTIL_CUT_BEFORE && match) {
g.onComplete();
offerNewWindow(t);
g = newWindowDeferred();
if (g != null) {
g.onNext(t);
handleDeferredWindow(g, t);
drain();
}
}
else if (mode == Mode.WHILE && !match) {
g.onComplete();
offerNewWindow(null);
newWindowDeferred();
//compensate for the dropped delimiter
s.request(1);
}
Expand All @@ -242,6 +245,20 @@ else if (mode == Mode.WHILE && !match) {
}
}

boolean handleDeferredWindow(@Nullable WindowFlux<T> window, T signal) {
if (window != null && window.deferred) {
window.deferred = false;
if (!queue.offer(window)) {
onError(Operators.onOperatorError(this,
Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL),
signal,
actual.currentContext()));
return false;
}
}
return true;
}

@Override
public void onError(Throwable t) {
if (Exceptions.addThrowable(ERROR, this, t)) {
Expand All @@ -255,7 +272,7 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if(done){
if(done) {
return;
}

Expand Down Expand Up @@ -545,11 +562,14 @@ static final class WindowFlux<T> extends Flux<T>

int produced;

boolean deferred;

WindowFlux(
Queue<T> queue,
WindowPredicateMain<T> parent) {
this.queue = queue;
this.parent = parent;
this.deferred = true;
}

@Override
Expand Down
Loading

0 comments on commit edfdd9e

Please sign in to comment.