Skip to content

Commit

Permalink
2.x: Merge AmbArray and AmbIterable into Amb for Single, Maybe and Co…
Browse files Browse the repository at this point in the history
…mpletable types (#4647)
  • Loading branch information
VictorAlbertos authored and akarnokd committed Sep 30, 2016
1 parent ce21ecf commit 9047a3c
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 403 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static Completable ambArray(final CompletableSource... sources) {
return wrap(sources[0]);
}

return RxJavaPlugins.onAssembly(new CompletableAmbArray(sources));
return RxJavaPlugins.onAssembly(new CompletableAmb(sources, null));
}

/**
Expand All @@ -79,7 +79,7 @@ public static Completable ambArray(final CompletableSource... sources) {
public static Completable amb(final Iterable<? extends CompletableSource> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");

return RxJavaPlugins.onAssembly(new CompletableAmbIterable(sources));
return RxJavaPlugins.onAssembly(new CompletableAmb(null, sources));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class Maybe<T> implements MaybeSource<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Maybe<T> amb(final Iterable<? extends MaybeSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new MaybeAmbIterable<T>(sources));
return RxJavaPlugins.onAssembly(new MaybeAmb<T>(null, sources));
}

/**
Expand All @@ -80,7 +80,7 @@ public static <T> Maybe<T> ambArray(final MaybeSource<? extends T>... sources) {
if (sources.length == 1) {
return wrap((MaybeSource<T>)sources[0]);
}
return RxJavaPlugins.onAssembly(new MaybeAmbArray<T>(sources));
return RxJavaPlugins.onAssembly(new MaybeAmb<T>(sources, null));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public abstract class Single<T> implements SingleSource<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new SingleAmbIterable<T>(sources));
return RxJavaPlugins.onAssembly(new SingleAmb<T>(null, sources));
}

/**
Expand All @@ -97,7 +97,7 @@ public static <T> Single<T> ambArray(final SingleSource<? extends T>... sources)
if (sources.length == 1) {
return wrap((SingleSource<T>)sources[0]);
}
return RxJavaPlugins.onAssembly(new SingleAmbArray<T>(sources));
return RxJavaPlugins.onAssembly(new SingleAmb<T>(sources, null));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,47 @@

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableAmbArray extends Completable {
public final class CompletableAmb extends Completable {
private final CompletableSource[] sources;
private final Iterable<? extends CompletableSource> sourcesIterable;

final CompletableSource[] sources;

public CompletableAmbArray(CompletableSource[] sources) {
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}

@Override
public void subscribeActual(final CompletableObserver s) {
CompletableSource[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new CompletableSource[8];
try {
for (CompletableSource element : sourcesIterable) {
if (element == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), s);
return;
}
if (count == sources.length) {
CompletableSource[] b = new CompletableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = element;
};
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
} else {
count = sources.length;
}

final CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);

Expand Down Expand Up @@ -60,7 +89,8 @@ public void onSubscribe(Disposable d) {

};

for (CompletableSource c : sources) {
for (int i = 0; i < count; i++) {
CompletableSource c = sources[i];
if (set.isDisposed()) {
return;
}
Expand All @@ -81,5 +111,9 @@ public void onSubscribe(Disposable d) {
// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
}

if (count == 0) {
s.onComplete();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,58 @@

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Signals the event of the first MaybeSource that signals.
*
* @param <T> the value type emitted
*/
public final class MaybeAmbArray<T> extends Maybe<T> {
public final class MaybeAmb<T> extends Maybe<T> {
private final MaybeSource<? extends T>[] sources;
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;

final MaybeSource<? extends T>[] sources;

public MaybeAmbArray(MaybeSource<? extends T>[] sources) {
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}

@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(MaybeObserver<? super T> observer) {
MaybeSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new MaybeSource[8];
try {
for (MaybeSource<? extends T> element : sourcesIterable) {
if (element == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
MaybeSource<? extends T>[] b = new MaybeSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = element;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
} else {
count = sources.length;
}

AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
observer.onSubscribe(parent);

for (MaybeSource<? extends T> s : sources) {
for (int i = 0; i < count; i++) {
MaybeSource<? extends T> s = sources[i];
if (parent.isDisposed()) {
return;
}
Expand All @@ -50,6 +80,11 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {

s.subscribe(parent);
}

if (count == 0) {
observer.onComplete();
}

}

static final class AmbMaybeObserver<T>
Expand Down
Loading

0 comments on commit 9047a3c

Please sign in to comment.