Skip to content

Commit

Permalink
2.x: test sync from Observable to Flowable 10/16-1 (#4714)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 17, 2016
1 parent 788873e commit e255de7
Show file tree
Hide file tree
Showing 80 changed files with 5,655 additions and 633 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/CompletableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface CompletableOperator {
* Applies a function to the child CompletableObserver and returns a new parent CompletableObserver.
* @param observer the child CompletableObservable instance
* @return the parent CompletableObserver instance
* @throws Exception on failure
*/
CompletableObserver apply(CompletableObserver observer) throws Exception;
}
46 changes: 23 additions & 23 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3500,7 +3500,7 @@ public static Flowable<Long> rangeLong(long start, long count) {
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
Expand All @@ -3523,12 +3523,12 @@ public static Flowable<Long> rangeLong(long start, long count) {
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) {
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) {
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize());
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise based on the results of a specified
* equality function.
* <p>
Expand All @@ -3549,19 +3549,19 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1
* a function used to compare items emitted by each Publisher
* @param <T>
* the type of items emitted by each Publisher
* @return a Flowable that emits a Boolean value that indicates whether the two Publisher sequences
* @return a Single that emits a Boolean value that indicates whether the two Publisher sequences
* are the same according to the specified function
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual) {
return sequenceEqual(source1, source2, isEqual, bufferSize());
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise based on the results of a specified
* equality function.
* <p>
Expand All @@ -3584,23 +3584,23 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1
* the number of items to prefetch from the first and second source Publisher
* @param <T>
* the type of items emitted by each Publisher
* @return a Flowable that emits a Boolean value that indicates whether the two Publisher sequences
* @return a Single that emits a Boolean value that indicates whether the two Publisher sequences
* are the same according to the specified function
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual, int bufferSize) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(isEqual, "isEqual is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableSequenceEqual<T>(source1, source2, isEqual, bufferSize));
return RxJavaPlugins.onAssembly(new FlowableSequenceEqualSingle<T>(source1, source2, isEqual, bufferSize));
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* Returns a Single that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
Expand All @@ -3620,12 +3620,12 @@ public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1
* the number of items to prefetch from the first and second source Publisher
* @param <T>
* the type of items emitted by each Publisher
* @return a Flowable that emits a Boolean value that indicates whether the two sequences are the same
* @return a Single that emits a Boolean value that indicates whether the two sequences are the same
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, int bufferSize) {
public static <T> Single<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, int bufferSize) {
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize);
}

Expand Down Expand Up @@ -5196,7 +5196,7 @@ public final T blockingLast(T defaultItem) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingLatest() {
return BlockingFlowableLatest.latest(this);
return new BlockingFlowableLatest<T>(this);
}

