Skip to content

Commit

Permalink
Signal NPE ObservableAmb FlowableAmb (#4645)
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorAlbertos authored and akarnokd committed Sep 30, 2016
1 parent d08266b commit ce21ecf
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.*;

import io.reactivex.exceptions.Exceptions;
import org.reactivestreams.*;

import io.reactivex.Flowable;
Expand All @@ -38,13 +39,23 @@ public void subscribeActual(Subscriber<? super T> s) {
int count = 0;
if (sources == null) {
sources = new Publisher[8];
for (Publisher<? extends T> p : sourcesIterable) {
if (count == sources.length) {
Publisher<? extends T>[] b = new Publisher[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
try {
for (Publisher<? extends T> p : sourcesIterable) {
if (p == null) {
EmptySubscription.error(new NullPointerException("One of the sources is null"), s);
return;
}
if (count == sources.length) {
Publisher<? extends T>[] b = new Publisher[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
sources[count++] = p;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
} else {
count = sources.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.*;
import io.reactivex.plugins.RxJavaPlugins;

Expand All @@ -36,13 +37,23 @@ public void subscribeActual(Observer<? super T> s) {
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (p == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), s);
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
sources[count++] = p;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
} else {
count = sources.length;
Expand Down
10 changes: 6 additions & 4 deletions src/test/java/io/reactivex/flowable/FlowableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,22 @@ public void ambIterableNull() {
Flowable.amb((Iterable<Publisher<Object>>)null);
}

@Test(expected = NullPointerException.class)
@Test
public void ambIterableIteratorNull() {
Flowable.amb(new Iterable<Publisher<Object>>() {
@Override
public Iterator<Publisher<Object>> iterator() {
return null;
}
}).blockingLast();
}).test().assertError(NullPointerException.class);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
@Test
public void ambIterableOneIsNull() {
Flowable.amb(Arrays.asList(Flowable.never(), null)).blockingLast();
Flowable.amb(Arrays.asList(Flowable.never(), null))
.test()
.assertError(NullPointerException.class);
}

@Test(expected = NullPointerException.class)
Expand Down
10 changes: 6 additions & 4 deletions src/test/java/io/reactivex/observable/ObservableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,22 @@ public void ambIterableNull() {
Observable.amb((Iterable<Observable<Object>>)null);
}

@Test(expected = NullPointerException.class)
@Test
public void ambIterableIteratorNull() {
Observable.amb(new Iterable<Observable<Object>>() {
@Override
public Iterator<Observable<Object>> iterator() {
return null;
}
}).blockingLast();
}).test().assertError(NullPointerException.class);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
@Test
public void ambIterableOneIsNull() {
Observable.amb(Arrays.asList(Observable.never(), null)).blockingLast();
Observable.amb(Arrays.asList(Observable.never(), null))
.test()
.assertError(NullPointerException.class);
}

@Test(expected = NullPointerException.class)
Expand Down

0 comments on commit ce21ecf

Please sign in to comment.