Skip to content

Commit db8902d

Browse files
author
Dariusz Jędrzejczyk
authored
Replay terminal signals to late subscribers in Flux.replay(int) and Flux.cache(int) (#3200)
Supporting the caching of only terminal signals in case of Flux.replay(int) and Flux.cache(int) operators. As it currently stands, these operators, when provided 0 as the argument, resort to behaving like Flux.publish() which differs in the way termination signals are handled by the > 0 cases. This change still uses the FluxPublish class to implement the logic, however with a few minor changes to its implementation. First of all, FluxPublish resets itself after source termination to be able to connect() again. FluxReplay on the other hand, does not reset itself in the case of only buffering signals without timeout. Therefore, for the behaviour of caching the terminals, FluxPublish does not reset itself and replays the terminal when it receives a late subscription. For cases with expiry (TTL arguments), FluxReplay does indeed reset itself. Therefore, the behaviour for the time constrained values remains as before, using FluxPublish implementation for the 0 history case, but without caching terminals, while not honouring the TTL. This case can be later implemented if needed. Fixes #3164
1 parent c85d2bd commit db8902d

File tree

4 files changed

+342
-96
lines changed

4 files changed

+342
-96
lines changed

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7219,7 +7219,7 @@ public final ConnectableFlux<T> publish() {
72197219
*/
72207220
public final ConnectableFlux<T> publish(int prefetch) {
72217221
return onAssembly(new FluxPublish<>(this, prefetch, Queues
7222-
.get(prefetch)));
7222+
.get(prefetch), true));
72237223
}
72247224

72257225
/**
@@ -7565,10 +7565,12 @@ public final ConnectableFlux<T> replay() {
75657565
* Will retain up to the given history size onNext signals. Completion and Error will also be
75667566
* replayed.
75677567
* <p>
7568-
* Note that {@code cache(0)} will only cache the terminal signal without
7568+
* Note that {@code replay(0)} will only cache the terminal signal without
75697569
* expiration.
75707570
*
75717571
* <p>
7572+
* Re-connects are not supported.
7573+
* <p>
75727574
* <img class="marble" src="doc-files/marbles/replayWithHistory.svg" alt="">
75737575
*
75747576
* @param history number of events retained in history excluding complete and
@@ -7579,8 +7581,8 @@ public final ConnectableFlux<T> replay() {
75797581
*/
75807582
public final ConnectableFlux<T> replay(int history) {
75817583
if (history == 0) {
7582-
//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
7583-
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
7584+
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE,
7585+
Queues.get(Queues.SMALL_BUFFER_SIZE), false));
75847586
}
75857587
return onAssembly(new FluxReplay<>(this, history, 0L, null));
75867588
}
@@ -7662,8 +7664,8 @@ public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer) {
76627664
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer) {
76637665
Objects.requireNonNull(timer, "timer");
76647666
if (history == 0) {
7665-
//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
7666-
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
7667+
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE,
7668+
Queues.get(Queues.SMALL_BUFFER_SIZE), true));
76677669
}
76687670
return onAssembly(new FluxReplay<>(this, history, ttl.toNanos(), timer));
76697671
}
@@ -7986,8 +7988,10 @@ public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T,
79867988
* to subscribe once, late subscribers might therefore miss items.
79877989
*/
79887990
public final Flux<T> share() {
7989-
return onAssembly(new FluxRefCount<>(
7990-
new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.small()), 1)
7991+
return onAssembly(
7992+
new FluxRefCount<>(new FluxPublish<>(
7993+
this, Queues.SMALL_BUFFER_SIZE, Queues.small(), true
7994+
), 1)
79917995
);
79927996
}
79937997

reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,6 +57,11 @@ final class FluxPublish<T> extends ConnectableFlux<T> implements Scannable {
5757

5858
final Supplier<? extends Queue<T>> queueSupplier;
5959

60+
/**
61+
* Whether to prepare for a reconnect after the source terminates.
62+
*/
63+
final boolean resetUponSourceTermination;
64+
6065
volatile PublishSubscriber<T> connection;
6166
@SuppressWarnings("rawtypes")
6267
static final AtomicReferenceFieldUpdater<FluxPublish, PublishSubscriber> CONNECTION =
@@ -66,13 +71,15 @@ final class FluxPublish<T> extends ConnectableFlux<T> implements Scannable {
6671

6772
FluxPublish(Flux<? extends T> source,
6873
int prefetch,
69-
Supplier<? extends Queue<T>> queueSupplier) {
74+
Supplier<? extends Queue<T>> queueSupplier,
75+
boolean resetUponSourceTermination) {
7076
if (prefetch <= 0) {
7177
throw new IllegalArgumentException("bufferSize > 0 required but it was " + prefetch);
7278
}
7379
this.source = Objects.requireNonNull(source, "source");
7480
this.prefetch = prefetch;
7581
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
82+
this.resetUponSourceTermination = resetUponSourceTermination;
7683
}
7784

7885
@Override
@@ -111,7 +118,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
111118
}
112119

113120
PublishSubscriber<T> c = connection;
114-
if (c == null || c.isTerminated()) {
121+
if (c == null || (this.resetUponSourceTermination && c.isTerminated())) {
115122
PublishSubscriber<T> u = new PublishSubscriber<>(prefetch, this);
116123
if (!CONNECTION.compareAndSet(this, c, u)) {
117124
continue;
@@ -123,12 +130,18 @@ public void subscribe(CoreSubscriber<? super T> actual) {
123130
if (c.add(inner)) {
124131
if (inner.isCancelled()) {
125132
c.remove(inner);
126-
}
127-
else {
133+
} else {
128134
inner.parent = c;
129135
}
130136
c.drain();
131137
break;
138+
} else if (!this.resetUponSourceTermination) {
139+
if (c.error != null) {
140+
inner.actual.onError(c.error);
141+
} else {
142+
inner.actual.onComplete();
143+
}
144+
break;
132145
}
133146
}
134147
}
@@ -515,16 +528,20 @@ boolean checkTerminated(boolean d, boolean empty) {
515528
if (d) {
516529
Throwable e = error;
517530
if (e != null && e != Exceptions.TERMINATED) {
518-
CONNECTION.compareAndSet(parent, this, null);
519-
e = Exceptions.terminate(ERROR, this);
531+
if (parent.resetUponSourceTermination) {
532+
CONNECTION.compareAndSet(parent, this, null);
533+
e = Exceptions.terminate(ERROR, this);
534+
}
520535
queue.clear();
521536
for (PubSubInner<T> inner : terminate()) {
522537
inner.actual.onError(e);
523538
}
524539
return true;
525540
}
526541
else if (empty) {
527-
CONNECTION.compareAndSet(parent, this, null);
542+
if (parent.resetUponSourceTermination) {
543+
CONNECTION.compareAndSet(parent, this, null);
544+
}
528545
for (PubSubInner<T> inner : terminate()) {
529546
inner.actual.onComplete();
530547
}

reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -103,7 +103,7 @@ public void cacheFluxHistoryTTL() {
103103
}
104104

105105
@Test
106-
public void cacheFluxTTL2() {
106+
public void cacheFluxTTLReconnectsAfterTTL() {
107107
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
108108

109109
AtomicInteger i = new AtomicInteger(0);
@@ -125,6 +125,47 @@ public void cacheFluxTTL2() {
125125
.verifyComplete();
126126
}
127127

128+
@Test
129+
void cacheZeroFluxCachesCompletion() {
130+
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
131+
132+
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
133+
.delayElements(Duration.ofMillis(1000)
134+
, vts)
135+
.cache(0)
136+
.elapsed(vts);
137+
138+
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
139+
.thenAwait(Duration.ofSeconds(3))
140+
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
141+
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
142+
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
143+
.verifyComplete();
144+
145+
StepVerifier.create(source).verifyComplete();
146+
}
147+
148+
@Test
149+
public void cacheZeroFluxTTLReconnectsAfterSourceCompletion() {
150+
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
151+
152+
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
153+
.delayElements(
154+
Duration.ofMillis(1000), vts
155+
)
156+
.cache(0, Duration.ofMillis(2000), vts)
157+
.elapsed(vts);
158+
159+
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
160+
.thenAwait(Duration.ofSeconds(3))
161+
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
162+
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
163+
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
164+
.verifyComplete();
165+
166+
StepVerifier.create(source).expectTimeout(Duration.ofMillis(500)).verify();
167+
}
168+
128169
@Test
129170
public void cacheContextHistory() {
130171
AtomicInteger contextFillCount = new AtomicInteger();
@@ -156,6 +197,38 @@ public void cacheContextHistory() {
156197
assertThat(contextFillCount).as("cacheHit3").hasValue(4);
157198
}
158199

200+
@Test
201+
public void cacheZeroContext() {
202+
AtomicInteger contextFillCount = new AtomicInteger();
203+
Flux<String> cached = Flux.just(1, 2)
204+
.flatMap(i -> Mono.deferContextual(Mono::just)
205+
.map(ctx -> ctx.getOrDefault("a", "BAD"))
206+
)
207+
.cache(0)
208+
.contextWrite(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));
209+
210+
// at first pass, the Context is propagated to subscriber, but not cached
211+
String cacheMiss = cached.blockLast();
212+
assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1");
213+
assertThat(contextFillCount).as("cacheMiss").hasValue(1);
214+
215+
// at second subscribe, the Context fill attempt is still done, but ultimately
216+
// ignored since source terminated
217+
String zeroCache = cached.blockLast();
218+
assertThat(zeroCache).as("zeroCache").isNull(); //value from the cache
219+
assertThat(contextFillCount).as("zeroCache").hasValue(2); //function was still invoked
220+
221+
//at third subscribe, function is called for the 3rd time, but the context is still cached
222+
String zeroCache2 = cached.blockLast();
223+
assertThat(zeroCache2).as("zeroCache2").isNull();
224+
assertThat(contextFillCount).as("zeroCache2").hasValue(3);
225+
226+
//at fourth subscribe, function is called for the 4th time, but the context is still cached
227+
String zeroCache3 = cached.blockLast();
228+
assertThat(zeroCache3).as("zeroCache3").isNull();
229+
assertThat(contextFillCount).as("zeroCache3").hasValue(4);
230+
}
231+
159232
@Test
160233
public void cacheContextTime() {
161234
AtomicInteger contextFillCount = new AtomicInteger();

0 commit comments

Comments
 (0)