/**
Expand All @@ -5222,7 +5222,7 @@ public final Iterable<T> blockingLatest() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingMostRecent(T initialItem) {
return BlockingFlowableMostRecent.mostRecent(this, initialItem);
return new BlockingFlowableMostRecent<T>(this, initialItem);
}

/**
Expand All @@ -5245,7 +5245,7 @@ public final Iterable<T> blockingMostRecent(T initialItem) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingNext() {
return BlockingFlowableNext.next(this);
return new BlockingFlowableNext<T>(this);
}

/**
Expand All @@ -5267,7 +5267,7 @@ public final Iterable<T> blockingNext() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingSingle() {
return singleElement().blockingGet();
return singleOrError().blockingGet();
}

/**
Expand Down Expand Up @@ -7248,7 +7248,7 @@ public final <K> Flowable<T> distinct(Function<? super T, K> keySelector,
Callable<? extends Collection<? super K>> collectionSupplier) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
return FlowableDistinct.withCollection(this, keySelector, collectionSupplier);
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, K>(this, keySelector, collectionSupplier));
}

/**
Expand All @@ -7271,7 +7271,7 @@ public final <K> Flowable<T> distinct(Function<? super T, K> keySelector,
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> distinctUntilChanged() {
return FlowableDistinct.<T>untilChanged(this);
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
}

/**
Expand Down Expand Up @@ -7299,7 +7299,7 @@ public final Flowable<T> distinctUntilChanged() {
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Flowable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
return FlowableDistinct.untilChanged(this, keySelector);
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
}

/**
Expand Down Expand Up @@ -13734,7 +13734,7 @@ public final Single<List<T>> toList(final int capacityHint) {
}

/**
* Returns a Flowable that emits a single item, a list composed of all the items emitted by the source
* Returns a Single that emits a single item, a list composed of all the items emitted by the source
* Publisher.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toList.png" alt="">
Expand All @@ -13758,15 +13758,15 @@ public final Single<List<T>> toList(final int capacityHint) {
* @param <U> the subclass of a collection of Ts
* @param collectionSupplier
* the Callable returning the collection (for each individual Subscriber) to be filled in
* @return a Flowable that emits a single item: a List containing all of the items emitted by the source
* @return a Single that emits a single item: a List containing all of the items emitted by the source
* Publisher
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U extends Collection<? super T>> Flowable<U> toList(Callable<U> collectionSupplier) {
public final <U extends Collection<? super T>> Single<U> toList(Callable<U> collectionSupplier) {
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
return RxJavaPlugins.onAssembly(new FlowableToList<T, U>(this, collectionSupplier));
return RxJavaPlugins.onAssembly(new FlowableToListSingle<T, U>(this, collectionSupplier));
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/FlowableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface FlowableOperator<Downstream, Upstream> {
* Applies a function to the child Subscriber and returns a new parent Subscriber.
* @param observer the child Subscriber instance
* @return the parent Subscriber instance
* @throws Exception on failure
*/
Subscriber<? super Upstream> apply(Subscriber<? super Downstream> observer) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/MaybeOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface MaybeOperator<Downstream, Upstream> {
* Applies a function to the child MaybeObserver and returns a new parent MaybeObserver.
* @param observer the child MaybeObserver instance
* @return the parent MaybeObserver instance
* @throws Exception on failure
*/
MaybeObserver<? super Upstream> apply(MaybeObserver<? super Downstream> observer) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/ObservableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface ObservableOperator<Downstream, Upstream> {
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
Observer<? super Upstream> apply(Observer<? super Downstream> observer) throws Exception;
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/SingleOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface SingleOperator<Downstream, Upstream> {
* Applies a function to the child SingleObserver and returns a new parent SingleObserver.
* @param observer the child SingleObserver instance
* @return the parent SingleObserver instance
* @throws Exception on failure
*/
SingleObserver<? super Upstream> apply(SingleObserver<? super Downstream> observer) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,66 +51,5 @@
* </ul>
* @param <T> the value type transmitted through the queue
*/
public interface QueueDisposable<T> extends SimpleQueue<T>, Disposable {
/**
* Returned by the {@link #requestFusion(int)} if the upstream doesn't support
* the requested mode.
*/
int NONE = 0;

/**
* Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In synchronous fusion, all upstream values are either already available or is generated
* when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
* that is the indication if a terminated stream.
* In this mode, the upstream won't call the onXXX methods and callers of
* {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
* to be called sequentially (from within a serializing drain-loop).
*/
int SYNC = 1;

/**
* Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
* Upstream signals onError() and onComplete() as usual but onNext may not actually contain
* the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
* that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
* (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
* prepared to catch exceptions.
*/
int ASYNC = 2;

/**
* Request any of the {@link #SYNC} or {@link #ASYNC} modes.
*/
int ANY = SYNC | ASYNC;

/**
* Used in binary or combination with the other constants as an input to {@link #requestFusion(int)}
* indicating that the {@link #poll()} will be called behind an asynchronous boundary and thus
* may change the non-trivial computation locations attached to the {@link #poll()} chain of
* fused operators.
* <p>
* For example, fusing map() and observeOn() may move the computation of the map's function over to
* the thread run after the observeOn(), which is generally unexpected.
*/
int BOUNDARY = 4;

/**
* Request a fusion mode from the upstream.
* <p>
* This should be called before {@code onSubscribe} returns.
* <p>
* Calling this method multiple times or after {@code onSubscribe} finished is not allowed
* and may result in undefined behavior.
* <p>
* @param mode the requested fusion mode, allowed values are {@link #SYNC}, {@link #ASYNC},
* {@link #ANY} combined with {@link #BOUNDARY} (e.g., {@code requestFusion(SYNC | BOUNDARY)}).
* @return the established fusion mode: {@link #NONE}, {@link #SYNC}, {@link #ASYNC}.
*/
int requestFusion(int mode);
public interface QueueDisposable<T> extends QueueFuseable<T>, Disposable {
}
84 changes: 84 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/QueueFuseable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.fuseable;


/**
* Represents a SimpleQueue plus the means and constants for requesting a fusion mode.
* @param <T> the value type returned by the SimpleQueue.poll()
*/
public interface QueueFuseable<T> extends SimpleQueue<T> {
/**
* Returned by the {@link #requestFusion(int)} if the upstream doesn't support
* the requested mode.
*/
int NONE = 0;

/**
* Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In synchronous fusion, all upstream values are either already available or is generated
* when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
* that is the indication if a terminated stream.
* In this mode, the upstream won't call the onXXX methods and callers of
* {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
* to be called sequentially (from within a serializing drain-loop).
*/
int SYNC = 1;

/**
* Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
* Upstream signals onError() and onComplete() as usual but onNext may not actually contain
* the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
* that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
* (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
* prepared to catch exceptions.
*/
int ASYNC = 2;

/**
* Request any of the {@link #SYNC} or {@link #ASYNC} modes.
*/
int ANY = SYNC | ASYNC;

/**
* Used in binary or combination with the other constants as an input to {@link #requestFusion(int)}
* indicating that the {@link #poll()} will be called behind an asynchronous boundary and thus
* may change the non-trivial computation locations attached to the {@link #poll()} chain of
* fused operators.
* <p>
* For example, fusing map() and observeOn() may move the computation of the map's function over to
* the thread run after the observeOn(), which is generally unexpected.
*/
int BOUNDARY = 4;

/**
* Request a fusion mode from the upstream.
* <p>
* This should be called before {@code onSubscribe} returns.
* <p>
* Calling this method multiple times or after {@code onSubscribe} finished is not allowed
* and may result in undefined behavior.
* <p>
* @param mode the requested fusion mode, allowed values are {@link #SYNC}, {@link #ASYNC},
* {@link #ANY} combined with {@link #BOUNDARY} (e.g., {@code requestFusion(SYNC | BOUNDARY)}).
* @return the established fusion mode: {@link #NONE}, {@link #SYNC}, {@link #ASYNC}.
*/
int requestFusion(int mode);

}
Loading

0 comments on commit e255de7

Please sign in to comment.