Skip to content

2.x: test sync from Observable to Flowable 10/16-1 #4714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/CompletableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface CompletableOperator {
* Applies a function to the child CompletableObserver and returns a new parent CompletableObserver.
* @param observer the child CompletableObservable instance
* @return the parent CompletableObserver instance
* @throws Exception on failure
*/
CompletableObserver apply(CompletableObserver observer) throws Exception;
}
46 changes: 23 additions & 23 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3500,7 +3500,7 @@ public static Flowable<Long> rangeLong(long start, long count) {
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
Expand All @@ -3523,12 +3523,12 @@ public static Flowable<Long> rangeLong(long start, long count) {
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) {
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) {
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize());
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise based on the results of a specified
* equality function.
* <p>
Expand All @@ -3549,19 +3549,19 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1
* a function used to compare items emitted by each Publisher
* @param <T>
* the type of items emitted by each Publisher
* @return a Flowable that emits a Boolean value that indicates whether the two Publisher sequences
* @return a Single that emits a Boolean value that indicates whether the two Publisher sequences
* are the same according to the specified function
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual) {
return sequenceEqual(source1, source2, isEqual, bufferSize());
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise based on the results of a specified
* equality function.
* <p>
Expand All @@ -3584,23 +3584,23 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1
* the number of items to prefetch from the first and second source Publisher
* @param <T>
* the type of items emitted by each Publisher
* @return a Flowable that emits a Boolean value that indicates whether the two Publisher sequences
* @return a Single that emits a Boolean value that indicates whether the two Publisher sequences
* are the same according to the specified function
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual, int bufferSize) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(isEqual, "isEqual is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableSequenceEqual<T>(source1, source2, isEqual, bufferSize));
return RxJavaPlugins.onAssembly(new FlowableSequenceEqualSingle<T>(source1, source2, isEqual, bufferSize));
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
Expand All @@ -3620,12 +3620,12 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1
* the number of items to prefetch from the first and second source Publisher
* @param <T>
* the type of items emitted by each Publisher
* @return a Flowable that emits a Boolean value that indicates whether the two sequences are the same
* @return a Single that emits a Boolean value that indicates whether the two sequences are the same
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, int bufferSize) {
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, int bufferSize) {
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize);
}

Expand Down Expand Up @@ -5196,7 +5196,7 @@ public final T blockingLast(T defaultItem) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingLatest() {
return BlockingFlowableLatest.latest(this);
return new BlockingFlowableLatest<T>(this);
}

