Skip to content

Commit

Permalink
[resolve reactor#723] Add [Mono|Flux|Synchronous]Sink#currentContext
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini authored and smaldini committed Jul 15, 2017
1 parent 9ce4638 commit 458c32f
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ final class DelegateProcessor<IN, OUT> extends FluxProcessor<IN, OUT> {

@Override
public Context currentContext() {
return Operators.context(upstream);
if(upstream instanceof CoreSubscriber){
return ((CoreSubscriber)upstream).currentContext();
}
return Context.empty();
}

@Override
Expand Down
16 changes: 16 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import reactor.core.Scannable;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/**
* Provides a multi-valued sink API for a callback that is called for
Expand Down Expand Up @@ -124,6 +125,11 @@ static final class SerializedSink<T> implements FluxSink<T>, Scannable{
this.queue = Queues.<T>unbounded(16).get();
}

@Override
public Context currentContext() {
return sink.currentContext();
}

@Override
public FluxSink<T> next(T t) {
if (sink.isCancelled() || done) {
Expand Down Expand Up @@ -294,6 +300,11 @@ static class SerializeOnRequestSink<T> implements FluxSink<T>, Scannable {
this.sink = sink;
}

@Override
public Context currentContext() {
return sink.currentContext();
}

@Override
public Object scanUnsafe(Attr key) {
return serializedSink != null ? serializedSink.scanUnsafe(key) : baseSink.scanUnsafe(key);
Expand Down Expand Up @@ -374,6 +385,11 @@ static abstract class BaseSink<T>
this.actual = actual;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void complete() {
if (isCancelled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.context.Context;

/**
* Generate signals one-by-one via a function callback.
Expand Down Expand Up @@ -120,6 +121,11 @@ static final class GenerateSubscription<T, S>
this.stateConsumer = stateConsumer;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
Expand Down
11 changes: 11 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/FluxHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.context.Context;

/**
* Maps the values of the source publisher one-on-one via a handler function as long as the handler function result is
Expand Down Expand Up @@ -80,6 +81,11 @@ public void onSubscribe(Subscription s) {
}
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void onNext(T t) {
if (done) {
Expand Down Expand Up @@ -228,6 +234,11 @@ static final class HandleConditionalSubscriber<T, R>
this.handler = handler;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.context.Context;

/**
* Maps the values of the source publisher one-on-one via a handler function.
Expand Down Expand Up @@ -87,6 +88,11 @@ static final class HandleFuseableSubscriber<T, R>
this.handler = handler;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public boolean tryOnNext(T t) {
if (done) {
Expand Down Expand Up @@ -345,6 +351,11 @@ static final class HandleFuseableConditionalSubscriber<T, R>
this.handler = handler;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Subscription s) {
Expand Down
20 changes: 16 additions & 4 deletions reactor-core/src/main/java/reactor/core/publisher/FluxSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.LongConsumer;

import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.util.context.Context;

Expand All @@ -36,6 +37,17 @@ public interface FluxSink<T> {
*/
void complete();

/**
* Return the current subscriber {@link Context}.
* <p>
* {@link Context} can be enriched via {@link Flux#contextStart(Function)}
* operator or directly by a child subscriber overriding
* {@link CoreSubscriber#currentContext()}
*
* @return the current subscriber {@link Context}.
*/
Context currentContext();

/**
* @see Subscriber#onError(Throwable)
* @param e the exception to signal, not null
Expand Down Expand Up @@ -65,16 +77,16 @@ public interface FluxSink<T> {
* Attaches a {@link LongConsumer} to this {@link FluxSink} that will be notified of
* any request to this sink.
* <p>
* For push/pull sinks created using {@link Flux#create(Consumer)}
* or {@link Flux#create(Consumer, FluxSink.OverflowStrategy)},
* For push/pull sinks created using {@link Flux#create(java.util.function.Consumer)}
* or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)},
* the consumer
* is invoked for every request to enable a hybrid backpressure-enabled push/pull model.
* When bridging with asynchronous listener-based APIs, the {@code onRequest} callback
* may be used to request more data from source if required and to manage backpressure
* by delivering data to sink only when requests are pending.
* <p>
* For push-only sinks created using {@link Flux#push(Consumer)}
* or {@link Flux#push(Consumer, FluxSink.OverflowStrategy)},
* For push-only sinks created using {@link Flux#push(java.util.function.Consumer)}
* or {@link Flux#push(java.util.function.Consumer, FluxSink.OverflowStrategy)},
* the consumer is invoked with an initial request of {@code Long.MAX_VALUE} when this method
* is invoked.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.context.Context;

/**
* Wraps a the downstream Subscriber into a single emission object
Expand Down Expand Up @@ -85,6 +86,11 @@ static final class DefaultMonoSink<T> extends AtomicBoolean
this.actual = actual;
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
Expand Down
20 changes: 15 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/MonoSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import javax.annotation.Nullable;

import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.util.context.Context;
import javax.annotation.Nullable;

/**
* Wrapper API around an actual downstream Subscriber
Expand All @@ -34,9 +34,19 @@
public interface MonoSink<T> {

/**
* Complete without any value.
* <p>Calling this method multiple times or after the other
* terminating methods has no effect.
* Return the current subscriber {@link Context}.
* <p>
* {@link Context} can be enriched via {@link Mono#contextStart(Function)}
* operator or directly by a child subscriber overriding
* {@link CoreSubscriber#currentContext()}
*
* @return the current subscriber {@link Context}.
*/
Context currentContext();

/**
* Complete without any value. <p>Calling this method multiple times or after the
* other terminating methods has no effect.
*/
void success();

Expand Down
16 changes: 0 additions & 16 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,6 @@ public static void complete(Subscriber<?> s) {
s.onComplete();
}

/**
* Cast a reference to {@link CoreSubscriber} and read its {@link
* CoreSubscriber#currentContext()} or fallback to {@link Context#empty()}
*
* @param s reference to cast
*
* @return current subscriber context or empty
*/
@SuppressWarnings("unchecked")
public static Context context(Object s){
if(s instanceof CoreSubscriber){
return ((CoreSubscriber)s).currentContext();
}
return Context.empty();
}

/**
* Return a singleton {@link Subscriber} that does not check for double onSubscribe
* and purely request Long.MAX. If an error is received it will raise a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,43 @@

package reactor.core.publisher;

import java.util.function.Function;

import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

/**
* Interface to generate signals to a bridged {@link Subscriber}.
* Interface to produce synchronously "one signal" to an underlying {@link Subscriber}.
* <p>
* At most one {@link #next} call and/or one {@link #complete()} or
* {@link #error(Throwable)} should be called per invocation of the generator function.
*
* <p>
* At most one {@link #next} call and/or one {@link #complete()} or {@link
* #error(Throwable)} should be called per invocation of the generator function
* Calling a {@link SynchronousSink} outside of a generator consumer or function, e.g.
* using an async callback, is forbidden. You can {@link FluxSink} or
* {@link MonoSink} based generators for these situations.
*
* @param <T> the output value type
*/
public interface SynchronousSink<T> {

/**
* @see Subscriber#onComplete()
*/
void complete();

/**
* Return the current subscriber {@link Context}.
* <p>
* {@link Context} can be enriched via {@link Mono#contextStart(Function)}
* or {@link Flux#contextStart(Function)}
* operator or directly by a child subscriber overriding
* {@link CoreSubscriber#currentContext()}
*
* @return the current subscriber {@link Context}.
*/
Context currentContext();

/**
* @param e the exception to signal, not null
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -1162,4 +1163,31 @@ public void scanSerializedSink() {
assertThat(test.scan(Scannable.ThrowableAttr.ERROR)).hasMessage("boom");

}


@Test
public void contextTest() {
StepVerifier.create(Flux.create(s -> IntStream.range(0, 10).forEach(i -> s.next(s
.currentContext()
.get(AtomicInteger.class)
.incrementAndGet())))
.take(10)
.contextStart(ctx -> ctx.put(AtomicInteger.class,
new AtomicInteger())))
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.verifyComplete();
}

@Test
public void contextTestPush() {
StepVerifier.create(Flux.push(s -> IntStream.range(0, 10).forEach(i -> s.next(s
.currentContext()
.get(AtomicInteger.class)
.incrementAndGet())))
.take(10)
.contextStart(ctx -> ctx.put(AtomicInteger.class,
new AtomicInteger())))
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.verifyComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -392,4 +393,16 @@ public void scanSubscriptionCancelled() {
assertThat(test.scan(Scannable.BooleanAttr.CANCELLED)).isTrue();
}

@Test
public void contextTest() {
StepVerifier.create(Flux.generate(s -> s.next(s.currentContext()
.get(AtomicInteger.class)
.incrementAndGet()))
.take(10)
.contextStart(ctx -> ctx.put(AtomicInteger.class,
new AtomicInteger())))
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.verifyComplete();
}

}
Loading

0 comments on commit 458c32f

Please sign in to comment.