Skip to content

Commit

Permalink
All operators now implement basic Scannable (reactor#1136, root of re…
Browse files Browse the repository at this point in the history
…actor#1105)

Flux and Mono themselves don't implement Scannable because it would
make the `scan` operator ambiguous. However, all concrete Flux and
Mono operator implementations _should_ implement Scannable, even if
very basic (`return null;`), so that things like hooks and lift
don't hide the Publisher behind a Scannable.UNAVAILABLE_SCAN.

Also improves Hooks javadoc with regards to lift restrictions.
  • Loading branch information
simonbasle authored Mar 21, 2018
1 parent 5d1372b commit 9dac17d
Show file tree
Hide file tree
Showing 53 changed files with 423 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;

/**
Expand Down Expand Up @@ -55,7 +56,7 @@ public static <T> Flux<T> flowPublisherToFlux(Flow.Publisher<T> publisher) {
return new FlowPublisherAsFlux<>(publisher);
}

private static class FlowPublisherAsFlux<T> extends Flux<T> {
private static class FlowPublisherAsFlux<T> extends Flux<T> implements Scannable {
private final java.util.concurrent.Flow.Publisher<T> pub;

private FlowPublisherAsFlux(java.util.concurrent.Flow.Publisher<T> pub) {
Expand All @@ -66,6 +67,11 @@ private FlowPublisherAsFlux(java.util.concurrent.Flow.Publisher<T> pub) {
public void subscribe(final CoreSubscriber<? super T> actual) {
pub.subscribe(new SubscriberToRS<>(actual));
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}
}

private static class PublisherAsFlowPublisher<T> implements Flow.Publisher<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

/**
Expand All @@ -31,7 +32,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxArray<T> extends Flux<T> implements Fuseable {
final class FluxArray<T> extends Flux<T> implements Fuseable, Scannable {

final T[] array;

Expand Down Expand Up @@ -59,6 +60,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
subscribe(actual, array);
}


@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return array.length;
return null;
}

static final class ArraySubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

/**
* For each subscriber, a Supplier is invoked and the returned value emitted.
*
* @param <T> the value type;
*/
final class FluxCallable<T> extends Flux<T> implements Callable<T>, Fuseable {
final class FluxCallable<T> extends Flux<T>
implements Callable<T>, Fuseable, Scannable {

final Callable<T> callable;

Expand Down Expand Up @@ -59,4 +61,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public T call() throws Exception {
return callable.call();
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxCombineLatest<T, R> extends Flux<R> implements Fuseable {
final class FluxCombineLatest<T, R> extends Flux<R> implements Fuseable, Scannable {

final Publisher<? extends T>[] array;

Expand Down Expand Up @@ -176,6 +176,12 @@ public void subscribe(CoreSubscriber<? super R> actual) {
coordinator.subscribe(a, n);
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return prefetch;
return null;
}

static final class CombineLatestCoordinator<T, R>
implements QueueSubscription<R>, InnerProducer<R> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

/**
Expand All @@ -30,7 +31,7 @@
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxConcatArray<T> extends Flux<T> {
final class FluxConcatArray<T> extends Flux<T> implements Scannable {

final Publisher<? extends T>[] array;

Expand Down Expand Up @@ -81,6 +82,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}
}


@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.DELAY_ERROR) return delayError;
return null;
}

/**
* Returns a new instance which has the additional source to be merged together with
* the current array of sources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;

/**
* Concatenates a fixed array of Publishers' values.
Expand All @@ -30,7 +31,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxConcatIterable<T> extends Flux<T> {
final class FluxConcatIterable<T> extends Flux<T> implements Scannable {

final Iterable<? extends Publisher<? extends T>> iterable;

Expand Down Expand Up @@ -61,6 +62,11 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

static final class ConcatIterableSubscriber<T>
extends Operators.MultiSubscriptionSubscriber<T, T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* @param <T> the value type
*/
final class FluxCreate<T> extends Flux<T> {
final class FluxCreate<T> extends Flux<T> implements Scannable {

enum CreateMode {
PUSH_ONLY, PUSH_PULL
Expand Down Expand Up @@ -99,6 +99,11 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

/**
* Serializes calls to onNext, onError and onComplete.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;

/**
* Defers the creation of the actual Publisher the Subscriber will be subscribed to.
Expand All @@ -29,7 +30,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxDefer<T> extends Flux<T> {
final class FluxDefer<T> extends Flux<T> implements Scannable {

final Supplier<? extends Publisher<? extends T>> supplier;

Expand All @@ -53,4 +54,10 @@ public void subscribe(CoreSubscriber<? super T> actual) {

from(p).subscribe(actual);
}


@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}
}
11 changes: 8 additions & 3 deletions reactor-core/src/main/java/reactor/core/publisher/FluxEmpty.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

/**
Expand All @@ -27,9 +28,8 @@
* Use the {@link #instance()} method to obtain a properly type-parametrized view of it.
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxEmpty
extends Flux<Object>
implements Fuseable.ScalarCallable<Object> {
final class FluxEmpty extends Flux<Object>
implements Fuseable.ScalarCallable<Object>, Scannable {

private static final Flux<Object> INSTANCE = new FluxEmpty();

Expand All @@ -42,6 +42,11 @@ public void subscribe(CoreSubscriber<? super Object> actual) {
Operators.complete(actual);
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

/**
* Returns a properly parametrized instance of this empty Publisher.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import reactor.core.Fuseable;

import reactor.core.CoreSubscriber;
import reactor.core.Scannable;

/**
* Emits a constant or generated Throwable instance to Subscribers.
Expand All @@ -30,7 +31,8 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxError<T> extends Flux<T> implements Fuseable.ScalarCallable {
final class FluxError<T> extends Flux<T>
implements Fuseable.ScalarCallable, Scannable {

final Throwable error;

Expand All @@ -50,4 +52,9 @@ public Object call() throws Exception {
}
throw Exceptions.propagate(error);
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

/**
Expand All @@ -30,7 +31,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxErrorOnRequest<T> extends Flux<T> {
final class FluxErrorOnRequest<T> extends Flux<T> implements Scannable {

final Throwable error;

Expand All @@ -43,6 +44,11 @@ public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(new ErrorSubscription(actual, error));
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

static final class ErrorSubscription implements InnerProducer {

final CoreSubscriber<?> actual;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;

/**
* Emits a generated {@link Throwable} instance to Subscribers, lazily generated via a
Expand All @@ -47,7 +48,8 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxErrorSupplied<T> extends Flux<T> implements Fuseable.ScalarCallable {
final class FluxErrorSupplied<T> extends Flux<T>
implements Fuseable.ScalarCallable, Scannable {

final Supplier<Throwable> errorSupplier;

Expand All @@ -69,4 +71,9 @@ public Object call() throws Exception {
}
throw Exceptions.propagate(error);
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxFirstEmitting<T> extends Flux<T> {
final class FluxFirstEmitting<T> extends Flux<T> implements Scannable {

final Publisher<? extends T>[] array;

Expand Down Expand Up @@ -161,6 +161,11 @@ FluxFirstEmitting<T> ambAdditionalSource(Publisher<? extends T> source) {
return null;
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

static final class RaceCoordinator<T>
implements Subscription, Scannable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand All @@ -38,8 +39,7 @@
* @param <S> the custom state per subscriber
* @see <a href="https://github.com/reactor/reactive-streams-commons">https://github.com/reactor/reactive-streams-commons</a>
*/
final class FluxGenerate<T, S>
extends Flux<T> implements Fuseable {
final class FluxGenerate<T, S> extends Flux<T> implements Fuseable, Scannable {


static final Callable EMPTY_CALLABLE = () -> null;
Expand Down Expand Up @@ -83,6 +83,11 @@ public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(new GenerateSubscription<>(actual, state, generator, stateConsumer));
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

static final class GenerateSubscription<T, S>
implements QueueSubscription<T>, InnerProducer<T>, SynchronousSink<T> {

Expand Down
Loading

0 comments on commit 9dac17d

Please sign in to comment.