Skip to content

Commit

Permalink
2.x: cleanup, fixes and coverage 10/25 (#4766)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 25, 2016
1 parent 318bf43 commit 83ba4b9
Show file tree
Hide file tree
Showing 36 changed files with 1,339 additions and 434 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11329,6 +11329,8 @@ public final <R> Flowable<R> scan(final R initialValue, BiFunction<R, ? super T,
* Publisher.defer(() -> o.scan(new ArrayList&lt;>(), (list, item) -> list.add(item)))
* );
* </code></pre>
* <p>
* Unlike 1.x, this operator doesn't emit the seed value unless the upstream signals an event.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -82,7 +82,7 @@ public void onError(Throwable t) {
onError.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
RxJavaPlugins.onError(new CompositeException(t, ex));
}
}

Expand Down
48 changes: 26 additions & 22 deletions src/main/java/io/reactivex/internal/observers/LambdaObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
Expand Down Expand Up @@ -47,42 +47,46 @@ public void onSubscribe(Disposable s) {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
RxJavaPlugins.onError(ex);
onError(ex);
}
}
}

@Override
public void onNext(T t) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
}
}
}

@Override
public void onError(Throwable t) {
dispose();
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
RxJavaPlugins.onError(t);
if (!isDisposed()) {
dispose();
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
}

@Override
public void onComplete() {
dispose();
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
if (!isDisposed()) {
dispose();
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static final class ConcatMapEagerDelayErrorSubscriber<T, R>

final ErrorMode errorMode;

final AtomicReference<Throwable> error;
final AtomicThrowable errors;

final AtomicLong requested;

Expand All @@ -95,7 +95,7 @@ static final class ConcatMapEagerDelayErrorSubscriber<T, R>
this.prefetch = prefetch;
this.errorMode = errorMode;
this.subscribers = new SpscLinkedArrayQueue<InnerQueuedSubscriber<R>>(Math.min(prefetch, maxConcurrency));
this.error = new AtomicReference<Throwable>();
this.errors = new AtomicThrowable();
this.requested = new AtomicLong();
}

Expand Down Expand Up @@ -146,7 +146,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (ExceptionHelper.addThrowable(error, t)) {
if (errors.addThrowable(t)) {
done = true;
drain();
} else {
Expand Down Expand Up @@ -207,7 +207,7 @@ public void innerNext(InnerQueuedSubscriber<R> inner, R value) {

@Override
public void innerError(InnerQueuedSubscriber<R> inner, Throwable e) {
if (ExceptionHelper.addThrowable(this.error, e)) {
if (errors.addThrowable(e)) {
inner.setDone();
if (errorMode != ErrorMode.END) {
s.cancel();
Expand Down Expand Up @@ -242,11 +242,11 @@ public void drain() {
if (inner == null) {

if (em != ErrorMode.END) {
Throwable ex = error.get();
Throwable ex = errors.get();
if (ex != null) {
cancelAll();

a.onError(ex);
a.onError(errors.terminate());
return;
}
}
Expand All @@ -256,7 +256,7 @@ public void drain() {
inner = subscribers.poll();

if (outerDone && inner == null) {
Throwable ex = error.get();
Throwable ex = errors.terminate();
if (ex != null) {
a.onError(ex);
} else {
Expand All @@ -282,13 +282,13 @@ public void drain() {
}

if (em == ErrorMode.IMMEDIATE) {
Throwable ex = error.get();
Throwable ex = errors.get();
if (ex != null) {
current = null;
inner.cancel();
cancelAll();

a.onError(ex);
a.onError(errors.terminate());
return;
}
}
Expand Down Expand Up @@ -336,13 +336,13 @@ public void drain() {
}

if (em == ErrorMode.IMMEDIATE) {
Throwable ex = error.get();
Throwable ex = errors.get();
if (ex != null) {
current = null;
inner.cancel();
cancelAll();

a.onError(ex);
a.onError(errors.terminate());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.Cancellable;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -91,7 +91,7 @@ static final class SerializedEmitter<T>

final AtomicThrowable error;

final SimpleQueue<T> queue;
final SimplePlainQueue<T> queue;

volatile boolean done;

Expand All @@ -116,7 +116,7 @@ public void onNext(T t) {
return;
}
} else {
SimpleQueue<T> q = queue;
SimplePlainQueue<T> q = queue;
synchronized (q) {
q.offer(t);
}
Expand Down Expand Up @@ -161,7 +161,7 @@ void drain() {

void drainLoop() {
BaseEmitter<T> e = emitter;
SimpleQueue<T> q = queue;
SimplePlainQueue<T> q = queue;
AtomicThrowable error = this.error;
int missed = 1;
for (;;) {
Expand All @@ -179,15 +179,8 @@ void drainLoop() {
}

boolean d = done;
T v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// should never happen
v = null;
}

T v = q.poll();

boolean empty = v == null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ void emit(long idx, T value) {
long r = get();
if (r != 0L) {
actual.onNext(value);
if (r != Long.MAX_VALUE) {
decrementAndGet();
}
BackpressureHelper.produced(this, 1);
} else {
cancel();
actual.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong

Subscription s;

final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
final SequentialDisposable timer = new SequentialDisposable();

volatile long index;

Expand Down Expand Up @@ -94,13 +94,11 @@ public void onNext(T t) {
}

DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
if (!timer.compareAndSet(d, de)) {
return;
}
if (timer.replace(de)) {
d = worker.schedule(de, timeout, unit);

d = worker.schedule(de, timeout, unit);

de.setResource(d);
de.setResource(d);
}
}

@Override
Expand Down Expand Up @@ -153,9 +151,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
long r = get();
if (r != 0L) {
actual.onNext(t);
if (r != Long.MAX_VALUE) {
decrementAndGet();
}
BackpressureHelper.produced(this, 1);

emitter.dispose();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

Expand All @@ -24,7 +24,6 @@
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.FuseToFlowable;
import io.reactivex.internal.observers.BasicIntQueueDisposable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.AtomicThrowable;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -62,8 +61,8 @@ public Flowable<T> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletable<T>(source, mapper, delayErrors, maxConcurrency));
}

static final class FlatMapCompletableMainSubscriber<T> extends BasicIntQueueDisposable<T>
implements Subscriber<T> {
static final class FlatMapCompletableMainSubscriber<T> extends AtomicInteger
implements Subscriber<T>, Disposable {
private static final long serialVersionUID = 8443155186132538303L;

final CompletableObserver actual;
Expand Down Expand Up @@ -183,26 +182,6 @@ public boolean isDisposed() {
return set.isDisposed();
}

@Override
public T poll() throws Exception {
return null; // always empty
}

@Override
public boolean isEmpty() {
return true; // always empty
}

@Override
public void clear() {
// nothing to clear
}

@Override
public int requestFusion(int mode) {
return mode & ASYNC;
}

void innerComplete(InnerObserver inner) {
set.delete(inner);
onComplete();
Expand Down
Loading

0 comments on commit 83ba4b9

Please sign in to comment.