Skip to content

Commit

Permalink
2.x: coverage, fixes, cleanup 10/21-1 (#4742)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 21, 2016
1 parent 14a954c commit 98d0b7f
Show file tree
Hide file tree
Showing 20 changed files with 927 additions and 671 deletions.
9 changes: 9 additions & 0 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.*;
import java.util.concurrent.*;

import org.reactivestreams.Subscription;

import io.reactivex.*;
import io.reactivex.functions.*;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -662,4 +664,11 @@ public boolean test(T t1, T t2) throws Exception {
public static <T, K> BiPredicate<T, T> equalsPredicate(Function<? super T, K> keySelector) {
return new KeyedEqualsPredicate<T, K>(keySelector);
}

public static final Consumer<Subscription> REQUEST_MAX = new Consumer<Subscription>() {
@Override
public void accept(Subscription t) throws Exception {
t.request(Long.MAX_VALUE);
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subscribers.DefaultSubscriber;

/**
* Utility methods to consume a Publisher in a blocking manner with callbacks or Subscriber.
Expand Down Expand Up @@ -65,17 +62,14 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>
if (bs.isCancelled()) {
break;
}
if (o == BlockingSubscriber.TERMINATED) {
break;
}
if (NotificationLite.acceptFull(v, subscriber)) {
if (o == BlockingSubscriber.TERMINATED
|| NotificationLite.acceptFull(v, subscriber)) {
break;
}
}
} catch (InterruptedException e) {
subscriber.onError(e);
} finally {
bs.cancel();
subscriber.onError(e);
}
}

Expand All @@ -85,31 +79,14 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>
* @param <T> the value type
*/
public static <T> void subscribe(Publisher<? extends T> o) {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] error = { null };
BlockingIgnoringReceiver callback = new BlockingIgnoringReceiver();
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(Functions.emptyConsumer(),
new Consumer<Throwable>() {
@Override
public void accept(Throwable e) {
error[0] = e;
cdl.countDown();
}
}, new Action() {
@Override
public void run() {
cdl.countDown();
}
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
s.request(Long.MAX_VALUE);
}
});
callback, callback, Functions.REQUEST_MAX);

o.subscribe(ls);

BlockingHelper.awaitForComplete(cdl, ls);
Throwable e = error[0];
BlockingHelper.awaitForComplete(callback, ls);
Throwable e = callback.error;
if (e != null) {
throw ExceptionHelper.wrapOrThrow(e);
}
Expand All @@ -125,50 +102,6 @@ public void accept(Subscription s) {
*/
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
final Consumer<? super Throwable> onError, final Action onComplete) {
subscribe(o, new DefaultSubscriber<T>() {
boolean done;
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
onNext.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
}
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.onError(e);
return;
}
done = true;
try {
onError.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
try {
onComplete.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
});
subscribe(o, new LambdaSubscriber<T>(onNext, onError, onComplete, Functions.REQUEST_MAX));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ public void request(long n) {
void innerSuccess(InnerObserver inner, R value) {
set.delete(inner);
if (get() == 0 && compareAndSet(0, 1)) {
boolean d = active.decrementAndGet() == 0;
if (requested.get() != 0) {
actual.onNext(value);

boolean d = active.decrementAndGet() == 0;
SpscLinkedArrayQueue<R> q = queue.get();

if (d && (q == null || q.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ public void request(long n) {
void innerSuccess(InnerObserver inner, R value) {
set.delete(inner);
if (get() == 0 && compareAndSet(0, 1)) {
boolean d = active.decrementAndGet() == 0;
if (requested.get() != 0) {
actual.onNext(value);

boolean d = active.decrementAndGet() == 0;
SpscLinkedArrayQueue<R> q = queue.get();

if (d && (q == null || q.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.Notification;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableMaterialize<T> extends AbstractFlowableWithUpstream<T, Notification<T>> {

Expand All @@ -32,24 +33,20 @@ protected void subscribeActual(Subscriber<? super Notification<T>> s) {
source.subscribe(new MaterializeSubscriber<T>(s));
}

// FIXME needs post-complete drain management
static final class MaterializeSubscriber<T> extends AtomicLong implements Subscriber<T>, Subscription {

private static final long serialVersionUID = -3740826063558713822L;

final Subscriber<? super Notification<T>> actual;

Subscription s;

final AtomicInteger state = new AtomicInteger();

Notification<T> value;

volatile boolean done;
long produced;

static final int NO_REQUEST_NO_VALUE = 0;
static final int NO_REQUEST_HAS_VALUE = 1;
static final int HAS_REQUEST_NO_VALUE = 2;
static final int HAS_REQUEST_HAS_VALUE = 3;
static final long COMPLETE_MASK = Long.MIN_VALUE;
static final long REQUEST_MASK = Long.MAX_VALUE;

MaterializeSubscriber(Subscriber<? super Notification<T>> actual) {
this.actual = actual;
Expand All @@ -65,92 +62,70 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
produced++;
actual.onNext(Notification.createOnNext(t));

if (get() != Long.MAX_VALUE) {
decrementAndGet();
}
}

void tryEmit(Notification<T> v) {
if (get() != 0L) {
state.lazySet(HAS_REQUEST_HAS_VALUE);
actual.onNext(v);
actual.onComplete();
} else {
for (;;) {
int s = state.get();
if (s == HAS_REQUEST_NO_VALUE) {
if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
actual.onNext(v);
actual.onComplete();
return;
}
} else
if (s == NO_REQUEST_HAS_VALUE) {
return;
} else
if (s == HAS_REQUEST_HAS_VALUE) {
value = null;
return;
} else {
value = v;
done = true;
if (state.compareAndSet(s, NO_REQUEST_HAS_VALUE)) {
return;
}
}
}
}
}

@Override
public void onError(Throwable t) {
Notification<T> v = Notification.createOnError(t);

tryEmit(v);
complete(Notification.<T>createOnError(t));
}

@Override
public void onComplete() {
Notification<T> v = Notification.createOnComplete();
complete(Notification.<T>createOnComplete());
}

tryEmit(v);
void complete(Notification<T> n) {
long p = produced;
if (p != 0) {
BackpressureHelper.produced(this, p);
}

for (;;) {
long r = get();
if ((r & COMPLETE_MASK) != 0) {
if (n.isOnError()) {
RxJavaPlugins.onError(n.getError());
}
return;
}
if ((r & REQUEST_MASK) != 0) {
lazySet(COMPLETE_MASK + 1);
actual.onNext(n);
actual.onComplete();
return;
}
value = n;
if (compareAndSet(0, COMPLETE_MASK)) {
return;
}
}
}

@Override
public void request(long n) {
if (!SubscriptionHelper.validate(n)) {
return;
}
BackpressureHelper.add(this, n);
if (done) {
if (SubscriptionHelper.validate(n)) {
for (;;) {
int s = state.get();
if (s == NO_REQUEST_HAS_VALUE) {
if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
Notification<T> v = value;
value = null;
actual.onNext(v);
long r = get();
if ((r & COMPLETE_MASK) != 0) {
if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) {
actual.onNext(value);
actual.onComplete();
return;
}
} else
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
return;
} else
if (state.compareAndSet(s, HAS_REQUEST_NO_VALUE)) {
return;
break;
}
long u = BackpressureHelper.addCap(r, n);
if (compareAndSet(r, u)) {
s.request(n);
break;
}
}
} else {
s.request(n);
}
}

@Override
public void cancel() {
state.lazySet(HAS_REQUEST_HAS_VALUE);
s.cancel();
}
}
Expand Down
Loading

0 comments on commit 98d0b7f

Please sign in to comment.