From 458c32fb97b3830a8cf1e53b50c22428b1f298e2 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 14 Jul 2017 09:51:20 -0700 Subject: [PATCH] [resolve #723] Add [Mono|Flux|Synchronous]Sink#currentContext --- .../core/publisher/DelegateProcessor.java | 5 +- .../reactor/core/publisher/FluxCreate.java | 16 ++++++ .../reactor/core/publisher/FluxGenerate.java | 6 ++ .../reactor/core/publisher/FluxHandle.java | 11 ++++ .../core/publisher/FluxHandleFuseable.java | 11 ++++ .../java/reactor/core/publisher/FluxSink.java | 20 +++++-- .../reactor/core/publisher/MonoCreate.java | 6 ++ .../java/reactor/core/publisher/MonoSink.java | 20 +++++-- .../reactor/core/publisher/Operators.java | 16 ------ .../core/publisher/SynchronousSink.java | 28 ++++++++-- .../core/publisher/FluxCreateTest.java | 28 ++++++++++ .../core/publisher/FluxGenerateTest.java | 13 +++++ .../core/publisher/FluxHandleTest.java | 56 +++++++++++++++++++ .../core/publisher/MonoCreateTest.java | 11 ++++ .../core/publisher/UnicastProcessorTest.java | 14 ++++- 15 files changed, 230 insertions(+), 31 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/DelegateProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/DelegateProcessor.java index aadecbb74e..4b1c3c709b 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/DelegateProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/DelegateProcessor.java @@ -43,7 +43,10 @@ final class DelegateProcessor extends FluxProcessor { @Override public Context currentContext() { - return Operators.context(upstream); + if(upstream instanceof CoreSubscriber){ + return ((CoreSubscriber)upstream).currentContext(); + } + return Context.empty(); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java index b38836b5e2..ff8b09d023 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java @@ -34,6 +34,7 @@ import reactor.core.Scannable; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.util.concurrent.Queues; +import reactor.util.context.Context; /** * Provides a multi-valued sink API for a callback that is called for @@ -124,6 +125,11 @@ static final class SerializedSink implements FluxSink, Scannable{ this.queue = Queues.unbounded(16).get(); } + @Override + public Context currentContext() { + return sink.currentContext(); + } + @Override public FluxSink next(T t) { if (sink.isCancelled() || done) { @@ -294,6 +300,11 @@ static class SerializeOnRequestSink implements FluxSink, Scannable { this.sink = sink; } + @Override + public Context currentContext() { + return sink.currentContext(); + } + @Override public Object scanUnsafe(Attr key) { return serializedSink != null ? serializedSink.scanUnsafe(key) : baseSink.scanUnsafe(key); @@ -374,6 +385,11 @@ static abstract class BaseSink this.actual = actual; } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @Override public void complete() { if (isCancelled()) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxGenerate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxGenerate.java index ee6e8c1706..2a56fbaced 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxGenerate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxGenerate.java @@ -25,6 +25,7 @@ import reactor.core.CoreSubscriber; import reactor.core.Exceptions; import reactor.core.Fuseable; +import reactor.util.context.Context; /** * Generate signals one-by-one via a function callback. @@ -120,6 +121,11 @@ static final class GenerateSubscription this.stateConsumer = stateConsumer; } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @Override @Nullable public Object scanUnsafe(Attr key) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxHandle.java b/reactor-core/src/main/java/reactor/core/publisher/FluxHandle.java index a98ed065ef..9e47627441 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxHandle.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxHandle.java @@ -22,6 +22,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; +import reactor.util.context.Context; /** * Maps the values of the source publisher one-on-one via a handler function as long as the handler function result is @@ -80,6 +81,11 @@ public void onSubscribe(Subscription s) { } } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @Override public void onNext(T t) { if (done) { @@ -228,6 +234,11 @@ static final class HandleConditionalSubscriber this.handler = handler; } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxHandleFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxHandleFuseable.java index e36ec1f4e2..27b6042cda 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxHandleFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxHandleFuseable.java @@ -24,6 +24,7 @@ import reactor.core.CoreSubscriber; import reactor.core.Exceptions; import reactor.core.Fuseable; +import reactor.util.context.Context; /** * Maps the values of the source publisher one-on-one via a handler function. @@ -87,6 +88,11 @@ static final class HandleFuseableSubscriber this.handler = handler; } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @Override public boolean tryOnNext(T t) { if (done) { @@ -345,6 +351,11 @@ static final class HandleFuseableConditionalSubscriber this.handler = handler; } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @SuppressWarnings("unchecked") @Override public void onSubscribe(Subscription s) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java index 0280575321..92d299683c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSink.java @@ -20,6 +20,7 @@ import java.util.function.LongConsumer; import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.util.context.Context; @@ -36,6 +37,17 @@ public interface FluxSink { */ void complete(); + /** + * Return the current subscriber {@link Context}. + *

+ * {@link Context} can be enriched via {@link Flux#contextStart(Function)} + * operator or directly by a child subscriber overriding + * {@link CoreSubscriber#currentContext()} + * + * @return the current subscriber {@link Context}. + */ + Context currentContext(); + /** * @see Subscriber#onError(Throwable) * @param e the exception to signal, not null @@ -65,16 +77,16 @@ public interface FluxSink { * Attaches a {@link LongConsumer} to this {@link FluxSink} that will be notified of * any request to this sink. *

- * For push/pull sinks created using {@link Flux#create(Consumer)} - * or {@link Flux#create(Consumer, FluxSink.OverflowStrategy)}, + * For push/pull sinks created using {@link Flux#create(java.util.function.Consumer)} + * or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)}, * the consumer * is invoked for every request to enable a hybrid backpressure-enabled push/pull model. * When bridging with asynchronous listener-based APIs, the {@code onRequest} callback * may be used to request more data from source if required and to manage backpressure * by delivering data to sink only when requests are pending. *

- * For push-only sinks created using {@link Flux#push(Consumer)} - * or {@link Flux#push(Consumer, FluxSink.OverflowStrategy)}, + * For push-only sinks created using {@link Flux#push(java.util.function.Consumer)} + * or {@link Flux#push(java.util.function.Consumer, FluxSink.OverflowStrategy)}, * the consumer is invoked with an initial request of {@code Long.MAX_VALUE} when this method * is invoked. * diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCreate.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCreate.java index b373ddf84f..960b57493d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCreate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCreate.java @@ -25,6 +25,7 @@ import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.publisher.FluxCreate.SinkDisposable; +import reactor.util.context.Context; /** * Wraps a the downstream Subscriber into a single emission object @@ -85,6 +86,11 @@ static final class DefaultMonoSink extends AtomicBoolean this.actual = actual; } + @Override + public Context currentContext() { + return actual.currentContext(); + } + @Override @Nullable public Object scanUnsafe(Attr key) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSink.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSink.java index 0fb7e0af23..0d500c46d6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSink.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSink.java @@ -19,11 +19,11 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; +import javax.annotation.Nullable; -import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.util.context.Context; -import javax.annotation.Nullable; /** * Wrapper API around an actual downstream Subscriber @@ -34,9 +34,19 @@ public interface MonoSink { /** - * Complete without any value. - *

Calling this method multiple times or after the other - * terminating methods has no effect. + * Return the current subscriber {@link Context}. + *

+ * {@link Context} can be enriched via {@link Mono#contextStart(Function)} + * operator or directly by a child subscriber overriding + * {@link CoreSubscriber#currentContext()} + * + * @return the current subscriber {@link Context}. + */ + Context currentContext(); + + /** + * Complete without any value.

Calling this method multiple times or after the + * other terminating methods has no effect. */ void success(); diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 436b3761c3..4e8f35f0a2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -161,22 +161,6 @@ public static void complete(Subscriber s) { s.onComplete(); } - /** - * Cast a reference to {@link CoreSubscriber} and read its {@link - * CoreSubscriber#currentContext()} or fallback to {@link Context#empty()} - * - * @param s reference to cast - * - * @return current subscriber context or empty - */ - @SuppressWarnings("unchecked") - public static Context context(Object s){ - if(s instanceof CoreSubscriber){ - return ((CoreSubscriber)s).currentContext(); - } - return Context.empty(); - } - /** * Return a singleton {@link Subscriber} that does not check for double onSubscribe * and purely request Long.MAX. If an error is received it will raise a diff --git a/reactor-core/src/main/java/reactor/core/publisher/SynchronousSink.java b/reactor-core/src/main/java/reactor/core/publisher/SynchronousSink.java index 4b8135735f..6e665f2ecb 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SynchronousSink.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SynchronousSink.java @@ -16,23 +16,43 @@ package reactor.core.publisher; +import java.util.function.Function; + import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; +import reactor.util.context.Context; /** - * Interface to generate signals to a bridged {@link Subscriber}. + * Interface to produce synchronously "one signal" to an underlying {@link Subscriber}. + *

+ * At most one {@link #next} call and/or one {@link #complete()} or + * {@link #error(Throwable)} should be called per invocation of the generator function. + * *

- * At most one {@link #next} call and/or one {@link #complete()} or {@link - * #error(Throwable)} should be called per invocation of the generator function + * Calling a {@link SynchronousSink} outside of a generator consumer or function, e.g. + * using an async callback, is forbidden. You can {@link FluxSink} or + * {@link MonoSink} based generators for these situations. * * @param the output value type */ public interface SynchronousSink { - /** * @see Subscriber#onComplete() */ void complete(); + /** + * Return the current subscriber {@link Context}. + *

+ * {@link Context} can be enriched via {@link Mono#contextStart(Function)} + * or {@link Flux#contextStart(Function)} + * operator or directly by a child subscriber overriding + * {@link CoreSubscriber#currentContext()} + * + * @return the current subscriber {@link Context}. + */ + Context currentContext(); + /** * @param e the exception to signal, not null * diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java index 40497079b1..d40c19eb56 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.junit.Assert; import org.junit.Test; @@ -1162,4 +1163,31 @@ public void scanSerializedSink() { assertThat(test.scan(Scannable.ThrowableAttr.ERROR)).hasMessage("boom"); } + + + @Test + public void contextTest() { + StepVerifier.create(Flux.create(s -> IntStream.range(0, 10).forEach(i -> s.next(s + .currentContext() + .get(AtomicInteger.class) + .incrementAndGet()))) + .take(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } + + @Test + public void contextTestPush() { + StepVerifier.create(Flux.push(s -> IntStream.range(0, 10).forEach(i -> s.next(s + .currentContext() + .get(AtomicInteger.class) + .incrementAndGet()))) + .take(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } } \ No newline at end of file diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxGenerateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxGenerateTest.java index 81c89dfc09..5426cf7b21 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxGenerateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxGenerateTest.java @@ -26,6 +26,7 @@ import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; +import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import static org.assertj.core.api.Assertions.assertThat; @@ -392,4 +393,16 @@ public void scanSubscriptionCancelled() { assertThat(test.scan(Scannable.BooleanAttr.CANCELLED)).isTrue(); } + @Test + public void contextTest() { + StepVerifier.create(Flux.generate(s -> s.next(s.currentContext() + .get(AtomicInteger.class) + .incrementAndGet())) + .take(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } + } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxHandleTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxHandleTest.java index 24c85654b1..165d7c50f1 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxHandleTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxHandleTest.java @@ -20,6 +20,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -453,4 +454,59 @@ public void scanFuseableConditionalSubscriber() { test.onComplete(); assertThat(test.scan(Scannable.BooleanAttr.TERMINATED)).isTrue(); } + + + @Test + public void contextTest() { + StepVerifier.create(Flux.just("foo") + .handle((d, s) -> s.next(s.currentContext() + .get(AtomicInteger.class) + .incrementAndGet())) + .repeat(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } + + @Test + public void contextTestHide() { + StepVerifier.create(Flux.just("foo") + .hide() + .handle((d, s) -> s.next(s.currentContext() + .get(AtomicInteger.class) + .incrementAndGet())) + .repeat(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } + + @Test + public void contextTestFilter() { + StepVerifier.create(Flux.just("foo") + .handle((d, s) -> s.next(s.currentContext() + .get(AtomicInteger.class) + .incrementAndGet())) + .filter(d -> true) + .repeat(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } + @Test + public void contextTestFilterHide() { + StepVerifier.create(Flux.just("foo") + .handle((d, s) -> s.next(s.currentContext() + .get(AtomicInteger.class) + .incrementAndGet())) + .filter(d -> true) + .repeat(10) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .verifyComplete(); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCreateTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCreateTest.java index ff56d198c4..d19639d255 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCreateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCreateTest.java @@ -312,5 +312,16 @@ public void scanDefaultMonoSinkCancelTerminates() { assertThat(test.scan(Scannable.BooleanAttr.CANCELLED)).isTrue(); assertThat(test.scan(Scannable.BooleanAttr.TERMINATED)).isTrue(); } + + @Test + public void contextTest() { + StepVerifier.create(Mono.create(s -> s.success(s.currentContext() + .get(AtomicInteger.class) + .incrementAndGet())) + .contextStart(ctx -> ctx.put(AtomicInteger.class, + new AtomicInteger()))) + .expectNext(1) + .verifyComplete(); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorTest.java index 72026d3976..076fa4ff31 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorTest.java @@ -21,7 +21,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.IntStream; import javax.annotation.Nullable; @@ -111,7 +113,7 @@ public void assertProcessor(UnicastProcessor processor, @Nullable Consumer onOverflow, @Nullable Disposable onTerminate) { Queue expectedQueue = queue != null ? queue : Queues.unbounded().get(); - Disposable expectedOnTerminate = onTerminate != null ? onTerminate : null; + Disposable expectedOnTerminate = onTerminate; assertEquals(expectedQueue.getClass(), processor.queue.getClass()); assertEquals(expectedOnTerminate, processor.onTerminate); if (onOverflow != null) @@ -169,4 +171,14 @@ public void bufferSizeOtherQueue() { .isEqualTo(Integer.MIN_VALUE) .isEqualTo(Queues.CAPACITY_UNSURE); } + + + @Test + public void contextTest() { + UnicastProcessor p = UnicastProcessor.create(); + p.contextStart(ctx -> ctx.put("foo", "bar")).subscribe(); + + assertThat(p.sink().currentContext().get("foo").toString()).isEqualTo("bar"); + + } }