Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Remove takeFirst(predicate) in Observable & Flowable #4595

Merged
merged 1 commit into from
Sep 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12341,32 +12341,6 @@ public final Flowable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
return takeUntil(timer(time, unit, scheduler));
}

/**
* Returns a Flowable that emits only the very first item emitted by the source Publisher that satisfies
* a specified condition.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeFirstN.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeFirst} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* the condition any item emitted by the source Publisher has to satisfy
* @return a Flowable that emits only the very first item emitted by the source Publisher that satisfies
* the given condition, or that completes without emitting anything if the source Publisher
* completes without emitting a single condition-satisfying item
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> takeFirst(Predicate<? super T> predicate) {
return filter(predicate).take(1);
}

/**
* Returns a Flowable that emits at most the last {@code count} items emitted by the source Publisher. If the source emits fewer than
* {@code count} items then all of its items are emitted.
Expand Down
22 changes: 0 additions & 22 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10333,28 +10333,6 @@ public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
return takeUntil(timer(time, unit, scheduler));
}

/**
* Returns an Observable that emits only the very first item emitted by the source ObservableSource that satisfies
* a specified condition.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeFirstN.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeFirst} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* the condition any item emitted by the source ObservableSource has to satisfy
* @return an Observable that emits only the very first item emitted by the source ObservableSource that satisfies
* the given condition, or that completes without emitting anything if the source ObservableSource
* completes without emitting a single condition-satisfying item
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> takeFirst(Predicate<? super T> predicate) {
return filter(predicate).take(1);
}

/**
* Returns an Observable that emits at most the last {@code count} items emitted by the source ObservableSource. If the source emits fewer than
* {@code count} items then all of its items are emitted.
Expand Down
7 changes: 1 addition & 6 deletions src/test/java/io/reactivex/flowable/FlowableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2179,11 +2179,6 @@ public void takeTimedSchedulerNull() {
just1.take(1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void takeFirstNull() {
just1.takeFirst(null);
}

@Test(expected = NullPointerException.class)
public void takeLastTimedUnitNull() {
just1.takeLast(1, null, Schedulers.single());
Expand Down Expand Up @@ -2957,4 +2952,4 @@ public Object apply(Object[] v) {
}
}, 128, just1).blockingLast();
}
}
}
5 changes: 3 additions & 2 deletions src/test/java/io/reactivex/flowable/FlowableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,10 @@ public Throwable call() {
verify(wo, times(1)).onError(any(RuntimeException.class));
}

@Test
public void testTakeFirstWithPredicateOfSome() {
Flowable<Integer> observable = Flowable.just(1, 3, 5, 4, 6, 3);
observable.takeFirst(IS_EVEN).subscribe(w);
observable.filter(IS_EVEN).take(1).subscribe(w);
verify(w, times(1)).onNext(anyInt());
verify(w).onNext(4);
verify(w, times(1)).onComplete();
Expand All @@ -194,7 +195,7 @@ public void testTakeFirstWithPredicateOfSome() {
@Test
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
Flowable<Integer> observable = Flowable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
observable.takeFirst(IS_EVEN).subscribe(w);
observable.filter(IS_EVEN).take(1).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, times(1)).onComplete();
verify(w, never()).onError(any(Throwable.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2250,11 +2250,6 @@ public void takeTimedSchedulerNull() {
just1.take(1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void takeFirstNull() {
just1.takeFirst(null);
}

@Test(expected = NullPointerException.class)
public void takeLastTimedUnitNull() {
just1.takeLast(1, null, Schedulers.single());
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/reactivex/observable/ObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Throwable call() {
@Test
public void testTakeFirstWithPredicateOfSome() {
Observable<Integer> o = Observable.just(1, 3, 5, 4, 6, 3);
o.takeFirst(IS_EVEN).subscribe(w);
o.filter(IS_EVEN).take(1).subscribe(w);
verify(w, times(1)).onNext(anyInt());
verify(w).onNext(4);
verify(w, times(1)).onComplete();
Expand All @@ -196,7 +196,7 @@ public void testTakeFirstWithPredicateOfSome() {
@Test
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
Observable<Integer> o = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
o.takeFirst(IS_EVEN).subscribe(w);
o.filter(IS_EVEN).take(1).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, times(1)).onComplete();
verify(w, never()).onError(any(Throwable.class));
Expand Down