diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index e8b51ffaf1..c66c57e2d1 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4549,7 +4549,7 @@ public final T blockingLast(T defaultItem) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Iterable blockingLatest() { - return BlockingObservableLatest.latest(this); + return new BlockingObservableLatest(this); } /** @@ -4571,7 +4571,7 @@ public final Iterable blockingLatest() { */ @SchedulerSupport(SchedulerSupport.NONE) public final Iterable blockingMostRecent(T initialValue) { - return BlockingObservableMostRecent.mostRecent(this, initialValue); + return new BlockingObservableMostRecent(this, initialValue); } /** @@ -4590,7 +4590,7 @@ public final Iterable blockingMostRecent(T initialValue) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Iterable blockingNext() { - return BlockingObservableNext.next(this); + return new BlockingObservableNext(this); } /** diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterator.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterator.java index 77016ba158..7762f52c96 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterator.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterator.java @@ -24,7 +24,7 @@ public final class BlockingObservableIterator extends AtomicReference -implements io.reactivex.Observer, Iterator, Runnable, Disposable { +implements io.reactivex.Observer, Iterator, Disposable { private static final long serialVersionUID = 6695226475494099826L; @@ -38,8 +38,6 @@ public final class BlockingObservableIterator volatile boolean done; Throwable error; - volatile boolean cancelled; - public BlockingObservableIterator(int batchSize) { this.queue = new SpscLinkedArrayQueue(batchSize); this.lock = new ReentrantLock(); @@ -49,9 +47,6 @@ public BlockingObservableIterator(int batchSize) { @Override public boolean hasNext() { for (;;) { - if (cancelled) { - return false; - } boolean d = done; boolean empty = queue.isEmpty(); if (d) { @@ -64,16 +59,19 @@ public boolean hasNext() { } } if (empty) { - lock.lock(); try { - while (!cancelled && !done && queue.isEmpty()) { - condition.await(); + lock.lock(); + try { + while (!done && queue.isEmpty()) { + condition.await(); + } + } finally { + lock.unlock(); } } catch (InterruptedException ex) { - run(); + DisposableHelper.dispose(this); + signalConsumer(); throw ExceptionHelper.wrapOrThrow(ex); - } finally { - lock.unlock(); } } else { return true; @@ -84,15 +82,7 @@ public boolean hasNext() { @Override public T next() { if (hasNext()) { - T v = queue.poll(); - - if (v == null) { - run(); - - throw new IllegalStateException("Queue empty?!"); - } - - return v; + return queue.poll(); } throw new NoSuchElementException(); } @@ -104,13 +94,8 @@ public void onSubscribe(Disposable s) { @Override public void onNext(T t) { - if (!queue.offer(t)) { - DisposableHelper.dispose(this); - - onError(new IllegalStateException("Queue full?!")); - } else { - signalConsumer(); - } + queue.offer(t); + signalConsumer(); } @Override @@ -135,12 +120,6 @@ void signalConsumer() { } } - @Override - public void run() { - DisposableHelper.dispose(this); - signalConsumer(); - } - @Override // otherwise default method which isn't available in Java 7 public void remove() { throw new UnsupportedOperationException("remove"); diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java index ef61324082..6442ec4185 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java @@ -21,41 +21,32 @@ import io.reactivex.Observable; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observers.DisposableObserver; +import io.reactivex.plugins.RxJavaPlugins; /** * Wait for and iterate over the latest values of the source observable. If the source works faster than the * iterator, values may be skipped, but not the {@code onError} or {@code onComplete} events. + * @param the value type */ -public enum BlockingObservableLatest { - ; - - /** - * Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not - * been returned by the {@code Iterable}, then returns that item. - * - * @param the value type - * @param source - * the source {@code Observable} - * @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not - * been returned by the {@code Iterable}, then returns that item - */ - public static Iterable latest(final ObservableSource source) { - return new Iterable() { - @Override - public Iterator iterator() { - LatestObserverIterator lio = new LatestObserverIterator(); - - @SuppressWarnings("unchecked") - Observable> materialized = Observable.wrap((ObservableSource)source).materialize(); - - materialized.subscribe(lio); - return lio; - } - }; +public final class BlockingObservableLatest implements Iterable { + + final ObservableSource source; + + public BlockingObservableLatest(ObservableSource source) { + this.source = source; + } + + @Override + public Iterator iterator() { + BlockingObservableLatestIterator lio = new BlockingObservableLatestIterator(); + + Observable> materialized = Observable.wrap(source).materialize(); + + materialized.subscribe(lio); + return lio; } - /** Observer of source, iterator for output. */ - static final class LatestObserverIterator extends DisposableObserver> implements Iterator { + static final class BlockingObservableLatestIterator extends DisposableObserver> implements Iterator { // iterator's notification Notification iteratorNotification; @@ -73,7 +64,7 @@ public void onNext(Notification args) { @Override public void onError(Throwable e) { - // not expected + RxJavaPlugins.onError(e); } @Override @@ -86,22 +77,20 @@ public boolean hasNext() { if (iteratorNotification != null && iteratorNotification.isOnError()) { throw ExceptionHelper.wrapOrThrow(iteratorNotification.getError()); } - if (iteratorNotification == null || iteratorNotification.isOnNext()) { - if (iteratorNotification == null) { - try { - notify.acquire(); - } catch (InterruptedException ex) { - dispose(); - Thread.currentThread().interrupt(); - iteratorNotification = Notification.createOnError(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } - - Notification n = value.getAndSet(null); - iteratorNotification = n; - if (n.isOnError()) { - throw ExceptionHelper.wrapOrThrow(n.getError()); - } + if (iteratorNotification == null) { + try { + notify.acquire(); + } catch (InterruptedException ex) { + dispose(); + Thread.currentThread().interrupt(); + iteratorNotification = Notification.createOnError(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + + Notification n = value.getAndSet(null); + iteratorNotification = n; + if (n.isOnError()) { + throw ExceptionHelper.wrapOrThrow(n.getError()); } } return iteratorNotification.isOnNext(); @@ -110,11 +99,9 @@ public boolean hasNext() { @Override public T next() { if (hasNext()) { - if (iteratorNotification.isOnNext()) { - T v = iteratorNotification.getValue(); - iteratorNotification = null; - return v; - } + T v = iteratorNotification.getValue(); + iteratorNotification = null; + return v; } throw new NoSuchElementException(); } @@ -123,6 +110,5 @@ public T next() { public void remove() { throw new UnsupportedOperationException("Read-only iterator."); } - } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java index 5af0304460..85a9f1b272 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java @@ -25,36 +25,31 @@ * seed value if no item has yet been emitted. *

* + * + * @param the value type */ -public enum BlockingObservableMostRecent { - ; - /** - * Returns an {@code Iterable} that always returns the item most recently emitted by the {@code Observable}. - * - * @param the value type - * @param source - * the source {@code Observable} - * @param initialValue - * a default item to return from the {@code Iterable} if {@code source} has not yet emitted any - * items - * @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or - * {@code initialValue} if {@code source} has not yet emitted any items - */ - public static Iterable mostRecent(final ObservableSource source, final T initialValue) { - return new Iterable() { - @Override - public Iterator iterator() { - MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); +public final class BlockingObservableMostRecent implements Iterable { - /** - * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain - * since it is for BlockingObservable. - */ - source.subscribe(mostRecentObserver); + final ObservableSource source; + + final T initialValue; + + public BlockingObservableMostRecent(ObservableSource source, T initialValue) { + this.source = source; + this.initialValue = initialValue; + } + + @Override + public Iterator iterator() { + MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); + + /** + * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain + * since it is for BlockingObservable. + */ + source.subscribe(mostRecentObserver); - return mostRecentObserver.getIterable(); - } - }; + return mostRecentObserver.getIterable(); } static final class MostRecentObserver extends DefaultObserver { diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java index 209569b6a5..1d5f8afd21 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java @@ -17,50 +17,44 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import io.reactivex.Notification; -import io.reactivex.Observable; +import io.reactivex.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observers.DisposableObserver; +import io.reactivex.plugins.RxJavaPlugins; /** * Returns an Iterable that blocks until the Observable emits another item, then returns that item. *

* + * + * @param the value type */ -public enum BlockingObservableNext { - ; - /** - * Returns an {@code Iterable} that blocks until the {@code Observable} emits another item, then returns - * that item. - * - * @param the value type - * @param items - * the {@code Observable} to observe - * @return an {@code Iterable} that behaves like a blocking version of {@code items} - */ - public static Iterable next(final Observable items) { - return new Iterable() { - @Override - public Iterator iterator() { - NextObserver nextObserver = new NextObserver(); - return new NextIterator(items, nextObserver); - } - }; +public final class BlockingObservableNext implements Iterable { + + final ObservableSource source; + public BlockingObservableNext(ObservableSource source) { + this.source = source; + } + + @Override + public Iterator iterator() { + NextObserver nextObserver = new NextObserver(); + return new NextIterator(source, nextObserver); } // test needs to access the observer.waiting flag static final class NextIterator implements Iterator { private final NextObserver observer; - private final Observable items; + private final ObservableSource items; private T next; private boolean hasNext = true; private boolean isNextConsumed = true; private Throwable error; private boolean started; - NextIterator(Observable items, NextObserver observer) { + NextIterator(ObservableSource items, NextObserver observer) { this.items = items; this.observer = observer; } @@ -82,39 +76,36 @@ public boolean hasNext() { } private boolean moveToNext() { - try { - if (!started) { - started = true; - // if not started, start now - observer.setWaiting(); - @SuppressWarnings("unchecked") - Observable nbpObservable = (Observable)items; - nbpObservable.materialize().subscribe(observer); - } + if (!started) { + started = true; + // if not started, start now + observer.setWaiting(); + new ObservableMaterialize(items).subscribe(observer); + } - Notification nextNotification = observer.takeNext(); - if (nextNotification.isOnNext()) { - isNextConsumed = false; - next = nextNotification.getValue(); - return true; - } - // If an observable is completed or fails, - // hasNext() always return false. - hasNext = false; - if (nextNotification.isOnComplete()) { - return false; - } - if (nextNotification.isOnError()) { - error = nextNotification.getError(); - throw ExceptionHelper.wrapOrThrow(error); - } - throw new IllegalStateException("Should not reach here"); + Notification nextNotification; + + try { + nextNotification = observer.takeNext(); } catch (InterruptedException e) { observer.dispose(); - Thread.currentThread().interrupt(); error = e; throw ExceptionHelper.wrapOrThrow(e); } + + if (nextNotification.isOnNext()) { + isNextConsumed = false; + next = nextNotification.getValue(); + return true; + } + // If an observable is completed or fails, + // hasNext() always return false. + hasNext = false; + if (nextNotification.isOnComplete()) { + return false; + } + error = nextNotification.getError(); + throw ExceptionHelper.wrapOrThrow(error); } @Override @@ -149,7 +140,7 @@ public void onComplete() { @Override public void onError(Throwable e) { - // ignore + RxJavaPlugins.onError(e); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java index 078268e5f6..e70bfa8f9e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java @@ -137,7 +137,7 @@ public boolean isDisposed() { } } - static final class AmbInnerObserver extends AtomicReference implements Observer, Disposable { + static final class AmbInnerObserver extends AtomicReference implements Observer { private static final long serialVersionUID = -1185974347409665484L; final AmbCoordinator parent; @@ -180,7 +180,6 @@ public void onError(Throwable t) { won = true; actual.onError(t); } else { - get().dispose(); RxJavaPlugins.onError(t); } } @@ -194,20 +193,12 @@ public void onComplete() { if (parent.win(index)) { won = true; actual.onComplete(); - } else { - get().dispose(); } } } - @Override public void dispose() { DisposableHelper.dispose(this); } - - @Override - public boolean isDisposed() { - return get() == DisposableHelper.DISPOSED; - } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java index 6334992dd2..443e63c28d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java @@ -17,62 +17,55 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.observers.*; import io.reactivex.internal.util.*; -import io.reactivex.observers.DefaultObserver; -import io.reactivex.plugins.RxJavaPlugins; /** * Utility methods to consume an Observable in a blocking manner with callbacks or Observer. */ -public enum ObservableBlockingSubscribe { - ; +public final class ObservableBlockingSubscribe { + + /** Utility class. */ + private ObservableBlockingSubscribe() { + throw new IllegalStateException("No instances!"); + } /** * Subscribes to the source and calls the Observer methods on the current thread. *

* @param o the source publisher * The call to dispose() is composed through. - * @param subscriber the subscriber to forward events and calls to in the current thread + * @param observer the subscriber to forward events and calls to in the current thread * @param the value type */ - public static void subscribe(ObservableSource o, Observer subscriber) { + public static void subscribe(ObservableSource o, Observer observer) { final BlockingQueue queue = new LinkedBlockingQueue(); BlockingObserver bs = new BlockingObserver(queue); + observer.onSubscribe(bs); o.subscribe(bs); - - try { - for (;;) { - if (bs.isDisposed()) { - break; - } - Object v = queue.poll(); - if (v == null) { - if (bs.isDisposed()) { - break; - } + for (;;) { + if (bs.isDisposed()) { + break; + } + Object v = queue.poll(); + if (v == null) { + try { v = queue.take(); - } - if (bs.isDisposed()) { - break; - } - if (o == BlockingObserver.TERMINATED) { - break; - } - if (NotificationLite.acceptFull(v, subscriber)) { - break; + } catch (InterruptedException ex) { + bs.dispose(); + observer.onError(ex); + return; } } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - subscriber.onError(e); - } finally { - bs.dispose(); + if (bs.isDisposed() + || o == BlockingObserver.TERMINATED + || NotificationLite.acceptFull(v, observer)) { + break; + } } } @@ -121,50 +114,6 @@ public void accept(Disposable s) { */ public static void subscribe(ObservableSource o, final Consumer onNext, final Consumer onError, final Action onComplete) { - subscribe(o, new DefaultObserver() { - boolean done; - @Override - public void onNext(T t) { - if (done) { - return; - } - try { - onNext.accept(t); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancel(); - onError(ex); - } - } - - @Override - public void onError(Throwable e) { - if (done) { - RxJavaPlugins.onError(e); - return; - } - done = true; - try { - onError.accept(e); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } - } - - @Override - public void onComplete() { - if (done) { - return; - } - done = true; - try { - onComplete.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } - } - }); + subscribe(o, new LambdaObserver(onNext, onError, onComplete, Functions.emptyConsumer())); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java index 20bc5d7fa9..c7a0e583e7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java @@ -136,7 +136,7 @@ static final class SerializedEmitter final AtomicThrowable error; - final SimpleQueue queue; + final SpscLinkedArrayQueue queue; volatile boolean done; @@ -206,7 +206,7 @@ void drain() { void drainLoop() { ObservableEmitter e = emitter; - SimpleQueue q = queue; + SpscLinkedArrayQueue q = queue; AtomicThrowable error = this.error; int missed = 1; for (;;) { @@ -224,15 +224,7 @@ void drainLoop() { } boolean d = done; - T v; - - try { - v = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - // should never happen - v = null; - } + T v = q.poll(); boolean empty = v == null; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDetach.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDetach.java index 9381d1effd..d47378bcaa 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDetach.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDetach.java @@ -55,8 +55,7 @@ public void dispose() { @Override public boolean isDisposed() { - Disposable s = this.s; - return s == null || s.isDisposed(); + return s.isDisposed(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 3c344ff2d7..c12a421d9e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -13,7 +13,6 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.plugins.RxJavaPlugins; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -23,9 +22,11 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.*; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableFlatMap extends AbstractObservableWithUpstream { final Function> mapper; @@ -67,9 +68,7 @@ static final class MergeObserver extends AtomicInteger implements Disposab volatile boolean done; - final AtomicReference> errors = new AtomicReference>(); - - static final SimpleQueue ERRORS_CLOSED = new RejectingQueue(); + final AtomicThrowable errors = new AtomicThrowable(); volatile boolean cancelled; @@ -169,10 +168,10 @@ void addInner(InnerObserver inner) { void removeInner(InnerObserver inner) { for (;;) { InnerObserver[] a = observers.get(); - if (a == CANCELLED || a == EMPTY) { + int n = a.length; + if (n == 0) { return; } - int n = a.length; int j = -1; for (int i = 0; i < n; i++) { if (a[i] == inner) { @@ -216,7 +215,7 @@ void tryEmitScalar(Callable value) { u = value.call(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - getErrorQueue().offer(ex); + errors.addThrowable(ex); drain(); return; } @@ -282,9 +281,12 @@ public void onError(Throwable t) { RxJavaPlugins.onError(t); return; } - getErrorQueue().offer(t); - done = true; - drain(); + if (errors.addThrowable(t)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } } @Override @@ -300,9 +302,11 @@ public void onComplete() { public void dispose() { if (!cancelled) { cancelled = true; - if (getAndIncrement() == 0) { - s.dispose(); - disposeAll(); + if (disposeAll()) { + Throwable ex = errors.terminate(); + if (ex != null && ex != ExceptionHelper.TERMINATED) { + RxJavaPlugins.onError(ex); + } } } } @@ -338,7 +342,7 @@ void drainLoop() { o = svq.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - getErrorQueue().offer(ex); + errors.addThrowable(ex); continue; } if (o == null) { @@ -359,11 +363,11 @@ void drainLoop() { int n = inner.length; if (d && (svq == null || svq.isEmpty()) && n == 0) { - SimpleQueue e = errors.get(); - if (e == null || e.isEmpty()) { + Throwable ex = errors.get(); + if (ex == null) { child.onComplete(); } else { - reportError(e); + child.onError(errors.terminate()); } return; } @@ -414,7 +418,7 @@ void drainLoop() { o = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - getErrorQueue().offer(ex); + errors.addThrowable(ex); continue; } if (o == null) { @@ -469,93 +473,34 @@ void drainLoop() { boolean checkTerminate() { if (cancelled) { - s.dispose(); - disposeAll(); return true; } - SimpleQueue e = errors.get(); - if (!delayErrors && (e != null && !e.isEmpty())) { - try { - reportError(e); - } finally { - disposeAll(); - } + Throwable e = errors.get(); + if (!delayErrors && (e != null)) { + actual.onError(errors.terminate()); return true; } return false; } - void reportError(SimpleQueue q) { - List composite = null; - Throwable ex = null; - - for (;;) { - Throwable t; - try { - t = q.poll(); - } catch (Throwable exc) { - Exceptions.throwIfFatal(exc); - if (ex == null) { - ex = exc; - } else { - if (composite == null) { - composite = new ArrayList(); - composite.add(ex); - } - composite.add(exc); - } - break; - } - - if (t == null) { - break; - } - if (ex == null) { - ex = t; - } else { - if (composite == null) { - composite = new ArrayList(); - composite.add(ex); - } - composite.add(t); - } - } - if (composite != null) { - actual.onError(new CompositeException(composite)); - } else { - actual.onError(ex); - } - } - - void disposeAll() { + boolean disposeAll() { + s.dispose(); InnerObserver[] a = observers.get(); if (a != CANCELLED) { a = observers.getAndSet(CANCELLED); if (a != CANCELLED) { - errors.getAndSet(ERRORS_CLOSED); for (InnerObserver inner : a) { inner.dispose(); } + return true; } } - } - - SimpleQueue getErrorQueue() { - for (;;) { - SimpleQueue q = errors.get(); - if (q != null) { - return q; - } - q = new MpscLinkedQueue(); - if (errors.compareAndSet(null, q)) { - return q; - } - } + return false; } } static final class InnerObserver extends AtomicReference - implements Observer, Disposable { + implements Observer { private static final long serialVersionUID = -4606175640614850599L; final long id; @@ -602,9 +547,15 @@ public void onNext(U t) { } @Override public void onError(Throwable t) { - parent.getErrorQueue().offer(t); - done = true; - parent.drain(); + if (parent.errors.addThrowable(t)) { + if (!parent.delayErrors) { + parent.disposeAll(); + } + done = true; + parent.drain(); + } else { + RxJavaPlugins.onError(t); + } } @Override public void onComplete() { @@ -612,42 +563,8 @@ public void onComplete() { parent.drain(); } - @Override public void dispose() { DisposableHelper.dispose(this); } - - @Override - public boolean isDisposed() { - return get() == DisposableHelper.DISPOSED; - } - } - - static final class RejectingQueue implements SimpleQueue { - @Override - public boolean offer(T e) { - return false; - } - - @Override - public T poll() { - return null; - } - - @Override - public boolean offer(T v1, T v2) { - return false; - } - - @Override - public boolean isEmpty() { - return true; - } - - @Override - public void clear() { - - } - } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromPublisher.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromPublisher.java index 8ad6dd823b..4039920faa 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromPublisher.java @@ -12,71 +12,68 @@ */ package io.reactivex.internal.operators.observable; -import io.reactivex.Observable; -import io.reactivex.Observer; +import org.reactivestreams.*; + +import io.reactivex.*; import io.reactivex.disposables.Disposable; -import java.util.concurrent.atomic.AtomicBoolean; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import io.reactivex.internal.subscriptions.SubscriptionHelper; public final class ObservableFromPublisher extends Observable { - private final Publisher publisher; + + final Publisher source; public ObservableFromPublisher(Publisher publisher) { - this.publisher = publisher; + this.source = publisher; } @Override protected void subscribeActual(final Observer o) { - publisher.subscribe(new PublisherSubscriber(o)); + source.subscribe(new PublisherSubscriber(o)); } static final class PublisherSubscriber - extends AtomicBoolean implements Subscriber, Disposable { - - private static final long serialVersionUID = -7306579371159152354L; - - private final Observer o; - private Subscription inner; + final Observer actual; + Subscription s; PublisherSubscriber(Observer o) { - this.o = o; + this.actual = o; } @Override public void onComplete() { - o.onComplete(); + actual.onComplete(); } @Override public void onError(Throwable t) { - o.onError(t); + actual.onError(t); } @Override public void onNext(T t) { - o.onNext(t); + actual.onNext(t); } @Override - public void onSubscribe(Subscription inner) { - this.inner = inner; - o.onSubscribe(this); - inner.request(Long.MAX_VALUE); + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + actual.onSubscribe(this); + s.request(Long.MAX_VALUE); + } } - @Override public void dispose() { - if (compareAndSet(false, true)) { - inner.cancel(); - inner = null; - } + @Override + public void dispose() { + s.cancel(); + s = SubscriptionHelper.CANCELLED; } - @Override public boolean isDisposed() { - return get(); + @Override + public boolean isDisposed() { + return s == SubscriptionHelper.CANCELLED; } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java index c26e5fe903..d3de983da4 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java @@ -12,10 +12,8 @@ */ package io.reactivex.internal.operators.observable; -import io.reactivex.Observable; -import io.reactivex.Observer; -import io.reactivex.internal.fuseable.QueueDisposable; -import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.*; +import io.reactivex.internal.observers.BasicIntQueueDisposable; public final class ObservableRangeLong extends Observable { private final long start; @@ -34,8 +32,7 @@ protected void subscribeActual(Observer o) { } static final class RangeDisposable - extends AtomicLong - implements QueueDisposable { + extends BasicIntQueueDisposable { private static final long serialVersionUID = 396518478098735504L; @@ -68,16 +65,6 @@ void run() { } } - @Override - public boolean offer(Long value) { - throw new UnsupportedOperationException("Should not be called!"); - } - - @Override - public boolean offer(Long v1, Long v2) { - throw new UnsupportedOperationException("Should not be called!"); - } - @Override public Long poll() throws Exception { long i = index; diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 8efd219c2d..a60e06d802 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -30,7 +30,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.CompositeException; import io.reactivex.functions.*; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.operators.maybe.MaybeToFlowable; import io.reactivex.internal.operators.single.SingleToFlowable; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -1278,7 +1278,6 @@ protected void subscribeActual(Subscriber subscriber) { } } - /** * Check if the given transformed reactive type reports multiple onSubscribe calls to * RxJavaPlugins. @@ -1333,6 +1332,221 @@ protected void subscribeActual(Observer observer) { } } + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservableToSingle(Function, ? extends SingleSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservableToMaybe(Function, ? extends MaybeSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservableToCompletable(Function, ? extends CompletableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + CompletableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeFlowableToObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + try { + BooleanSubscription d1 = new BooleanSubscription(); + + observer.onSubscribe(d1); + + BooleanSubscription d2 = new BooleanSubscription(); + + observer.onSubscribe(d2); + + b[0] = d1.isCancelled(); + b[1] = d2.isCancelled(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First cancelled?", false, b[0]); + assertEquals("Second not cancelled?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + /** * Check if the given transformed reactive type reports multiple onSubscribe calls to * RxJavaPlugins. @@ -1652,4 +1866,136 @@ public static void emit(Subject p, T... values) { } p.onComplete(); } + + /** + * Checks if the source is fuseable and its isEmpty/clear works properly. + * @param the value type + * @param source the source sequence + */ + public static void checkFusedIsEmptyClear(Observable source) { + final CountDownLatch cdl = new CountDownLatch(1); + + final Boolean[] state = { null, null, null, null }; + + source.subscribe(new Observer() { + @Override + public void onSubscribe(Disposable d) { + try { + if (d instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable) d; + state[0] = true; + + int m = qd.requestFusion(QueueDisposable.ANY); + + if (m != QueueDisposable.NONE) { + state[1] = true; + + state[2] = qd.isEmpty(); + + qd.clear(); + + state[3] = qd.isEmpty(); + } + } + cdl.countDown(); + } finally { + d.dispose(); + } + } + + @Override + public void onNext(T value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + + assertTrue("Not fuseable", state[0]); + assertTrue("Fusion rejected", state[1]); + + assertNotNull(state[2]); + assertTrue("Did not empty", state[3]); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Checks if the source is fuseable and its isEmpty/clear works properly. + * @param the value type + * @param source the source sequence + */ + public static void checkFusedIsEmptyClear(Flowable source) { + final CountDownLatch cdl = new CountDownLatch(1); + + final Boolean[] state = { null, null, null, null }; + + source.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription d) { + try { + if (d instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qd = (QueueSubscription) d; + state[0] = true; + + int m = qd.requestFusion(QueueSubscription.ANY); + + if (m != QueueSubscription.NONE) { + state[1] = true; + + state[2] = qd.isEmpty(); + + qd.clear(); + + state[3] = qd.isEmpty(); + } + } + cdl.countDown(); + } finally { + d.cancel(); + } + } + + @Override + public void onNext(T value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + + assertTrue("Not fuseable", state[0]); + assertTrue("Fusion rejected", state[1]); + + assertNotNull(state[2]); + assertTrue("Did not empty", state[3]); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableLatestTest.java b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableLatestTest.java index d0ed5c21bc..794298d246 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableLatestTest.java @@ -13,12 +13,18 @@ package io.reactivex.internal.operators.observable; +import static org.junit.Assert.*; + import java.util.*; import java.util.concurrent.TimeUnit; import org.junit.*; import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; @@ -162,4 +168,62 @@ public void testFasterSource() { Assert.assertEquals(false, it.hasNext()); } + + @Test(expected = UnsupportedOperationException.class) + public void remove() { + Observable.never().blockingLatest().iterator().remove(); + } + + @Test(expected = NoSuchElementException.class) + public void empty() { + Observable.empty().blockingLatest().iterator().next(); + } + + @Test(expected = TestException.class) + public void error() { + Observable.error(new TestException()).blockingLatest().iterator().next(); + } + + @Test + public void error2() { + Iterator it = Observable.error(new TestException()).blockingLatest().iterator(); + + for (int i = 0; i < 3; i++) { + try { + it.hasNext(); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + } + } + + @Test + public void interrupted() { + Iterator it = Observable.never().blockingLatest().iterator(); + + Thread.currentThread().interrupt(); + + try { + it.hasNext(); + } catch (RuntimeException ex) { + assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException); + } + Thread.interrupted(); + } + + @SuppressWarnings("unchecked") + @Test + public void onError() { + Iterator it = Observable.never().blockingLatest().iterator(); + + List errors = TestHelper.trackPluginErrors(); + try { + ((Observer)it).onError(new TestException()); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecentTest.java b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecentTest.java index 56897cfe79..3735468da8 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecentTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecentTest.java @@ -13,10 +13,9 @@ package io.reactivex.internal.operators.observable; -import static io.reactivex.internal.operators.observable.BlockingObservableMostRecent.mostRecent; import static org.junit.Assert.*; -import java.util.Iterator; +import java.util.*; import java.util.concurrent.TimeUnit; import org.junit.*; @@ -32,6 +31,10 @@ public void testMostRecentNull() { assertEquals(null, Observable.never().blockingMostRecent(null).iterator().next()); } + static Iterable mostRecent(Observable source, T initialValue) { + return source.blockingMostRecent(initialValue); + } + @Test public void testMostRecent() { Subject s = PublishSubject.create(); @@ -97,4 +100,25 @@ public void testSingleSourceManyIterators() { } } + + @Test + public void empty() { + Iterator it = Observable.empty() + .blockingMostRecent(1) + .iterator(); + + try { + it.next(); + fail("Should have thrown"); + } catch (NoSuchElementException ex) { + // expected + } + + try { + it.remove(); + fail("Should have thrown"); + } catch (UnsupportedOperationException ex) { + // expected + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java index f40dc019b5..e3e0fbc064 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java @@ -13,7 +13,6 @@ package io.reactivex.internal.operators.observable; -import static io.reactivex.internal.operators.observable.BlockingObservableNext.next; import static org.junit.Assert.*; import java.util.*; @@ -22,11 +21,13 @@ import org.junit.*; +import io.reactivex.*; import io.reactivex.Observable; -import io.reactivex.ObservableSource; import io.reactivex.Observer; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.operators.observable.BlockingObservableNext.NextObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.BehaviorProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.*; @@ -61,6 +62,10 @@ public void run() { }.start(); } + static Iterable next(ObservableSource source) { + return new BlockingObservableNext(source); + } + @Test public void testNext() { Subject obs = PublishSubject.create(); @@ -317,4 +322,61 @@ public void testSynchronousNext() { assertEquals(2, BehaviorProcessor.createDefault(2).blockingIterable().iterator().next().intValue()); assertEquals(3, BehaviorProcessor.createDefault(3).blockingNext().iterator().next().intValue()); } + + @Test + public void interrupt() { + Iterator it = Observable.never().blockingNext().iterator(); + + try { + Thread.currentThread().interrupt(); + it.next(); + } catch (RuntimeException ex) { + assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void remove() { + Observable.never().blockingNext().iterator().remove(); + } + + @Test + public void nextObserverError() { + NextObserver no = new NextObserver(); + + List errors = TestHelper.trackPluginErrors(); + try { + no.onError(new TestException()); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void nextObserverOnNext() throws Exception { + NextObserver no = new NextObserver(); + + no.setWaiting(); + no.onNext(Notification.createOnNext(1)); + + no.setWaiting(); + no.onNext(Notification.createOnNext(1)); + + assertEquals(1, no.takeNext().getValue().intValue()); + } + + @Test + public void nextObserverOnCompleteOnNext() throws Exception { + NextObserver no = new NextObserver(); + + no.setWaiting(); + no.onNext(Notification.createOnComplete()); + + no.setWaiting(); + no.onNext(Notification.createOnNext(1)); + + assertTrue(no.takeNext().isOnComplete()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java index 367762c833..b20f96c0f7 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java @@ -13,13 +13,15 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; -import java.util.Iterator; +import java.util.*; import org.junit.*; -import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.ObservableSource; +import io.reactivex.Observer; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; @@ -79,4 +81,41 @@ public void subscribe(Observer observer) { System.out.println(string); } } + + @Test + public void dispose() { + BlockingObservableIterator it = new BlockingObservableIterator(128); + + assertFalse(it.isDisposed()); + + it.dispose(); + + assertTrue(it.isDisposed()); + } + + @Test + public void interruptWait() { + BlockingObservableIterator it = new BlockingObservableIterator(128); + + try { + Thread.currentThread().interrupt(); + + it.hasNext(); + } catch (RuntimeException ex) { + assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException); + } + } + + @Test(expected = NoSuchElementException.class) + public void emptyThrowsNoSuch() { + BlockingObservableIterator it = new BlockingObservableIterator(128); + it.onComplete(); + it.next(); + } + + @Test(expected = UnsupportedOperationException.class) + public void remove() { + BlockingObservableIterator it = new BlockingObservableIterator(128); + it.remove(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java index 8d5c74f956..3f6d6529be 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.*; import java.io.IOException; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -24,9 +25,13 @@ import org.mockito.InOrder; import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Consumer; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -233,4 +238,129 @@ public void ambArraySingleElement() { assertSame(Observable.never(), Observable.ambArray(Observable.never())); } + @Test + public void manySources() { + Observable[] a = new Observable[32]; + Arrays.fill(a, Observable.never()); + a[31] = Observable.just(1); + + Observable.amb(Arrays.asList(a)) + .test() + .assertResult(1); + } + + @Test + public void emptyIterable() { + Observable.amb(Collections.>emptyList()) + .test() + .assertResult(); + } + + @Test + public void singleIterable() { + Observable.amb(Collections.singletonList(Observable.just(1))) + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.ambArray(Observable.never(), Observable.never())); + } + + @Test + public void onNextRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + @SuppressWarnings("unchecked") + TestObserver to = Observable.ambArray(ps1, ps2).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onNext(1); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertSubscribed().assertNoErrors() + .assertNotComplete().assertValueCount(1); + } + } + + @Test + public void onCompleteRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + @SuppressWarnings("unchecked") + TestObserver to = Observable.ambArray(ps1, ps2).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertResult(); + } + } + + @Test + public void onErrorRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + @SuppressWarnings("unchecked") + TestObserver to = Observable.ambArray(ps1, ps2).test(); + + final Throwable ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + List errors = TestHelper.trackPluginErrors(); + try { + TestHelper.race(r1, r2, Schedulers.single()); + } finally { + RxJavaPlugins.reset(); + } + + to.assertFailure(TestException.class); + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java index f49d9c99e5..ba138077c5 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java @@ -14,15 +14,22 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; public class ObservableAnyTest { @@ -477,4 +484,50 @@ public boolean test(String v) { // FIXME value as last cause? // assertTrue(ex.getCause().getMessage().contains("Boo!")); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).any(Functions.alwaysTrue()).toObservable()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.any(Functions.alwaysTrue()).toObservable(); + } + }); + } + + @Test + public void predicateThrowsSuppressOthers() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onNext(2); + observer.onError(new IOException()); + observer.onComplete(); + } + } + .any(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + throw new TestException(); + } + }) + .toObservable() + .test() + .assertFailure(TestException.class); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java index 9f8d6e87e6..6e11142c11 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java @@ -16,15 +16,18 @@ import static org.junit.Assert.assertEquals; import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.disposables.Disposable; +import io.reactivex.TestHelper; +import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; public class ObservableBlockingTest { @@ -219,4 +222,65 @@ public void blockingLastNormal() { public void blockingSingleEmpty() { Observable.empty().blockingSingle(); } + + @Test + public void utilityClass() { + TestHelper.checkUtilityClass(ObservableBlockingSubscribe.class); + } + + @Test + public void disposeUpFront() { + TestObserver to = new TestObserver(); + to.dispose(); + Observable.just(1).blockingSubscribe(to); + + to.assertEmpty(); + } + + @SuppressWarnings("rawtypes") + @Test + public void delayed() throws Exception { + final TestObserver to = new TestObserver(); + final Observer[] s = { null }; + + Schedulers.single().scheduleDirect(new Runnable() { + @SuppressWarnings("unchecked") + @Override + public void run() { + to.dispose(); + s[0].onNext(1); + } + }, 200, TimeUnit.MILLISECONDS); + + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + s[0] = observer; + } + }.blockingSubscribe(to); + + while (!to.isDisposed()) { + Thread.sleep(100); + } + + to.assertEmpty(); + } + + @Test + public void interrupt() { + TestObserver to = new TestObserver(); + Thread.currentThread().interrupt(); + Observable.never().blockingSubscribe(to); + } + + @Test + public void onCompleteDelayed() { + TestObserver to = new TestObserver(); + + Observable.empty().delay(100, TimeUnit.MILLISECONDS) + .blockingSubscribe(to); + + to.assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java index 722706667a..7c1f27fd48 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java @@ -15,12 +15,18 @@ import static org.junit.Assert.*; +import java.io.IOException; +import java.util.List; + import org.junit.Test; import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Cancellable; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; public class ObservableCreateTest { @@ -251,4 +257,342 @@ public void subscribe(ObservableEmitter e) throws Exception { assertNull(error[0]); } + + @Test + public void callbackThrows() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void nullValue() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + e.onNext(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void nullThrowable() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + e.onError(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void nullValueSync() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + e.serialize().onNext(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void nullThrowableSync() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + e.serialize().onError(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void onErrorCrash() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + Disposable d = Disposables.empty(); + e.setDisposable(d); + try { + e.onError(new IOException()); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + assertTrue(d.isDisposed()); + } + }) + .subscribe(new Observer() { + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onNext(Object value) { + } + + @Override + public void onError(Throwable e) { + throw new TestException(); + } + + @Override + public void onComplete() { + } + }); + } + + @Test + public void onCompleteCrash() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + Disposable d = Disposables.empty(); + e.setDisposable(d); + try { + e.onComplete(); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + assertTrue(d.isDisposed()); + } + }) + .subscribe(new Observer() { + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onNext(Object value) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + throw new TestException(); + } + }); + } + + @Test + public void serialized() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + ObservableEmitter f = e.serialize(); + + assertSame(f, f.serialize()); + + assertFalse(f.isDisposed()); + + final int[] calls = { 0 }; + + f.setCancellable(new Cancellable() { + @Override + public void cancel() throws Exception { + calls[0]++; + } + }); + + e.onComplete(); + + assertTrue(f.isDisposed()); + + assertEquals(1, calls[0]); + } + }) + .test() + .assertResult(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void serializedConcurrentOnNext() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + final ObservableEmitter f = e.serialize(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + f.onNext(1); + } + } + }; + + TestHelper.race(r1, r1, Schedulers.single()); + } + }) + .take(1000) + .test() + .assertSubscribed().assertValueCount(1000).assertComplete().assertNoErrors(); + } + + @Test + public void serializedConcurrentOnNextOnError() { + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + final ObservableEmitter f = e.serialize(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + f.onNext(1); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 100; i++) { + f.onNext(1); + } + f.onError(new TestException()); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + }) + .test() + .assertSubscribed().assertNotComplete() + .assertError(TestException.class); + } + + @Test + public void serializedConcurrentOnNextOnComplete() { + TestObserver to = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + final ObservableEmitter f = e.serialize(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + f.onNext(1); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 100; i++) { + f.onNext(1); + } + f.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + }) + .test() + .assertSubscribed().assertComplete() + .assertNoErrors(); + + int c = to.valueCount(); + assertTrue("" + c, c >= 100); + } + + @Test + public void onErrorRace() { + Observable source = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + final ObservableEmitter f = e.serialize(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + f.onError(null); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + f.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + }); + + List errors = TestHelper.trackPluginErrors(); + + try { + for (int i = 0; i < 500; i++) { + source + .test() + .assertFailure(Throwable.class); + } + } finally { + RxJavaPlugins.reset(); + } + assertFalse(errors.isEmpty()); + } + + @Test + public void onCompleteRace() { + Observable source = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + final ObservableEmitter f = e.serialize(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + f.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + f.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + }); + + for (int i = 0; i < 500; i++) { + source + .test() + .assertResult(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDetachTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDetachTest.java index f3e260519a..cc1a99ba2e 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDetachTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDetachTest.java @@ -19,8 +19,9 @@ import org.junit.*; -import io.reactivex.Observable; +import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; import io.reactivex.observers.TestObserver; @@ -156,4 +157,19 @@ public void deferredUpstreamProducer() { // ts.assertComplete(); // ts.assertNoErrors(); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.never().onTerminateDetach()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.onTerminateDetach(); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java index f608e7622b..592cf5eb68 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.functions.Function; import io.reactivex.internal.fuseable.ScalarCallable; import io.reactivex.schedulers.Schedulers; @@ -57,4 +58,19 @@ public void fromArrayEmpty() { public void fromArraySingle() { assertTrue(Observable.fromArray(1) instanceof ScalarCallable); } + + @Test + public void fromPublisherDispose() { + TestHelper.checkDisposed(Flowable.just(1).toObservable()); + } + + @Test + public void fromPublisherDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowableToObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Flowable f) throws Exception { + return f.toObservable(); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java index 14445d350d..a76b0b7b8e 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java @@ -13,12 +13,16 @@ package io.reactivex.internal.operators.observable; +import java.util.List; import java.util.concurrent.Callable; import org.junit.Test; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; public class ObservableGenerateTest { @@ -44,4 +48,82 @@ public void accept(Object d) throws Exception { .test() .assertResult(10, 10, 10, 10, 10); } + + @Test + public void stateSupplierThrows() { + Observable.generate(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onNext(s); + } + }, Functions.emptyConsumer()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void generatorThrows() { + Observable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + throw new TestException(); + } + }, Functions.emptyConsumer()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void disposerThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onComplete(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onComplete(); + } + }, Functions.emptyConsumer())); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java index c1e8394276..9dfa0abbec 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java @@ -14,15 +14,17 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.util.NoSuchElementException; + import org.junit.Test; import org.mockito.InOrder; -import java.util.NoSuchElementException; - import io.reactivex.*; -import io.reactivex.functions.Predicate; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; public class ObservableLastTest { @@ -294,4 +296,27 @@ public void lastOrErrorError() { .assertErrorMessage("error") .assertError(RuntimeException.class); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.never().lastElement()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservableToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Observable o) throws Exception { + return o.lastElement(); + } + }); + } + + @Test + public void error() { + Observable.error(new TestException()) + .lastElement() + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java index 688b308526..101f92cedd 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java @@ -13,26 +13,19 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.Flowable; -import io.reactivex.Observable; -import io.reactivex.Observer; -import io.reactivex.TestHelper; -import io.reactivex.functions.Consumer; -import io.reactivex.observers.DefaultObserver; -import io.reactivex.observers.TestObserver; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; + import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import io.reactivex.*; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.observers.*; public class ObservableRangeLongTest { @Test @@ -167,4 +160,41 @@ public void countOne() { .test() .assertResult(5495454L); } + + @Test + public void noOverflow() { + Observable.rangeLong(Long.MAX_VALUE - 1, 2); + Observable.rangeLong(Long.MIN_VALUE, 2); + Observable.rangeLong(Long.MIN_VALUE, Long.MAX_VALUE); + } + + @Test + public void fused() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + Observable.rangeLong(1, 2).subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.SYNC) + .assertResult(1L, 2L); + } + + @Test + public void fusedReject() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ASYNC); + + Observable.rangeLong(1, 2).subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.NONE) + .assertResult(1L, 2L); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.rangeLong(1, 2)); + } + + @Test + public void fusedClearIsEmpty() { + TestHelper.checkFusedIsEmptyClear(Observable.rangeLong(1, 2)); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableResourceWrapperTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableResourceWrapperTest.java new file mode 100644 index 0000000000..bc60272c0f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableResourceWrapperTest.java @@ -0,0 +1,69 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; + +public class ObservableResourceWrapperTest { + + @Test + public void disposed() { + TestObserver to = new TestObserver(); + ObserverResourceWrapper orw = new ObserverResourceWrapper(to); + + Disposable d = Disposables.empty(); + + orw.onSubscribe(d); + + assertFalse(orw.isDisposed()); + + orw.dispose(); + + assertTrue(orw.isDisposed()); + } + + @Test + public void doubleOnSubscribe() { + TestObserver to = new TestObserver(); + ObserverResourceWrapper orw = new ObserverResourceWrapper(to); + + TestHelper.doubleOnSubscribe(orw); + } + + @Test + public void onErrorDisposes() { + TestObserver to = new TestObserver(); + ObserverResourceWrapper orw = new ObserverResourceWrapper(to); + + Disposable d = Disposables.empty(); + Disposable d1 = Disposables.empty(); + + orw.setResource(d1); + + orw.onSubscribe(d); + + orw.onError(new TestException()); + + assertTrue(d1.isDisposed()); + + to.assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java index c660e696ec..f307057cc2 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimerTest.java @@ -281,4 +281,9 @@ public void onComplete() { inOrder.verifyNoMoreInteractions(); verify(observer, never()).onComplete(); } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.timer(1, TimeUnit.DAYS)); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java index c49b10a26e..c9be68ed44 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java @@ -26,8 +26,11 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; public class ObservableUsingTest { @@ -431,4 +434,127 @@ public void run() { }; } + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.using( + new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, + new Function>() { + @Override + public ObservableSource apply(Object v) throws Exception { + return Observable.never(); + } + }, + Functions.emptyConsumer() + )); + } + + @Test + public void supplierDisposerCrash() { + TestObserver to = Observable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function>() { + @Override + public ObservableSource apply(Object v) throws Exception { + throw new TestException("First"); + } + }, new Consumer() { + @Override + public void accept(Object e) throws Exception { + throw new TestException("Second"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "First"); + TestHelper.assertError(errors, 1, TestException.class, "Second"); + } + + @Test + public void eagerOnErrorDisposerCrash() { + TestObserver to = Observable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function>() { + @Override + public ObservableSource apply(Object v) throws Exception { + return Observable.error(new TestException("First")); + } + }, new Consumer() { + @Override + public void accept(Object e) throws Exception { + throw new TestException("Second"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "First"); + TestHelper.assertError(errors, 1, TestException.class, "Second"); + } + + @Test + public void eagerOnCompleteDisposerCrash() { + Observable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function>() { + @Override + public ObservableSource apply(Object v) throws Exception { + return Observable.empty(); + } + }, new Consumer() { + @Override + public void accept(Object e) throws Exception { + throw new TestException("Second"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "Second"); + } + + @Test + public void nonEagerDisposerCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function>() { + @Override + public ObservableSource apply(Object v) throws Exception { + return Observable.empty(); + } + }, new Consumer() { + @Override + public void accept(Object e) throws Exception { + throw new TestException("Second"); + } + }, false) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java index ff86603831..983337040d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -25,7 +26,8 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.TestHelper; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; import io.reactivex.subjects.PublishSubject; @@ -417,4 +419,27 @@ public Observable call() { assertFalse(source.hasObservers()); assertFalse(boundary.hasObservers()); } + + @Test + public void boundaryDispose() { + TestHelper.checkDisposed(Observable.never().window(Observable.never())); + } + + @Test + public void boundaryDispose2() { + TestHelper.checkDisposed(Observable.never().window(Functions.justCallable(Observable.never()))); + } + + @Test + public void boundaryOnError() { + TestObserver to = Observable.error(new TestException()) + .window(Observable.never()) + .flatMap(Functions.>identity(), true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java index c5d205e36e..bd1420a010 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithSizeTest.java @@ -132,7 +132,7 @@ public void accept(Integer t1) { if (count.incrementAndGet() == 500000) { // give it a small break halfway through try { - Thread.sleep(5); + Thread.sleep(10); } catch (InterruptedException ex) { // ignored } diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTestHelper.java b/src/test/java/io/reactivex/schedulers/SchedulerTestHelper.java index 1a7ace1a3b..5cc994126c 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTestHelper.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTestHelper.java @@ -81,7 +81,10 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t fail("timed out"); } - assertEquals("Handler should not have received anything", 0, handler.count); + if (handler.count != 0) { + handler.caught.printStackTrace(); + } + assertEquals("Handler should not have received anything: " + handler.caught, 0, handler.count); assertEquals("Observer should have received an error", 1, observer.errorCount); assertEquals("Observer should not have received a next value", 0, observer.nextCount); @@ -110,7 +113,7 @@ public void uncaughtException(Thread t, Throwable e) { } } - private static final class CapturingObserver extends DefaultSubscriber { + static final class CapturingObserver extends DefaultSubscriber { CountDownLatch completed = new CountDownLatch(1); int errorCount; int nextCount;