Skip to content

Commit

Permalink
fix reactor#1125 Add TTL-generator variant to Mono.cache
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle authored Apr 3, 2018
1 parent 8a399e8 commit d734ad8
Show file tree
Hide file tree
Showing 3 changed files with 402 additions and 9 deletions.
26 changes: 26 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,32 @@ public final Mono<T> cache(Duration ttl) {
return onAssembly(new MonoCacheTime<>(this, ttl, Schedulers.parallel()));
}

/**
* Turn this {@link Mono} into a hot source and cache last emitted signal for further
* {@link Subscriber}, with an expiry timeout (TTL) that depends on said signal.
* <p>
* Empty completion and Error will also be replayed according to their respective TTL.
* <p>
* If the relevant TTL generator throws any {@link Exception}, that exception will be
* propagated to the {@link Subscriber} that encountered the cache miss, but the cache
* will be immediately cleared, so further Subscribers might re-populate the cache in
* case the error was transient. In case the source was emitting an error, that error
* is {@link Hooks#onErrorDropped(Consumer) dropped} and added as a suppressed exception.
* In case the source was emitting a value, that value is {@link Hooks#onNextDropped(Consumer) dropped}.
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
* @param ttlForEmpty the TTL-generating {@link Supplier} invoked when source is empty
* @return
*/
public final Mono<T> cache(Function<? super T, Duration> ttlForValue,
Function<Throwable, Duration> ttlForError,
Supplier<Duration> ttlForEmpty) {
return onAssembly(new MonoCacheTime<>(this,
ttlForValue, ttlForError, ttlForEmpty,
Schedulers.parallel()));
}

/**
* Prepare this {@link Mono} so that subscribers will cancel from it on a
* specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand All @@ -38,8 +41,8 @@ class MonoCacheTime<T> extends MonoOperator<T, T> implements Runnable {

private static final Logger LOGGER = Loggers.getLogger(MonoCacheTime.class);

final Duration ttl;
final Scheduler clock;
final Function<? super Signal<T>, Duration> ttlGenerator;
final Scheduler clock;

volatile Signal<T> state;
static final AtomicReferenceFieldUpdater<MonoCacheTime, Signal> STATE =
Expand All @@ -49,7 +52,32 @@ class MonoCacheTime<T> extends MonoOperator<T, T> implements Runnable {

MonoCacheTime(Mono<? extends T> source, Duration ttl, Scheduler clock) {
super(source);
this.ttl = ttl;
this.ttlGenerator = ignoredSignal -> ttl;
this.clock = clock;
//noinspection unchecked
this.state = (Signal<T>) EMPTY;
}

MonoCacheTime(Mono<? extends T> source, Function<? super Signal<T>, Duration> ttlGenerator,
Scheduler clock) {
super(source);
this.ttlGenerator = ttlGenerator;
this.clock = clock;
//noinspection unchecked
this.state = (Signal<T>) EMPTY;
}

MonoCacheTime(Mono<? extends T> source,
Function<? super T, Duration> valueTtlGenerator,
Function<Throwable, Duration> errorTtlGenerator,
Supplier<Duration> emptyTtlGenerator,
Scheduler clock) {
super(source);
this.ttlGenerator = sig -> {
if (sig.isOnNext()) return valueTtlGenerator.apply(sig.get());
if (sig.isOnError()) return errorTtlGenerator.apply(sig.getThrowable());
return emptyTtlGenerator.get();
};
this.clock = clock;
@SuppressWarnings("unchecked")
Signal<T> emptyState = (Signal<T>) EMPTY;
Expand All @@ -71,10 +99,10 @@ public void subscribe(CoreSubscriber<? super T> actual) {
//init or expired
CoordinatorSubscriber<T> newState = new CoordinatorSubscriber<>(this);
if (STATE.compareAndSet(this, EMPTY, newState)) {
source.subscribe(newState);
CacheMonoSubscriber<T> inner = new CacheMonoSubscriber<>(actual, newState);
if (newState.add(inner)) {
actual.onSubscribe(inner);
source.subscribe(newState);
break;
}
}
Expand Down Expand Up @@ -233,16 +261,43 @@ public void onSubscribe(Subscription s) {

@SuppressWarnings("unchecked")
private void signalCached(Signal<T> signal) {
Signal<T> signalToPropagate = signal;
if (STATE.compareAndSet(main, this, signal)) {
main.clock.schedule(main, main.ttl.toMillis(), TimeUnit.MILLISECONDS);
Duration ttl = null;
try {
ttl = main.ttlGenerator.apply(signal);
}
catch (Throwable generatorError) {
signalToPropagate = Signal.error(generatorError);
STATE.set(main, signalToPropagate);
if (signal.isOnError()) {
//noinspection ThrowableNotThrown
Exceptions.addSuppressed(generatorError, signal.getThrowable());
}
}

if (ttl != null) {
main.clock.schedule(main, ttl.toMillis(), TimeUnit.MILLISECONDS);
}
else {
//error during TTL generation, signal != updatedSignal, aka dropped
if (signal.isOnNext()) {
Operators.onNextDropped(signal.get(), currentContext());
}
else if (signal.isOnError()) {
Operators.onErrorDropped(signal.getThrowable(), currentContext());
}
//immediate cache clear
main.run();
}
}

for (Operators.MonoSubscriber<T, T> inner : SUBSCRIBERS.getAndSet(this, TERMINATED)) {
if (signal.isOnNext()) {
inner.complete(signal.get());
if (signalToPropagate.isOnNext()) {
inner.complete(signalToPropagate.get());
}
else if (signal.isOnError()) {
inner.onError(signal.getThrowable());
else if (signalToPropagate.isOnError()) {
inner.onError(signalToPropagate.getThrowable());
}
else {
inner.onComplete();
Expand Down
Loading

0 comments on commit d734ad8

Please sign in to comment.