Skip to content

Commit

Permalink
2.x: fix withLatestFrom null checks, lifecycle (#4970)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jan 8, 2017
1 parent bcdfb13 commit 307a6a6
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.subscribers.SerializedSubscriber;

public final class FlowableWithLatestFrom<T, U, R> extends AbstractFlowableWithUpstream<T, R> {
Expand All @@ -37,6 +37,8 @@ protected void subscribeActual(Subscriber<? super R> s) {
final SerializedSubscriber<R> serial = new SerializedSubscriber<R>(s);
final WithLatestFromSubscriber<T, U, R> wlf = new WithLatestFromSubscriber<T, U, R>(serial, combiner);

serial.onSubscribe(wlf);

other.subscribe(new Subscriber<U>() {
@Override
public void onSubscribe(Subscription s) {
Expand Down Expand Up @@ -73,6 +75,8 @@ static final class WithLatestFromSubscriber<T, U, R> extends AtomicReference<U>

final AtomicReference<Subscription> s = new AtomicReference<Subscription>();

final AtomicLong requested = new AtomicLong();

final AtomicReference<Subscription> other = new AtomicReference<Subscription>();

WithLatestFromSubscriber(Subscriber<? super R> actual, BiFunction<? super T, ? super U, ? extends R> combiner) {
Expand All @@ -81,9 +85,7 @@ static final class WithLatestFromSubscriber<T, U, R> extends AtomicReference<U>
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.s, s)) {
actual.onSubscribe(this);
}
SubscriptionHelper.deferredSetOnce(this.s, requested, s);
}

@Override
Expand All @@ -92,7 +94,7 @@ public void onNext(T t) {
if (u != null) {
R r;
try {
r = combiner.apply(t, u);
r = ObjectHelper.requireNonNull(combiner.apply(t, u), "The combiner returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
cancel();
Expand All @@ -117,12 +119,12 @@ public void onComplete() {

@Override
public void request(long n) {
s.get().request(n);
SubscriptionHelper.deferredRequest(s, requested, n);
}

@Override
public void cancel() {
s.get().cancel();
SubscriptionHelper.cancel(s);
SubscriptionHelper.cancel(other);
}

Expand All @@ -131,16 +133,8 @@ public boolean setOther(Subscription o) {
}

public void otherError(Throwable e) {
if (s.compareAndSet(null, SubscriptionHelper.CANCELLED)) {
EmptySubscription.error(e, actual);
} else {
if (s.get() != SubscriptionHelper.CANCELLED) {
cancel();
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}
SubscriptionHelper.cancel(s);
actual.onError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.observers.SerializedObserver;

public final class ObservableWithLatestFrom<T, U, R> extends AbstractObservableWithUpstream<T, R> {
Expand All @@ -37,7 +38,7 @@ public void subscribeActual(Observer<? super R> t) {
final SerializedObserver<R> serial = new SerializedObserver<R>(t);
final WithLatestFromObserver<T, U, R> wlf = new WithLatestFromObserver<T, U, R>(serial, combiner);

t.onSubscribe(wlf);
serial.onSubscribe(wlf);

other.subscribe(new Observer<U>() {
@Override
Expand Down Expand Up @@ -91,7 +92,7 @@ public void onNext(T t) {
if (u != null) {
R r;
try {
r = combiner.apply(t, u);
r = ObjectHelper.requireNonNull(combiner.apply(t, u), "The combiner returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,4 +681,31 @@ public Integer apply(Integer a, Integer b) throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void combineToNull1() {
Flowable.just(1)
.withLatestFrom(Flowable.just(2), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return null;
}
})
.test()
.assertFailure(NullPointerException.class);
}

@SuppressWarnings("unchecked")
@Test
public void combineToNull2() {
Flowable.just(1)
.withLatestFrom(Arrays.asList(Flowable.just(2), Flowable.just(3)), new Function<Object[], Object>() {
@Override
public Object apply(Object[] o) throws Exception {
return null;
}
})
.test()
.assertFailure(NullPointerException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.junit.*;
import org.mockito.InOrder;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.TestHelper;
import io.reactivex.disposables.Disposables;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
Expand Down Expand Up @@ -620,4 +620,31 @@ public Object apply(Integer a, Integer b, Integer c) throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void combineToNull1() {
Observable.just(1)
.withLatestFrom(Observable.just(2), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return null;
}
})
.test()
.assertFailure(NullPointerException.class);
}

@SuppressWarnings("unchecked")
@Test
public void combineToNull2() {
Observable.just(1)
.withLatestFrom(Arrays.asList(Observable.just(2), Observable.just(3)), new Function<Object[], Object>() {
@Override
public Object apply(Object[] o) throws Exception {
return null;
}
})
.test()
.assertFailure(NullPointerException.class);
}
}

0 comments on commit 307a6a6

Please sign in to comment.