Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7966,7 +7966,7 @@ public final Observable<T> mergeWith(ObservableSource<? extends T> other) {

/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of {@link Flowable#bufferSize()} slots.
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
*
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
Expand All @@ -7976,7 +7976,9 @@ public final Observable<T> mergeWith(ObservableSource<? extends T> other) {
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
* on the other side of the asynchronous boundary.
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
Expand All @@ -7994,13 +7996,15 @@ public final Observable<T> observeOn(Scheduler scheduler) {

/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer and optionally delays onError notifications.
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size" and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
* on the other side of the asynchronous boundary.
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
Expand All @@ -8023,13 +8027,15 @@ public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {

/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of configurable size and optionally delays onError notifications.
* asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
* on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public void drain(boolean delayError, Disposable dispose) {
QueueDrainHelper.drainLoop(queue, actual, delayError, dispose, this);
}
}

@Override
public void accept(Observer<? super V> a, U v) {
// ignored by default
}
}

// -------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public void onComplete() {
if (!DisposableHelper.isDisposed(d)) {
@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
de.emit();
if (de != null) {
de.emit();
}
DisposableHelper.dispose(timer);
worker.dispose();
actual.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(Notification<T> t) {
if (done) {
if (t.isOnError()) {
RxJavaPlugins.onError(t.getError());
}
return;
}
if (t.isOnError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,12 @@ public void onError(Throwable e) {
@Override
public void onComplete() {
if (decrementAndGet() == 0) {
if (delayErrors) {
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
return;
}
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
actual.onComplete();
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
s.request(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,12 @@ public void onError(Throwable e) {
@Override
public void onComplete() {
if (decrementAndGet() == 0) {
if (delayErrors) {
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
return;
}
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
actual.onComplete();
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
s.request(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
Expand Down Expand Up @@ -99,6 +100,9 @@ abstract static class BaseObserveOnSubscriber<T>

@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
Expand All @@ -114,15 +118,21 @@ public final void onNext(T t) {

@Override
public final void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
trySchedule();
}

@Override
public final void onComplete() {
done = true;
trySchedule();
if (!done) {
done = true;
trySchedule();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void subscribe(Subscriber<? super T> child) {
* Note: although technically correct, concurrent disconnects can cause
* unexpected behavior such as child subscribers never receiving anything
* (unless connected again). An alternative approach, similar to
* PublishSubject would be to immediately terminate such child
* PublishProcessor would be to immediately terminate such child
* subscribers as well:
*
* Object term = r.terminalEvent;
Expand Down Expand Up @@ -172,7 +172,7 @@ public void connect(Consumer<? super Disposable> connection) {
* Disposable as subscribe() may never return on its own.
*
* Note however, that asynchronously disconnecting a running source might leave
* child subscribers without any terminal event; PublishSubject does not have this
* child subscribers without any terminal event; PublishProcessor does not have this
* issue because the cancellation was always triggered by the child subscribers
* themselves.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,26 @@ public void subscribe(Subscriber<? super T> child) {

// create the backpressure-managing producer for this child
InnerSubscription<T> inner = new InnerSubscription<T>(r, child);
// the producer has been registered with the current subscriber-to-source so
// at least it will receive the next terminal event
// setting the producer will trigger the first request to be considered by
// the subscriber-to-source.
child.onSubscribe(inner);
// we try to add it to the array of subscribers
// if it fails, no worries because we will still have its buffer
// so it is going to replay it for us
r.add(inner);

if (inner.isDisposed()) {
r.remove(inner);
return;
}

r.manageRequests();

// trigger the capturing of the current node and total requested
r.buffer.replay(inner);
// the producer has been registered with the current subscriber-to-source so
// at least it will receive the next terminal event
// setting the producer will trigger the first request to be considered by
// the subscriber-to-source.
child.onSubscribe(inner);

break; // NOPMD
}
}
Expand Down Expand Up @@ -315,6 +324,9 @@ public void connect(Consumer<? super Disposable> connection) {
try {
connection.accept(ps);
} catch (Throwable ex) {
if (doConnect) {
ps.shouldConnect.compareAndSet(true, false);
}
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableTakeUntilPredicate<T> extends AbstractFlowableWithUpstream<T, T> {
final Predicate<? super T> predicate;
Expand Down Expand Up @@ -75,6 +76,8 @@ public void onError(Throwable t) {
if (!done) {
done = true;
actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ public void onNext(Object t) {

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
parent.onError(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,14 @@ public void onComplete() {
U b;
synchronized (this) {
b = buffer;
if (b == null) {
return;
}
buffer = null;
}
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, actual, false, this, this);
if (b != null) {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, actual, false, this, this);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,6 @@ void removeInner(InnerObserver<T, U> inner) {
}
}

SimpleQueue<U> getMainQueue() {
SimpleQueue<U> q = queue;
if (q == null) {
if (maxConcurrency == Integer.MAX_VALUE) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
} else {
q = new SpscArrayQueue<U>(maxConcurrency);
}
queue = q;
}
return q;
}

void tryEmitScalar(Callable<? extends U> value) {
U u;
try {
Expand All @@ -248,7 +235,16 @@ void tryEmitScalar(Callable<? extends U> value) {
return;
}
} else {
SimpleQueue<U> q = getMainQueue();
SimpleQueue<U> q = queue;
if (q == null) {
if (maxConcurrency == Integer.MAX_VALUE) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
} else {
q = new SpscArrayQueue<U>(maxConcurrency);
}
queue = q;
}

if (!q.offer(u)) {
onError(new IllegalStateException("Scalar queue full?!"));
return;
Expand All @@ -260,15 +256,6 @@ void tryEmitScalar(Callable<? extends U> value) {
drainLoop();
}

SimpleQueue<U> getInnerQueue(InnerObserver<T, U> inner) {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
return q;
}

void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,12 @@ public void onError(Throwable e) {
@Override
public void onComplete() {
if (decrementAndGet() == 0) {
if (delayErrors) {
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
return;
}
Throwable ex = errors.terminate();
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
actual.onComplete();
}
}

Expand Down
Loading