diff --git a/src/main/java/io/reactivex/internal/functions/Functions.java b/src/main/java/io/reactivex/internal/functions/Functions.java index 2c8d6bb7f2..e3dc91b2f1 100644 --- a/src/main/java/io/reactivex/internal/functions/Functions.java +++ b/src/main/java/io/reactivex/internal/functions/Functions.java @@ -15,6 +15,8 @@ import java.util.*; import java.util.concurrent.*; +import org.reactivestreams.Subscription; + import io.reactivex.*; import io.reactivex.functions.*; import io.reactivex.plugins.RxJavaPlugins; @@ -662,4 +664,11 @@ public boolean test(T t1, T t2) throws Exception { public static BiPredicate equalsPredicate(Function keySelector) { return new KeyedEqualsPredicate(keySelector); } + + public static final Consumer REQUEST_MAX = new Consumer() { + @Override + public void accept(Subscription t) throws Exception { + t.request(Long.MAX_VALUE); + } + }; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java index c66cf1ad41..88089631aa 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java @@ -17,13 +17,10 @@ import org.reactivestreams.*; -import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscribers.*; import io.reactivex.internal.util.*; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.DefaultSubscriber; /** * Utility methods to consume a Publisher in a blocking manner with callbacks or Subscriber. @@ -65,17 +62,14 @@ public static void subscribe(Publisher o, Subscriber if (bs.isCancelled()) { break; } - if (o == BlockingSubscriber.TERMINATED) { - break; - } - if (NotificationLite.acceptFull(v, subscriber)) { + if (o == BlockingSubscriber.TERMINATED + || NotificationLite.acceptFull(v, subscriber)) { break; } } } catch (InterruptedException e) { - subscriber.onError(e); - } finally { bs.cancel(); + subscriber.onError(e); } } @@ -85,31 +79,14 @@ public static void subscribe(Publisher o, Subscriber * @param the value type */ public static void subscribe(Publisher o) { - final CountDownLatch cdl = new CountDownLatch(1); - final Throwable[] error = { null }; + BlockingIgnoringReceiver callback = new BlockingIgnoringReceiver(); LambdaSubscriber ls = new LambdaSubscriber(Functions.emptyConsumer(), - new Consumer() { - @Override - public void accept(Throwable e) { - error[0] = e; - cdl.countDown(); - } - }, new Action() { - @Override - public void run() { - cdl.countDown(); - } - }, new Consumer() { - @Override - public void accept(Subscription s) { - s.request(Long.MAX_VALUE); - } - }); + callback, callback, Functions.REQUEST_MAX); o.subscribe(ls); - BlockingHelper.awaitForComplete(cdl, ls); - Throwable e = error[0]; + BlockingHelper.awaitForComplete(callback, ls); + Throwable e = callback.error; if (e != null) { throw ExceptionHelper.wrapOrThrow(e); } @@ -125,50 +102,6 @@ public void accept(Subscription s) { */ public static void subscribe(Publisher o, final Consumer onNext, final Consumer onError, final Action onComplete) { - subscribe(o, new DefaultSubscriber() { - boolean done; - @Override - public void onNext(T t) { - if (done) { - return; - } - try { - onNext.accept(t); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancel(); - onError(ex); - } - } - - @Override - public void onError(Throwable e) { - if (done) { - RxJavaPlugins.onError(e); - return; - } - done = true; - try { - onError.accept(e); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } - } - - @Override - public void onComplete() { - if (done) { - return; - } - done = true; - try { - onComplete.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } - } - }); + subscribe(o, new LambdaSubscriber(onNext, onError, onComplete, Functions.REQUEST_MAX)); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java index 296582797c..ee739c82a0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java @@ -170,10 +170,10 @@ public void request(long n) { void innerSuccess(InnerObserver inner, R value) { set.delete(inner); if (get() == 0 && compareAndSet(0, 1)) { + boolean d = active.decrementAndGet() == 0; if (requested.get() != 0) { actual.onNext(value); - boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java index 1d41354603..bb99bdc712 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java @@ -170,10 +170,10 @@ public void request(long n) { void innerSuccess(InnerObserver inner, R value) { set.delete(inner); if (get() == 0 && compareAndSet(0, 1)) { + boolean d = active.decrementAndGet() == 0; if (requested.get() != 0) { actual.onNext(value); - boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java index d0f40753df..dcc0c4da4a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java @@ -13,13 +13,14 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.*; -import io.reactivex.*; +import io.reactivex.Notification; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class FlowableMaterialize extends AbstractFlowableWithUpstream> { @@ -32,24 +33,20 @@ protected void subscribeActual(Subscriber> s) { source.subscribe(new MaterializeSubscriber(s)); } - // FIXME needs post-complete drain management static final class MaterializeSubscriber extends AtomicLong implements Subscriber, Subscription { private static final long serialVersionUID = -3740826063558713822L; + final Subscriber> actual; Subscription s; - final AtomicInteger state = new AtomicInteger(); - Notification value; - volatile boolean done; + long produced; - static final int NO_REQUEST_NO_VALUE = 0; - static final int NO_REQUEST_HAS_VALUE = 1; - static final int HAS_REQUEST_NO_VALUE = 2; - static final int HAS_REQUEST_HAS_VALUE = 3; + static final long COMPLETE_MASK = Long.MIN_VALUE; + static final long REQUEST_MASK = Long.MAX_VALUE; MaterializeSubscriber(Subscriber> actual) { this.actual = actual; @@ -65,92 +62,70 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + produced++; actual.onNext(Notification.createOnNext(t)); - - if (get() != Long.MAX_VALUE) { - decrementAndGet(); - } - } - - void tryEmit(Notification v) { - if (get() != 0L) { - state.lazySet(HAS_REQUEST_HAS_VALUE); - actual.onNext(v); - actual.onComplete(); - } else { - for (;;) { - int s = state.get(); - if (s == HAS_REQUEST_NO_VALUE) { - if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) { - actual.onNext(v); - actual.onComplete(); - return; - } - } else - if (s == NO_REQUEST_HAS_VALUE) { - return; - } else - if (s == HAS_REQUEST_HAS_VALUE) { - value = null; - return; - } else { - value = v; - done = true; - if (state.compareAndSet(s, NO_REQUEST_HAS_VALUE)) { - return; - } - } - } - } } @Override public void onError(Throwable t) { - Notification v = Notification.createOnError(t); - - tryEmit(v); + complete(Notification.createOnError(t)); } @Override public void onComplete() { - Notification v = Notification.createOnComplete(); + complete(Notification.createOnComplete()); + } - tryEmit(v); + void complete(Notification n) { + long p = produced; + if (p != 0) { + BackpressureHelper.produced(this, p); + } + + for (;;) { + long r = get(); + if ((r & COMPLETE_MASK) != 0) { + if (n.isOnError()) { + RxJavaPlugins.onError(n.getError()); + } + return; + } + if ((r & REQUEST_MASK) != 0) { + lazySet(COMPLETE_MASK + 1); + actual.onNext(n); + actual.onComplete(); + return; + } + value = n; + if (compareAndSet(0, COMPLETE_MASK)) { + return; + } + } } @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - BackpressureHelper.add(this, n); - if (done) { + if (SubscriptionHelper.validate(n)) { for (;;) { - int s = state.get(); - if (s == NO_REQUEST_HAS_VALUE) { - if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) { - Notification v = value; - value = null; - actual.onNext(v); + long r = get(); + if ((r & COMPLETE_MASK) != 0) { + if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) { + actual.onNext(value); actual.onComplete(); - return; } - } else - if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) { - return; - } else - if (state.compareAndSet(s, HAS_REQUEST_NO_VALUE)) { - return; + break; + } + long u = BackpressureHelper.addCap(r, n); + if (compareAndSet(r, u)) { + s.request(n); + break; } } - } else { - s.request(n); } } @Override public void cancel() { - state.lazySet(HAS_REQUEST_HAS_VALUE); s.cancel(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java index dc7cb90b6d..72ca07c548 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java @@ -13,12 +13,13 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; @@ -34,25 +35,22 @@ protected void subscribeActual(Subscriber s) { source.subscribe(new OnErrorReturnSubscriber(s, valueSupplier)); } - // FIXME requires post-complete drain management - static final class OnErrorReturnSubscriber extends AtomicLong implements Subscriber, Subscription { + static final class OnErrorReturnSubscriber extends AtomicLong + implements Subscriber, Subscription { private static final long serialVersionUID = -3740826063558713822L; final Subscriber actual; + final Function valueSupplier; Subscription s; - final AtomicInteger state = new AtomicInteger(); - T value; - volatile boolean done; + long produced; - static final int NO_REQUEST_NO_VALUE = 0; - static final int NO_REQUEST_HAS_VALUE = 1; - static final int HAS_REQUEST_NO_VALUE = 2; - static final int HAS_REQUEST_HAS_VALUE = 3; + static final long COMPLETE_MASK = Long.MIN_VALUE; + static final long REQUEST_MASK = Long.MAX_VALUE; OnErrorReturnSubscriber(Subscriber actual, Function valueSupplier) { this.actual = actual; @@ -69,103 +67,75 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + produced++; actual.onNext(t); - - if (get() != Long.MAX_VALUE) { - decrementAndGet(); - } } @Override public void onError(Throwable t) { - done = true; T v; try { - v = valueSupplier.apply(t); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - state.lazySet(HAS_REQUEST_HAS_VALUE); - actual.onError(new CompositeException(t, e)); - return; - } - - if (v == null) { - state.lazySet(HAS_REQUEST_HAS_VALUE); - NullPointerException npe = new NullPointerException("The supplied value is null"); - npe.initCause(t); - actual.onError(npe); + v = ObjectHelper.requireNonNull(valueSupplier.apply(t), "The valueSupplier returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(new CompositeException(t, ex)); return; } - - if (get() != 0L) { - state.lazySet(HAS_REQUEST_HAS_VALUE); - actual.onNext(v); - actual.onComplete(); - } else { - for (;;) { - int s = state.get(); - if (s == HAS_REQUEST_NO_VALUE) { - if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) { - actual.onNext(v); - actual.onComplete(); - return; - } - } else - if (s == NO_REQUEST_HAS_VALUE) { - return; - } else - if (s == HAS_REQUEST_HAS_VALUE) { - value = null; - return; - } else { - value = v; - if (state.compareAndSet(s, NO_REQUEST_HAS_VALUE)) { - return; - } - } - } - } + complete(v); } @Override public void onComplete() { - state.lazySet(HAS_REQUEST_HAS_VALUE); actual.onComplete(); } + void complete(T n) { + long p = produced; + if (p != 0L) { + BackpressureHelper.produced(this, p); + } + + for (;;) { + long r = get(); + if ((r & COMPLETE_MASK) != 0) { + return; + } + if ((r & REQUEST_MASK) != 0) { + lazySet(COMPLETE_MASK + 1); + actual.onNext(n); + actual.onComplete(); + return; + } + value = n; + if (compareAndSet(0, COMPLETE_MASK)) { + return; + } + } + } + @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - BackpressureHelper.add(this, n); - if (done) { + if (SubscriptionHelper.validate(n)) { for (;;) { - int s = state.get(); - if (s == NO_REQUEST_HAS_VALUE) { - if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) { - T v = value; - value = null; - actual.onNext(v); + long r = get(); + if ((r & COMPLETE_MASK) != 0) { + if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) { + actual.onNext(value); actual.onComplete(); - return; } - } else - if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) { - return; - } else - if (state.compareAndSet(s, HAS_REQUEST_NO_VALUE)) { - return; + break; + } + long u = BackpressureHelper.addCap(r, n); + if (compareAndSet(r, u)) { + s.request(n); + break; } } - } else { - s.request(n); } } @Override public void cancel() { - state.lazySet(HAS_REQUEST_HAS_VALUE); s.cancel(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java index dbfb65eabd..5c5bf9e3a8 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java @@ -19,10 +19,10 @@ import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.Consumer; -import io.reactivex.internal.fuseable.HasUpstreamPublisher; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; @@ -34,6 +34,12 @@ * @param the value type */ public final class FlowablePublish extends ConnectableFlowable implements HasUpstreamPublisher { + /** + * Indicates this child has been cancelled: the state is swapped in atomically and + * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. + */ + static final long CANCELLED = Long.MIN_VALUE; + /** The source observable. */ final Publisher source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -57,64 +63,66 @@ public static ConnectableFlowable create(Flowable source, final int bu Publisher onSubscribe = new Publisher() { @Override public void subscribe(Subscriber child) { - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - PublishSubscriber r = curr.get(); - // if there isn't one or it is cancelled/disposed - if (r == null || r.isDisposed()) { - // create a new subscriber to source - PublishSubscriber u = new PublishSubscriber(curr, bufferSize); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; + // create the backpressure-managing producer for this child + InnerSubscriber inner = new InnerSubscriber(child); + child.onSubscribe(inner); + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + PublishSubscriber r = curr.get(); + // if there isn't one or it is cancelled/disposed + if (r == null || r.isDisposed()) { + // create a new subscriber to source + PublishSubscriber u = new PublishSubscriber(curr, bufferSize); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; } - // we won, let's use it going onwards - r = u; - } - // create the backpressure-managing producer for this child - InnerSubscriber inner = new InnerSubscriber(r, child); - /* - * Try adding it to the current subscriber-to-source, add is atomic in respect - * to other adds and the termination of the subscriber-to-source. - */ - if (r.add(inner)) { - // the producer has been registered with the current subscriber-to-source so - // at least it will receive the next terminal event - // setting the producer will trigger the first request to be considered by - // the subscriber-to-source. - child.onSubscribe(inner); - break; // NOPMD + /* + * Try adding it to the current subscriber-to-source, add is atomic in respect + * to other adds and the termination of the subscriber-to-source. + */ + if (r.add(inner)) { + if (inner.get() == CANCELLED) { + r.remove(inner); + } else { + inner.parent = r; + } + r.dispatch(); + break; // NOPMD + } + /* + * The current PublishSubscriber has been terminated, try with a newer one. + */ + /* + * Note: although technically correct, concurrent disconnects can cause + * unexpected behavior such as child subscribers never receiving anything + * (unless connected again). An alternative approach, similar to + * PublishProcessor would be to immediately terminate such child + * subscribers as well: + * + * Object term = r.terminalEvent; + * if (r.nl.isCompleted(term)) { + * child.onComplete(); + * } else { + * child.onError(r.nl.getError(term)); + * } + * return; + * + * The original concurrent behavior was non-deterministic in this regard as well. + * Allowing this behavior, however, may introduce another unexpected behavior: + * after disconnecting a previous connection, one might not be able to prepare + * a new connection right after a previous termination by subscribing new child + * subscribers asynchronously before a connect call. + */ } - /* - * The current PublishSubscriber has been terminated, try with a newer one. - */ - /* - * Note: although technically correct, concurrent disconnects can cause - * unexpected behavior such as child subscribers never receiving anything - * (unless connected again). An alternative approach, similar to - * PublishProcessor would be to immediately terminate such child - * subscribers as well: - * - * Object term = r.terminalEvent; - * if (r.nl.isCompleted(term)) { - * child.onComplete(); - * } else { - * child.onError(r.nl.getError(term)); - * } - * return; - * - * The original concurrent behavior was non-deterministic in this regard as well. - * Allowing this behavior, however, may introduce another unexpected behavior: - * after disconnecting a previous connection, one might not be able to prepare - * a new connection right after a previous termination by subscribing new child - * subscribers asynchronously before a connect call. - */ - } } }; return RxJavaPlugins.onAssembly(new FlowablePublish(onSubscribe, source, curr, bufferSize)); @@ -188,21 +196,21 @@ public void connect(Consumer connection) { } @SuppressWarnings("rawtypes") - static final class PublishSubscriber implements Subscriber, Disposable { - /** Holds notifications from upstream. */ - final SpscArrayQueue queue; - /** Holds onto the current connected PublishSubscriber. */ - final AtomicReference> current; - /** The prefetch buffer size. */ - final int bufferSize; - /** Contains either an onComplete or an onError token from upstream. */ - volatile Object terminalEvent; + static final class PublishSubscriber + extends AtomicInteger + implements Subscriber, Disposable { + private static final long serialVersionUID = -202316842419149694L; /** Indicates an empty array of inner subscribers. */ static final InnerSubscriber[] EMPTY = new InnerSubscriber[0]; /** Indicates a terminated PublishSubscriber. */ static final InnerSubscriber[] TERMINATED = new InnerSubscriber[0]; + /** Holds onto the current connected PublishSubscriber. */ + final AtomicReference> current; + /** The prefetch buffer size. */ + final int bufferSize; + /** Tracks the subscribed InnerSubscribers. */ final AtomicReference subscribers; /** @@ -211,16 +219,17 @@ static final class PublishSubscriber implements Subscriber, Disposable { */ final AtomicBoolean shouldConnect; - /** Guarded by this. */ - boolean emitting; - /** Guarded by this. */ - boolean missed; - final AtomicReference s = new AtomicReference(); - PublishSubscriber(AtomicReference> current, int bufferSize) { - this.queue = new SpscArrayQueue(bufferSize); + /** Contains either an onComplete or an onError token from upstream. */ + volatile Object terminalEvent; + + int sourceMode; + + /** Holds notifications from upstream. */ + volatile SimpleQueue queue; + PublishSubscriber(AtomicReference> current, int bufferSize) { this.subscribers = new AtomicReference(EMPTY); this.current = current; this.shouldConnect = new AtomicBoolean(); @@ -246,6 +255,28 @@ public boolean isDisposed() { @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this.s, s)) { + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription) s; + + int m = qs.requestFusion(QueueSubscription.ANY); + if (m == QueueSubscription.SYNC) { + sourceMode = m; + queue = qs; + terminalEvent = NotificationLite.complete(); + dispatch(); + return; + } + if (m == QueueSubscription.ASYNC) { + sourceMode = m; + queue = qs; + s.request(bufferSize); + return; + } + } + + queue = new SpscArrayQueue(bufferSize); + s.request(bufferSize); } } @@ -253,14 +284,13 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { // we expect upstream to honor backpressure requests - // nl is required because JCTools queue doesn't accept nulls. - if (!queue.offer(t)) { - onError(new IllegalStateException("Prefetch queue is full?!")); - } else { - // since many things can happen concurrently, we have a common dispatch - // loop to act on the current state serially - dispatch(); + if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) { + onError(new MissingBackpressureException("Prefetch queue is full?!")); + return; } + // since many things can happen concurrently, we have a common dispatch + // loop to act on the current state serially + dispatch(); } @Override public void onError(Throwable e) { @@ -294,9 +324,6 @@ public void onComplete() { * @return true if succeeded, false otherwise */ boolean add(InnerSubscriber producer) { - if (producer == null) { - throw new NullPointerException(); - } // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array @@ -329,14 +356,14 @@ void remove(InnerSubscriber producer) { for (;;) { // let's read the current subscribers array InnerSubscriber[] c = subscribers.get(); + int len = c.length; // if it is either empty or terminated, there is nothing to remove so we quit - if (c == EMPTY || c == TERMINATED) { - return; + if (len == 0) { + break; } // let's find the supplied producer in the array // although this is O(n), we don't expect too many child subscribers in general int j = -1; - int len = c.length; for (int i = 0; i < len; i++) { if (c[i].equals(producer)) { j = i; @@ -363,7 +390,7 @@ void remove(InnerSubscriber producer) { } // try setting this new array as if (subscribers.compareAndSet(c, u)) { - return; + break; } // if we failed, it means something else happened // (a concurrent add/remove or termination), we need to retry @@ -387,26 +414,20 @@ boolean checkTerminated(Object term, boolean empty) { // this will prevent OnSubscribe spinning on a terminated but // not yet cancelled PublishSubscriber current.compareAndSet(this, null); - try { - /* - * This will swap in a terminated array so add() in OnSubscribe will reject - * child subscribers to associate themselves with a terminated and thus - * never again emitting chain. - * - * Since we atomically change the contents of 'subscribers' only one - * operation wins at a time. If an add() wins before this getAndSet, - * its value will be part of the returned array by getAndSet and thus - * will receive the terminal notification. Otherwise, if getAndSet wins, - * add() will refuse to add the child producer and will trigger the - * creation of subscriber-to-source. - */ - for (InnerSubscriber ip : subscribers.getAndSet(TERMINATED)) { - ip.child.onComplete(); - } - } finally { - // we explicitly dispose/disconnect from the upstream - // after we sent out the terminal event to child subscribers - dispose(); + /* + * This will swap in a terminated array so add() in OnSubscribe will reject + * child subscribers to associate themselves with a terminated and thus + * never again emitting chain. + * + * Since we atomically change the contents of 'subscribers' only one + * operation wins at a time. If an add() wins before this getAndSet, + * its value will be part of the returned array by getAndSet and thus + * will receive the terminal notification. Otherwise, if getAndSet wins, + * add() will refuse to add the child producer and will trigger the + * creation of subscriber-to-source. + */ + for (InnerSubscriber ip : subscribers.getAndSet(TERMINATED)) { + ip.child.onComplete(); } // indicate we reached the terminal state return true; @@ -416,22 +437,16 @@ boolean checkTerminated(Object term, boolean empty) { // this will prevent OnSubscribe spinning on a terminated // but not yet cancelled PublishSubscriber current.compareAndSet(this, null); - try { - // this will swap in a terminated array so add() in OnSubscribe will reject - // child subscribers to associate themselves with a terminated and thus - // never again emitting chain - InnerSubscriber[] a = subscribers.getAndSet(TERMINATED); - if (a.length != 0) { - for (InnerSubscriber ip : a) { - ip.child.onError(t); - } - } else { - RxJavaPlugins.onError(t); + // this will swap in a terminated array so add() in OnSubscribe will reject + // child subscribers to associate themselves with a terminated and thus + // never again emitting chain + InnerSubscriber[] a = subscribers.getAndSet(TERMINATED); + if (a.length != 0) { + for (InnerSubscriber ip : a) { + ip.child.onError(t); } - } finally { - // we explicitly dispose/disconnect from the upstream - // after we sent out the terminal event to child subscribers - dispose(); + } else { + RxJavaPlugins.onError(t); } // indicate we reached the terminal state return true; @@ -446,173 +461,161 @@ boolean checkTerminated(Object term, boolean empty) { * requesting more. */ void dispatch() { - // standard construct of emitter loop (blocking) + // standard construct of queue-drain // if there is an emission going on, indicate that more work needs to be done // the exact nature of this work needs to be determined from other data structures - synchronized (this) { - if (emitting) { - missed = true; + if (getAndIncrement() != 0) { + return; + } + int missed = 1; + for (;;) { + /* + * We need to read terminalEvent before checking the queue for emptiness because + * all enqueue happens before setting the terminal event. + * If it were the other way around, when the emission is paused between + * checking isEmpty and checking terminalEvent, some other thread might + * have produced elements and set the terminalEvent and we'd quit emitting + * prematurely. + */ + Object term = terminalEvent; + /* + * See if the queue is empty; since we need this information multiple + * times later on, we read it one. + * Although the queue can become non-empty in the mean time, we will + * detect it through the missing flag and will do another iteration. + */ + SimpleQueue q = queue; + + boolean empty = q == null || q.isEmpty(); + // if the queue is empty and the terminal event was received, quit + // and don't bother restoring emitting to false: no further activity is + // possible at this point + if (checkTerminated(term, empty)) { return; } - // there was no emission going on, we won and will start emitting - emitting = true; - missed = false; - } - /* - * In case an exception is thrown in the loop, we need to set emitting back to false - * on the way out (the exception will propagate up) so if it bounces back and - * onError is called, its dispatch() call will have the opportunity to emit it. - * However, if we want to exit regularly, we will set the emitting to false (+ other operations) - * atomically so we want to prevent the finally part to accidentally unlock some other - * emissions happening between the two synchronized blocks. - */ - boolean skipFinal = false; - try { - for (;;) { - /* - * We need to read terminalEvent before checking the queue for emptiness because - * all enqueue happens before setting the terminal event. - * If it were the other way around, when the emission is paused between - * checking isEmpty and checking terminalEvent, some other thread might - * have produced elements and set the terminalEvent and we'd quit emitting - * prematurely. - */ - Object term = terminalEvent; - /* - * See if the queue is empty; since we need this information multiple - * times later on, we read it one. - * Although the queue can become non-empty in the mean time, we will - * detect it through the missing flag and will do another iteration. - */ - boolean empty = queue.isEmpty(); - // if the queue is empty and the terminal event was received, quit - // and don't bother restoring emitting to false: no further activity is - // possible at this point - if (checkTerminated(term, empty)) { - skipFinal = true; - return; - } - // We have elements queued. Note that due to the serialization nature of dispatch() - // this loop is the only one which can turn a non-empty queue into an empty one - // and as such, no need to ask the queue itself again for that. - if (!empty) { - // We take a snapshot of the current child subscribers. - // Concurrent subscribers may miss this iteration, but it is to be expected - @SuppressWarnings("unchecked") - InnerSubscriber[] ps = subscribers.get(); - - int len = ps.length; - // Let's assume everyone requested the maximum value. - long maxRequested = Long.MAX_VALUE; - // count how many have triggered cancellation - int cancelled = 0; - - // Now find the minimum amount each child-subscriber requested - // since we can only emit that much to all of them without violating - // backpressure constraints - for (InnerSubscriber ip : ps) { - long r = ip.get(); - // if there is one child subscriber that hasn't requested yet - // we can't emit anything to anyone - if (r >= 0L) { - maxRequested = Math.min(maxRequested, r); - } else - // cancellation is indicated by a special value - if (r == InnerSubscriber.CANCELLED) { - cancelled++; - } - // we ignore those with NOT_REQUESTED as if they aren't even there + // We have elements queued. Note that due to the serialization nature of dispatch() + // this loop is the only one which can turn a non-empty queue into an empty one + // and as such, no need to ask the queue itself again for that. + if (!empty) { + // We take a snapshot of the current child subscribers. + // Concurrent subscribers may miss this iteration, but it is to be expected + @SuppressWarnings("unchecked") + InnerSubscriber[] ps = subscribers.get(); + + int len = ps.length; + // Let's assume everyone requested the maximum value. + long maxRequested = Long.MAX_VALUE; + // count how many have triggered cancellation + int cancelled = 0; + + // Now find the minimum amount each child-subscriber requested + // since we can only emit that much to all of them without violating + // backpressure constraints + for (InnerSubscriber ip : ps) { + long r = ip.get(); + // if there is one child subscriber that hasn't requested yet + // we can't emit anything to anyone + if (r >= 0L) { + maxRequested = Math.min(maxRequested, r); + } else + // cancellation is indicated by a special value + if (r == CANCELLED) { + cancelled++; } + // we ignore those with NOT_REQUESTED as if they aren't even there + } - // it may happen everyone has cancelled between here and subscribers.get() - // or we have no subscribers at all to begin with - if (len == cancelled) { - term = terminalEvent; - // so let's consume a value from the queue - Object v = queue.poll(); - // or terminate if there was a terminal event and the queue is empty - if (checkTerminated(term, v == null)) { - skipFinal = true; - return; - } - // otherwise, just ask for a new value + // it may happen everyone has cancelled between here and subscribers.get() + // or we have no subscribers at all to begin with + if (len == cancelled) { + term = terminalEvent; + // so let's consume a value from the queue + T v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.get().cancel(); + term = NotificationLite.error(ex); + terminalEvent = term; + v = null; + } + // or terminate if there was a terminal event and the queue is empty + if (checkTerminated(term, v == null)) { + return; + } + // otherwise, just ask for a new value + if (sourceMode != QueueSubscription.SYNC) { s.get().request(1); - // and retry emitting to potential new child subscribers - continue; } - // if we get here, it means there are non-cancelled child subscribers - // and we count the number of emitted values because the queue - // may contain less than requested - int d = 0; - while (d < maxRequested) { - term = terminalEvent; - Object v = queue.poll(); - empty = v == null; - // let's check if there is a terminal event and the queue became empty just now - if (checkTerminated(term, empty)) { - skipFinal = true; - return; - } - // the queue is empty but we aren't terminated yet, finish this emission loop - if (empty) { - break; - } - // we need to unwrap potential nulls - T value = NotificationLite.getValue(v); - // let's emit this value to all child subscribers - for (InnerSubscriber ip : ps) { - // if ip.get() is negative, the child has either cancelled in the - // meantime or hasn't requested anything yet - // this eager behavior will skip cancelled children in case - // multiple values are available in the queue - if (ip.get() > 0L) { - ip.child.onNext(value); - // indicate this child has received 1 element - ip.produced(1); - } - } - // indicate we emitted one element - d++; + // and retry emitting to potential new child subscribers + continue; + } + // if we get here, it means there are non-cancelled child subscribers + // and we count the number of emitted values because the queue + // may contain less than requested + int d = 0; + while (d < maxRequested) { + term = terminalEvent; + T v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.get().cancel(); + term = NotificationLite.error(ex); + terminalEvent = term; + v = null; } - // if we did emit at least one element, request more to replenish the queue - if (d > 0) { - s.get().request(d); + empty = v == null; + // let's check if there is a terminal event and the queue became empty just now + if (checkTerminated(term, empty)) { + return; } - // if we have requests but not an empty queue after emission - // let's try again to see if more requests/child subscribers are - // ready to receive more - if (maxRequested != 0L && !empty) { - continue; + // the queue is empty but we aren't terminated yet, finish this emission loop + if (empty) { + break; } + // we need to unwrap potential nulls + T value = NotificationLite.getValue(v); + // let's emit this value to all child subscribers + for (InnerSubscriber ip : ps) { + // if ip.get() is negative, the child has either cancelled in the + // meantime or hasn't requested anything yet + // this eager behavior will skip cancelled children in case + // multiple values are available in the queue + if (ip.get() > 0L) { + ip.child.onNext(value); + // indicate this child has received 1 element + ip.produced(1); + } + } + // indicate we emitted one element + d++; } - // we did what we could: either the queue is empty or child subscribers - // haven't requested more (or both), let's try to finish dispatching - synchronized (this) { - // since missed is changed atomically, if we see it as true - // it means some state has changed and we need to loop again - // and handle that case - if (!missed) { - // but if no missed dispatch happened, let's stop emitting - emitting = false; - // and skip the emitting = false in the finally block as well - skipFinal = true; - return; + // if we did emit at least one element, request more to replenish the queue + if (d > 0) { + if (sourceMode != QueueSubscription.SYNC) { + s.get().request(d); } - // we acknowledge the missed changes so far - missed = false; } - } - } finally { - // unless returned cleanly (i.e., some method above threw) - if (!skipFinal) { - // we stop emitting so the error can propagate back down through onError - synchronized (this) { - emitting = false; + // if we have requests but not an empty queue after emission + // let's try again to see if more requests/child subscribers are + // ready to receive more + if (maxRequested != 0L && !empty) { + continue; } } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } } } } @@ -621,70 +624,29 @@ void dispatch() { * child subscriber in thread-safe manner. * @param the value type */ - static final class InnerSubscriber extends AtomicLong implements Subscription, Disposable { + static final class InnerSubscriber extends AtomicLong implements Subscription { private static final long serialVersionUID = -4453897557930727610L; - /** - * The parent subscriber-to-source used to allow removing the child in case of - * child cancellation. - */ - final PublishSubscriber parent; /** The actual child subscriber. */ final Subscriber child; /** - * Indicates this child has been cancelled: the state is swapped in atomically and - * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. + * The parent subscriber-to-source used to allow removing the child in case of + * child cancellation. */ - static final long CANCELLED = Long.MIN_VALUE; + volatile PublishSubscriber parent; - InnerSubscriber(PublishSubscriber parent, Subscriber child) { - this.parent = parent; + InnerSubscriber(Subscriber child) { this.child = child; } @Override public void request(long n) { - if (n < 0) { - RxJavaPlugins.onError(new IllegalArgumentException("n < 0 required but it was " + n)); - return; - } - // In general, RxJava doesn't prevent concurrent requests (with each other or with - // a cancel) so we need a CAS-loop, but we need to handle - // request overflow and cancelled/not requested state as well. - for (;;) { - // get the current request amount - long r = get(); - // if child called cancel() do nothing - if (r == CANCELLED) { - return; - } - // ignore zero requests except any first that sets in zero - if (r >= 0L && n == 0) { - return; - } - long u; - // if this child has not requested yet - if (r == 0L) { - // let the new request value this (no overflow check needed) - u = n; - } else { - // otherwise, increase the request count - u = r + n; - // and check for long overflow - if (u < 0) { - // cap at max value, which is essentially unlimited - u = Long.MAX_VALUE; - } - } - // try setting the new request value - if (compareAndSet(r, u)) { - // if successful, notify the parent dispatcher this child can receive more - // elements - parent.dispatch(); - return; + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.addCancel(this, n); + PublishSubscriber p = parent; + if (p != null) { + p.dispatch(); } - // otherwise, someone else changed the state (perhaps a concurrent - // request or cancellation so retry } } @@ -694,48 +656,11 @@ public void request(long n) { * @return the updated request value (may indicate how much can be produced or a terminal state) */ public long produced(long n) { - // we don't allow producing zero or less: it would be a bug in the operator - if (n <= 0) { - throw new IllegalArgumentException("Cant produce zero or less"); - } - for (;;) { - // get the current request value - long r = get(); - // if no request has been made yet, we shouldn't have emitted to this child - // subscriber so there is a bug in this operator - if (r == 0L) { - throw new IllegalStateException("Produced without request"); - } - // if the child has cancelled, simply return and indicate this - if (r == CANCELLED) { - return CANCELLED; - } - // reduce the requested amount - long u = r - n; - // if the new amount is less than zero, we have a bug in this operator - if (u < 0) { - throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); - } - // try updating the request value - if (compareAndSet(r, u)) { - // and return the updated value - return u; - } - // otherwise, some concurrent activity happened and we need to retry - } - } - - @Override - public boolean isDisposed() { - return get() == CANCELLED; + return BackpressureHelper.producedCancel(this, n); } @Override public void cancel() { - dispose(); - } - @Override - public void dispose() { long r = get(); // let's see if we are cancelled if (r != CANCELLED) { @@ -746,13 +671,16 @@ public void dispose() { r = getAndSet(CANCELLED); // and only one of them will see a non-terminated value before the swap if (r != CANCELLED) { - // remove this from the parent - parent.remove(this); - // After removal, we might have unblocked the other child subscribers: - // let's assume this child had 0 requested before the cancellation while - // the others had non-zero. By removing this 'blocking' child, the others - // are now free to receive events - parent.dispatch(); + PublishSubscriber p = parent; + if (p != null) { + // remove this from the parent + p.remove(this); + // After removal, we might have unblocked the other child subscribers: + // let's assume this child had 0 requested before the cancellation while + // the others had non-zero. By removing this 'blocking' child, the others + // are now free to receive events + p.dispatch(); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java index c1942b7f9c..8e63ecf642 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java @@ -32,20 +32,14 @@ public FlowableSubscribeOn(Publisher source, Scheduler scheduler) { @Override public void subscribeActual(final Subscriber s) { Scheduler.Worker w = scheduler.createWorker(); - final SubscribeOnSubscriber sos = new SubscribeOnSubscriber(s, w); + final SubscribeOnSubscriber sos = new SubscribeOnSubscriber(s, w, source); s.onSubscribe(sos); - w.schedule(new Runnable() { - @Override - public void run() { - sos.lazySet(Thread.currentThread()); - source.subscribe(sos); - } - }); + w.schedule(sos); } static final class SubscribeOnSubscriber extends AtomicReference - implements Subscriber, Subscription { + implements Subscriber, Subscription, Runnable { private static final long serialVersionUID = 8094547886072529208L; final Subscriber actual; @@ -55,13 +49,24 @@ static final class SubscribeOnSubscriber extends AtomicReference final AtomicLong requested; - SubscribeOnSubscriber(Subscriber actual, Scheduler.Worker worker) { + Publisher source; + + SubscribeOnSubscriber(Subscriber actual, Scheduler.Worker worker, Publisher source) { this.actual = actual; this.worker = worker; + this.source = source; this.s = new AtomicReference(); this.requested = new AtomicLong(); } + @Override + public void run() { + lazySet(Thread.currentThread()); + Publisher src = source; + source = null; + src.subscribe(this); + } + @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this.s, s)) { @@ -79,40 +84,32 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - try { - actual.onError(t); - } finally { - worker.dispose(); - } + actual.onError(t); + worker.dispose(); } @Override public void onComplete() { - try { - actual.onComplete(); - } finally { - worker.dispose(); - } + actual.onComplete(); + worker.dispose(); } @Override public void request(final long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - Subscription s = this.s.get(); - if (s != null) { - requestUpstream(n, s); - } else { - BackpressureHelper.add(requested, n); - s = this.s.get(); + if (SubscriptionHelper.validate(n)) { + Subscription s = this.s.get(); if (s != null) { - long r = requested.getAndSet(0L); - if (r != 0L) { - requestUpstream(r, s); + requestUpstream(n, s); + } else { + BackpressureHelper.add(requested, n); + s = this.s.get(); + if (s != null) { + long r = requested.getAndSet(0L); + if (r != 0L) { + requestUpstream(r, s); + } } } - } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java index 443e63c28d..f34dadbda8 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java @@ -16,7 +16,6 @@ import java.util.concurrent.*; import io.reactivex.*; -import io.reactivex.disposables.Disposable; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.observers.*; @@ -75,30 +74,14 @@ public static void subscribe(ObservableSource o, Observer the value type */ public static void subscribe(ObservableSource o) { - final CountDownLatch cdl = new CountDownLatch(1); - final Throwable[] error = { null }; + BlockingIgnoringReceiver callback = new BlockingIgnoringReceiver(); LambdaObserver ls = new LambdaObserver(Functions.emptyConsumer(), - new Consumer() { - @Override - public void accept(Throwable e) { - error[0] = e; - cdl.countDown(); - } - }, new Action() { - @Override - public void run() { - cdl.countDown(); - } - }, new Consumer() { - @Override - public void accept(Disposable s) { - } - }); + callback, callback, Functions.emptyConsumer()); o.subscribe(ls); - BlockingHelper.awaitForComplete(cdl, ls); - Throwable e = error[0]; + BlockingHelper.awaitForComplete(callback, ls); + Throwable e = callback.error; if (e != null) { throw ExceptionHelper.wrapOrThrow(e); } diff --git a/src/main/java/io/reactivex/internal/util/BlockingIgnoringReceiver.java b/src/main/java/io/reactivex/internal/util/BlockingIgnoringReceiver.java new file mode 100644 index 0000000000..73b0d99d36 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/BlockingIgnoringReceiver.java @@ -0,0 +1,42 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.concurrent.CountDownLatch; + +import io.reactivex.functions.*; + +/** + * Stores an incoming Throwable (if any) and counts itself down. + */ +public final class BlockingIgnoringReceiver +extends CountDownLatch +implements Consumer, Action { + public Throwable error; + + public BlockingIgnoringReceiver() { + super(1); + } + + @Override + public void accept(Throwable e) { + error = e; + countDown(); + } + + @Override + public void run() { + countDown(); + } +} diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 3b5e258896..16e52117c2 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -17,11 +17,12 @@ import java.util.*; import java.util.concurrent.*; +import static org.junit.Assert.*; import org.junit.*; import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.processors.*; @@ -1671,14 +1672,22 @@ public void onErrorReturnValueNull() { just1.onErrorReturnItem(null); } - @Test(expected = NullPointerException.class) + @Test public void onErrorReturnFunctionReturnsNull() { - Flowable.error(new TestException()).onErrorReturn(new Function() { - @Override - public Object apply(Throwable e) { - return null; - } - }).blockingSubscribe(); + try { + Flowable.error(new TestException()).onErrorReturn(new Function() { + @Override + public Object apply(Throwable e) { + return null; + } + }).blockingSubscribe(); + fail("Should have thrown"); + } catch (CompositeException ex) { + List errors = TestHelper.compositeList(ex); + + TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, NullPointerException.class, "The valueSupplier returned a null value"); + } } @Test(expected = NullPointerException.class) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAllTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAllTest.java index cac67a3e3b..36295d835f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAllTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAllTest.java @@ -24,9 +24,10 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -378,9 +379,10 @@ public boolean test(String v) { } @Test - @Ignore("RS Subscription can't be checked for isCancelled") public void dispose() { - // TestHelper.checkDisposed(Flowable.just(1).all(Functions.alwaysTrue()).toFlowable()); + TestHelper.checkDisposed(Flowable.just(1).all(Functions.alwaysTrue()).toFlowable()); + + TestHelper.checkDisposed(Flowable.just(1).all(Functions.alwaysTrue())); } @Test @@ -444,4 +446,37 @@ public boolean test(Integer v) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.all(Functions.alwaysTrue()); + } + }, false, 1, 1, true); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.all(Functions.alwaysTrue()).toFlowable(); + } + }, false, 1, 1, true); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable o) throws Exception { + return o.all(Functions.alwaysTrue()).toFlowable(); + } + }); + TestHelper.checkDoubleOnSubscribeFlowableToSingle(new Function, Single>() { + @Override + public Single apply(Flowable o) throws Exception { + return o.all(Functions.alwaysTrue()); + } + }); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBlockingTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBlockingTest.java index 2e86cf34e5..0e8c2d80d7 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBlockingTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBlockingTest.java @@ -17,6 +17,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.reactivestreams.*; @@ -27,6 +28,7 @@ import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -268,9 +270,18 @@ public void subscribe(Subscriber s) { @Test public void interrupt() { - TestSubscriber to = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber(0L); + Thread.currentThread().interrupt(); - Flowable.never().blockingSubscribe(to); + + try { + Flowable.just(1) + .blockingSubscribe(ts); + + ts.assertFailure(InterruptedException.class); + } finally { + Thread.interrupted(); // clear interrupted status just in case + } } @Test(expected = NoSuchElementException.class) @@ -331,4 +342,45 @@ protected void subscribeActual(Subscriber observer) { to.assertEmpty(); } + + @Test + public void blockinsSubscribeCancelAsync() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = new TestSubscriber(); + + final PublishProcessor pp = PublishProcessor.create(); + + final Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + final Runnable r2 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + final AtomicInteger c = new AtomicInteger(2); + + Schedulers.computation().scheduleDirect(new Runnable() { + @Override + public void run() { + c.decrementAndGet(); + while (c.get() != 0 && !pp.hasSubscribers()) { } + + TestHelper.race(r1, r2); + } + }); + + c.decrementAndGet(); + while (c.get() != 0) { } + + pp + .blockingSubscribe(ts); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java index d1aa479412..4c17fdd454 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java @@ -264,7 +264,7 @@ public void middleError() { Flowable.fromArray(new String[]{"1","a","2"}).flatMapMaybe(new Function>() { @Override public MaybeSource apply(final String s) throws NumberFormatException { - //return Single.just(Integer.valueOf(s)); //This works + //return Maybe.just(Integer.valueOf(s)); //This works return Maybe.fromCallable(new Callable() { @Override public Integer call() throws NumberFormatException { @@ -524,4 +524,54 @@ public void innerSuccessCompletesAfterMain() { to .assertResult(2); } + + @Test + public void backpressure() { + TestSubscriber ts = Flowable.just(1) + .flatMapMaybe(Functions.justFunction(Maybe.just(2))) + .test(0L) + .assertEmpty(); + + ts.request(1); + ts.assertResult(2); + } + + @Test + public void error() { + Flowable.just(1) + .flatMapMaybe(Functions.justFunction(Maybe.error(new TestException()))) + .test(0L) + .assertFailure(TestException.class); + } + + @Test + public void errorDelayed() { + Flowable.just(1) + .flatMapMaybe(Functions.justFunction(Maybe.error(new TestException())), true, 16) + .test(0L) + .assertFailure(TestException.class); + } + + @Test + public void requestCancelRace() { + for (int i = 0; i < 500; i++) { + final TestSubscriber to = Flowable.just(1).concatWith(Flowable.never()) + .flatMapMaybe(Functions.justFunction(Maybe.just(2))).test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.request(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java index ea29c72651..1936ba3e14 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java @@ -440,4 +440,54 @@ public void innerSuccessCompletesAfterMain() { to .assertResult(2); } + + @Test + public void backpressure() { + TestSubscriber ts = Flowable.just(1) + .flatMapSingle(Functions.justFunction(Single.just(2))) + .test(0L) + .assertEmpty(); + + ts.request(1); + ts.assertResult(2); + } + + @Test + public void error() { + Flowable.just(1) + .flatMapSingle(Functions.justFunction(Single.error(new TestException()))) + .test(0L) + .assertFailure(TestException.class); + } + + @Test + public void errorDelayed() { + Flowable.just(1) + .flatMapSingle(Functions.justFunction(Single.error(new TestException())), true, 16) + .test(0L) + .assertFailure(TestException.class); + } + + @Test + public void requestCancelRace() { + for (int i = 0; i < 500; i++) { + final TestSubscriber to = Flowable.just(1).concatWith(Flowable.never()) + .flatMapSingle(Functions.justFunction(Single.just(2))).test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.request(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index e2ff2ffcb2..61b658d370 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -26,6 +26,7 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -889,4 +890,11 @@ public Integer apply(Integer w) throws Exception { TestHelper.assertError(errors, 1, TestException.class); } + @Test + public void scalarXMap() { + Flowable.fromCallable(Functions.justCallable(1)) + .flatMap(Functions.justFunction(Flowable.fromCallable(Functions.justCallable(2)))) + .test() + .assertResult(2); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMaterializeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMaterializeTest.java index 0665a978f8..28708410ab 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMaterializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMaterializeTest.java @@ -297,4 +297,19 @@ public Flowable> apply(Flowable o) throws Exception } }); } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.materialize(); + } + }, false, null, null, Notification.createOnComplete()); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.just(1).materialize()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java index b9b5e8a39e..3c14184f4f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturnTest.java @@ -48,8 +48,7 @@ public String apply(Throwable e) { }); - @SuppressWarnings("unchecked") - DefaultSubscriber observer = mock(DefaultSubscriber.class); + Subscriber observer = TestHelper.mockSubscriber(); observable.subscribe(observer); try { @@ -59,9 +58,9 @@ public String apply(Throwable e) { } verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onComplete(); verify(observer, times(1)).onNext("one"); verify(observer, times(1)).onNext("failure"); + verify(observer, times(1)).onComplete(); assertNotNull(capturedException.get()); } @@ -269,4 +268,19 @@ public Flowable apply(Flowable f) throws Exception { }); } + @Test + public void doubleOnError() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException()); + s.onError(new TestException()); + } + } + .onErrorReturnItem(1) + .test() + .assertResult(1); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java index dcb8d8b0a5..e3c07d896e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java @@ -24,11 +24,12 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.HasUpstreamPublisher; +import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -396,8 +397,9 @@ public void subscribe(Subscriber t) { assertEquals(2, calls.get()); } + @Test - public void testObserveOn() { + public void syncFusedObserveOn() { ConnectableFlowable co = Flowable.range(0, 1000).publish(); Flowable obs = co.observeOn(Schedulers.computation()); for (int i = 0; i < 1000; i++) { @@ -412,10 +414,91 @@ public void testObserveOn() { Disposable s = co.connect(); for (TestSubscriber ts : tss) { - ts.awaitTerminalEvent(2, TimeUnit.SECONDS); - ts.assertTerminated(); - ts.assertNoErrors(); - assertEquals(1000, ts.valueCount()); + ts.awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + s.dispose(); + } + } + } + + @Test + public void syncFusedObserveOn2() { + ConnectableFlowable co = Flowable.range(0, 1000).publish(); + Flowable obs = co.observeOn(ImmediateThinScheduler.INSTANCE); + for (int i = 0; i < 1000; i++) { + for (int j = 1; j < 6; j++) { + List> tss = new ArrayList>(); + for (int k = 1; k < j; k++) { + TestSubscriber ts = new TestSubscriber(); + tss.add(ts); + obs.subscribe(ts); + } + + Disposable s = co.connect(); + + for (TestSubscriber ts : tss) { + ts.awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + s.dispose(); + } + } + } + + @Test + public void asyncFusedObserveOn() { + ConnectableFlowable co = Flowable.range(0, 1000).observeOn(ImmediateThinScheduler.INSTANCE).publish(); + for (int i = 0; i < 1000; i++) { + for (int j = 1; j < 6; j++) { + List> tss = new ArrayList>(); + for (int k = 1; k < j; k++) { + TestSubscriber ts = new TestSubscriber(); + tss.add(ts); + co.subscribe(ts); + } + + Disposable s = co.connect(); + + for (TestSubscriber ts : tss) { + ts.awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + s.dispose(); + } + } + } + + @Test + public void testObserveOn() { + ConnectableFlowable co = Flowable.range(0, 1000).hide().publish(); + Flowable obs = co.observeOn(Schedulers.computation()); + for (int i = 0; i < 1000; i++) { + for (int j = 1; j < 6; j++) { + List> tss = new ArrayList>(); + for (int k = 1; k < j; k++) { + TestSubscriber ts = new TestSubscriber(); + tss.add(ts); + obs.subscribe(ts); + } + + Disposable s = co.connect(); + + for (TestSubscriber ts : tss) { + ts.awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); } s.dispose(); } @@ -479,6 +562,13 @@ public void disposeOnArrival() { co.test(Long.MAX_VALUE, true).assertEmpty(); } + @Test + public void disposeOnArrival2() { + Flowable co = Flowable.never().publish().autoConnect(); + + co.test(Long.MAX_VALUE, true).assertEmpty(); + } + @Test public void dispose() { TestHelper.checkDisposed(Flowable.never().publish()); @@ -723,4 +813,64 @@ public Flowable apply(Flowable v) throws Exception { .test() .assertFailure(TestException.class); } + + @Test + public void pollThrows() { + Flowable.just(1) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .publish() + .autoConnect() + .test() + .assertFailure(TestException.class); + } + + @Test + public void dryRunCrash() { + final TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Object t) { + super.onNext(t); + onComplete(); + cancel(); + } + }; + + Flowable.range(1, 10) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + if (v == 2) { + throw new TestException(); + } + return v; + } + }) + .publish() + .autoConnect() + .subscribe(ts); + + ts + .assertResult(1); + } + + @Test + public void overflowQueue() { + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + }, BackpressureStrategy.NONE) + .publish(8) + .autoConnect() + .test(0L) + .assertFailure(MissingBackpressureException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java index dd612cb3f1..6df9948062 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -22,7 +22,9 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; +import io.reactivex.internal.operators.flowable.FlowableSubscribeOn.SubscribeOnSubscriber; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -285,4 +287,38 @@ public void dispose() { TestHelper.checkDisposed(Flowable.just(1).subscribeOn(Schedulers.single())); } + @Test + public void deferredRequestRace() { + for (int i = 0; i < 500; i++) { + + final TestSubscriber ts = new TestSubscriber(0L); + + Worker w = Schedulers.computation().createWorker(); + + final SubscribeOnSubscriber so = new SubscribeOnSubscriber(ts, w, Flowable.never()); + ts.onSubscribe(so); + + final BooleanSubscription bs = new BooleanSubscription(); + + try { + Runnable r1 = new Runnable() { + @Override + public void run() { + so.onSubscribe(bs); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + so.request(1); + } + }; + + TestHelper.race(r1, r2); + } finally { + w.dispose(); + } + } + } } \ No newline at end of file