Skip to content

1.x: add a source OnSubscribe which works from an array directly #3477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,14 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
public final static <T> Observable<T> from(T[] array) {
return from(Arrays.asList(array));
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}

/**
Expand Down Expand Up @@ -1423,7 +1430,7 @@ public final static <T> Observable<T> just(final T value) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2) {
return from(Arrays.asList(t1, t2));
return from((T[])new Object[] { t1, t2 });
}

/**
Expand All @@ -1449,7 +1456,7 @@ public final static <T> Observable<T> just(T t1, T t2) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3) {
return from(Arrays.asList(t1, t2, t3));
return from((T[])new Object[] { t1, t2, t3 });
}

/**
Expand Down Expand Up @@ -1477,7 +1484,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
return from(Arrays.asList(t1, t2, t3, t4));
return from((T[])new Object[] { t1, t2, t3, t4 });
}

/**
Expand Down Expand Up @@ -1507,7 +1514,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
return from(Arrays.asList(t1, t2, t3, t4, t5));
return from((T[])new Object[] { t1, t2, t3, t4, t5 });
}

/**
Expand Down Expand Up @@ -1539,7 +1546,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6 });
}

/**
Expand Down Expand Up @@ -1573,7 +1580,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7 });
}

/**
Expand Down Expand Up @@ -1609,7 +1616,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8 });
}

/**
Expand Down Expand Up @@ -1647,7 +1654,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
}

/**
Expand Down Expand Up @@ -1687,7 +1694,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
// suppress unchecked because we are using varargs inside the method
@SuppressWarnings("unchecked")
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 });
}

/**
Expand Down Expand Up @@ -1821,7 +1828,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return merge(from(Arrays.asList(t1, t2)));
return merge(new Observable[] { t1, t2 });
}

/**
Expand All @@ -1847,7 +1854,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
return merge(from(Arrays.asList(t1, t2, t3)));
return merge(new Observable[] { t1, t2, t3 });
}

/**
Expand Down Expand Up @@ -1875,7 +1882,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
return merge(from(Arrays.asList(t1, t2, t3, t4)));
return merge(new Observable[] { t1, t2, t3, t4 });
}

/**
Expand Down Expand Up @@ -1905,7 +1912,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5)));
return merge(new Observable[] { t1, t2, t3, t4, t5 });
}

/**
Expand Down Expand Up @@ -1937,7 +1944,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6 });
}

/**
Expand Down Expand Up @@ -1971,7 +1978,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7 });
}

/**
Expand Down Expand Up @@ -2007,7 +2014,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8 });
}

/**
Expand Down Expand Up @@ -2045,7 +2052,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
*/
@SuppressWarnings("unchecked")
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)));
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
}

/**
Expand Down
128 changes: 128 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeFromArray.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.OnSubscribe;

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}

@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
/** */
private static final long serialVersionUID = 3534218984725836979L;

final Subscriber<? super T> child;
final T[] array;

int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}

@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}

void fastPath() {
final Subscriber<? super T> child = this.child;

for (T t : array) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(t);
}

if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}

void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;

long e = 0L;
int i = index;

for (;;) {

while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(array[i]);

i++;

if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}

r--;
e--;
}

r = get() + e;

if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
}
Loading