/**
Expand All @@ -5222,7 +5222,7 @@ public final Iterable<T> blockingLatest() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingMostRecent(T initialItem) {
return BlockingFlowableMostRecent.mostRecent(this, initialItem);
return new BlockingFlowableMostRecent<T>(this, initialItem);
}

/**
Expand All @@ -5245,7 +5245,7 @@ public final Iterable<T> blockingMostRecent(T initialItem) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingNext() {
return BlockingFlowableNext.next(this);
return new BlockingFlowableNext<T>(this);
}

/**
Expand All @@ -5267,7 +5267,7 @@ public final Iterable<T> blockingNext() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingSingle() {
return singleElement().blockingGet();
return singleOrError().blockingGet();
}

/**
Expand Down Expand Up @@ -7248,7 +7248,7 @@ public final <K> Flowable<T> distinct(Function<? super T, K> keySelector,
Callable<? extends Collection<? super K>> collectionSupplier) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
return FlowableDistinct.withCollection(this, keySelector, collectionSupplier);
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, K>(this, keySelector, collectionSupplier));
}

/**
Expand All @@ -7271,7 +7271,7 @@ public final <K> Flowable<T> distinct(Function<? super T, K> keySelector,
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> distinctUntilChanged() {
return FlowableDistinct.<T>untilChanged(this);
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
}

/**
Expand Down Expand Up @@ -7299,7 +7299,7 @@ public final Flowable<T> distinctUntilChanged() {
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Flowable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
return FlowableDistinct.untilChanged(this, keySelector);
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
}

/**
Expand Down Expand Up @@ -13734,7 +13734,7 @@ public final Single<List<T>> toList(final int capacityHint) {
}

/**
* Returns a Flowable that emits a single item, a list composed of all the items emitted by the source
* Returns a Single that emits a single item, a list composed of all the items emitted by the source
* Publisher.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toList.png" alt="">
Expand All @@ -13758,15 +13758,15 @@ public final Single<List<T>> toList(final int capacityHint) {
* @param <U> the subclass of a collection of Ts
* @param collectionSupplier
* the Callable returning the collection (for each individual Subscriber) to be filled in
* @return a Flowable that emits a single item: a List containing all of the items emitted by the source
* @return a Single that emits a single item: a List containing all of the items emitted by the source
* Publisher
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U extends Collection<? super T>> Flowable<U> toList(Callable<U> collectionSupplier) {
public final <U extends Collection<? super T>> Single<U> toList(Callable<U> collectionSupplier) {
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
return RxJavaPlugins.onAssembly(new FlowableToList<T, U>(this, collectionSupplier));
return RxJavaPlugins.onAssembly(new FlowableToListSingle<T, U>(this, collectionSupplier));
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/FlowableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface FlowableOperator<Downstream, Upstream> {
* Applies a function to the child Subscriber and returns a new parent Subscriber.
* @param observer the child Subscriber instance
* @return the parent Subscriber instance
* @throws Exception on failure
*/
Subscriber<? super Upstream> apply(Subscriber<? super Downstream> observer) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/MaybeOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface MaybeOperator<Downstream, Upstream> {
* Applies a function to the child MaybeObserver and returns a new parent MaybeObserver.
* @param observer the child MaybeObserver instance
* @return the parent MaybeObserver instance
* @throws Exception on failure
*/
MaybeObserver<? super Upstream> apply(MaybeObserver<? super Downstream> observer) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/ObservableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface ObservableOperator<Downstream, Upstream> {
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
Observer<? super Upstream> apply(Observer<? super Downstream> observer) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/SingleOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface SingleOperator<Downstream, Upstream> {
* Applies a function to the child SingleObserver and returns a new parent SingleObserver.
* @param observer the child SingleObserver instance
* @return the parent SingleObserver instance
* @throws Exception on failure
*/
SingleObserver<? super Upstream> apply(SingleObserver<? super Downstream> observer) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,66 +51,5 @@
* </ul>
* @param <T> the value type transmitted through the queue
*/
public interface QueueDisposable<T> extends SimpleQueue<T>, Disposable {
/**
* Returned by the {@link #requestFusion(int)} if the upstream doesn't support
* the requested mode.
*/
int NONE = 0;

/**
* Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In synchronous fusion, all upstream values are either already available or is generated
* when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
* that is the indication if a terminated stream.
* In this mode, the upstream won't call the onXXX methods and callers of
* {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
* to be called sequentially (from within a serializing drain-loop).
*/
int SYNC = 1;

/**
* Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
* Upstream signals onError() and onComplete() as usual but onNext may not actually contain
* the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
* that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
* (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
* prepared to catch exceptions.
*/
int ASYNC = 2;

/**
* Request any of the {@link #SYNC} or {@link #ASYNC} modes.
*/
int ANY = SYNC | ASYNC;

/**
* Used in binary or combination with the other constants as an input to {@link #requestFusion(int)}
* indicating that the {@link #poll()} will be called behind an asynchronous boundary and thus
* may change the non-trivial computation locations attached to the {@link #poll()} chain of
* fused operators.
* <p>
* For example, fusing map() and observeOn() may move the computation of the map's function over to
* the thread run after the observeOn(), which is generally unexpected.
*/
int BOUNDARY = 4;

/**
* Request a fusion mode from the upstream.
* <p>
* This should be called before {@code onSubscribe} returns.
* <p>
* Calling this method multiple times or after {@code onSubscribe} finished is not allowed
* and may result in undefined behavior.
* <p>
* @param mode the requested fusion mode, allowed values are {@link #SYNC}, {@link #ASYNC},
* {@link #ANY} combined with {@link #BOUNDARY} (e.g., {@code requestFusion(SYNC | BOUNDARY)}).
* @return the established fusion mode: {@link #NONE}, {@link #SYNC}, {@link #ASYNC}.
*/
int requestFusion(int mode);
public interface QueueDisposable<T> extends QueueFuseable<T>, Disposable {
}
84 changes: 84 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/QueueFuseable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.fuseable;


/**
* Represents a SimpleQueue plus the means and constants for requesting a fusion mode.
* @param <T> the value type returned by the SimpleQueue.poll()
*/
public interface QueueFuseable<T> extends SimpleQueue<T> {
/**
* Returned by the {@link #requestFusion(int)} if the upstream doesn't support
* the requested mode.
*/
int NONE = 0;

/**
* Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In synchronous fusion, all upstream values are either already available or is generated
* when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
* that is the indication if a terminated stream.
* In this mode, the upstream won't call the onXXX methods and callers of
* {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
* to be called sequentially (from within a serializing drain-loop).
*/
int SYNC = 1;

/**
* Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
* Upstream signals onError() and onComplete() as usual but onNext may not actually contain
* the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
* that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
* (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
* prepared to catch exceptions.
*/
int ASYNC = 2;

/**
* Request any of the {@link #SYNC} or {@link #ASYNC} modes.
*/
int ANY = SYNC | ASYNC;

/**
* Used in binary or combination with the other constants as an input to {@link #requestFusion(int)}
* indicating that the {@link #poll()} will be called behind an asynchronous boundary and thus
* may change the non-trivial computation locations attached to the {@link #poll()} chain of
* fused operators.
* <p>
* For example, fusing map() and observeOn() may move the computation of the map's function over to
* the thread run after the observeOn(), which is generally unexpected.
*/
int BOUNDARY = 4;

/**
* Request a fusion mode from the upstream.
* <p>
* This should be called before {@code onSubscribe} returns.
* <p>
* Calling this method multiple times or after {@code onSubscribe} finished is not allowed
* and may result in undefined behavior.
* <p>
* @param mode the requested fusion mode, allowed values are {@link #SYNC}, {@link #ASYNC},
* {@link #ANY} combined with {@link #BOUNDARY} (e.g., {@code requestFusion(SYNC | BOUNDARY)}).
* @return the established fusion mode: {@link #NONE}, {@link #SYNC}, {@link #ASYNC}.
*/
int requestFusion(int mode);

}
Loading