Skip to content

Adds Observable.sorted method #4264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11438,6 +11438,57 @@ public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Intege
return lift(new OperatorToObservableSortedList<T>(sortFunction, initialCapacity));
}

/**
* Returns an Observable that emits the events emitted by source Observable, in a
* sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all
* other items in the sequence.
*
* <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}
*
* <dl>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a note about long or non-terminating or infinite sources as they may run out of memory or never finish collecting the elements to be sorted.

* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Observable} in an
* unbounded manner (i.e., without applying backpressure to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @throws ClassCastException
* if any item emitted by the Observable does not implement {@link Comparable} with respect to
* all other items emitted by the Observable
* @return an Observable that emits the items emitted by the source Observable in sorted order
*/
@Experimental
public final Observable<T> sorted(){
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't sure to name it sorted or sort, what do u think ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may as well call it sorted I think

return toSortedList().flatMapIterable(UtilityFunctions.<List<T>>identity());
}

/**
* Returns an Observable that emits the events emitted by source Observable, in a
* sorted order based on a specified comparison function.
*
* <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Observable} in an
* unbounded manner (i.e., without applying backpressure to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sortFunction
* a function that compares two items emitted by the source Observable and returns an Integer
* that indicates their sort order
* @return an Observable that emits the items emitted by the source Observable in sorted order
*/
@Experimental
public final Observable<T> sorted(Func2<? super T, ? super T, Integer> sortFunction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad we didn't use Comparator with toSortedList, the boxing/unboxing has some overhead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd would it make sense to add toSortedList that takes Comparator and corresponding sorted ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would cause lambda ambiguity because their similar pattern: (a, b) -> a.compareTo(b). This has been fixed in v2. However, if you'd roll a dedicated operator, that could take a Comparator although it would add some API inconsistency.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see .. then I would prefer to not add one..

return toSortedList(sortFunction).flatMapIterable(UtilityFunctions.<List<T>>identity());
}

/**
* Modifies the source Observable so that subscribers will unsubscribe from it on a specified
* {@link Scheduler}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ public void testSortedList() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<List<Integer>> observable = w.toSortedList();

@SuppressWarnings("unchecked")
Observer<List<Integer>> observer = mock(Observer.class);
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
observable.subscribe(testSubscriber);
testSubscriber.assertValue(Arrays.asList(1,2,3,4,5));
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
Expand Down Expand Up @@ -154,12 +153,11 @@ public void testSortedListCapacity() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<List<Integer>> observable = w.toSortedList(4);

@SuppressWarnings("unchecked")
Observer<List<Integer>> observer = mock(Observer.class);
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
observable.subscribe(testSubscriber);
testSubscriber.assertValue(Arrays.asList(1,2,3,4,5));
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
Expand All @@ -172,12 +170,11 @@ public Integer call(Integer t1, Integer t2) {
}
});

@SuppressWarnings("unchecked")
Observer<List<Integer>> observer = mock(Observer.class);
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
observable.subscribe(testSubscriber);
testSubscriber.assertValue(Arrays.asList(5, 4, 3, 2, 1));
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
Expand All @@ -190,11 +187,85 @@ public Integer call(Integer t1, Integer t2) {
}
}, 4);

@SuppressWarnings("unchecked")
Observer<List<Integer>> observer = mock(Observer.class);
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
observable.subscribe(testSubscriber);
testSubscriber.assertValue(Arrays.asList(5, 4, 3, 2, 1));
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
public void testSorted() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<Integer> observable = w.sorted();

TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
observable.subscribe(testSubscriber);
testSubscriber.assertValues(1,2,3,4,5);
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
public void testSortedWithCustomFunction() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<Integer> observable = w.sorted(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
return t2 - t1;
}

});

TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
observable.subscribe(testSubscriber);
testSubscriber.assertValues(5,4,3,2,1);
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
public void testSortedCustomComparator() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<Integer> observable = w.sorted(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer t1, Integer t2) {
return t1.compareTo(t2);
}

});

TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
observable.subscribe(testSubscriber);
testSubscriber.assertValues(1,2,3,4,5);
testSubscriber.assertNoErrors();
testSubscriber.assertCompleted();
}

@Test
public void testSortedWithNonComparable() {
NonComparable n1 = new NonComparable(1,"a");
NonComparable n2 = new NonComparable(2,"b");
NonComparable n3 = new NonComparable(3,"c");
Observable<NonComparable> w = Observable.just(n1,n2,n3);

Observable<NonComparable> observable = w.sorted();

TestSubscriber<NonComparable> testSubscriber = new TestSubscriber<NonComparable>();
observable.subscribe(testSubscriber);
testSubscriber.assertNoValues();
testSubscriber.assertError(ClassCastException.class);
testSubscriber.assertNotCompleted();
}

private final static class NonComparable{
public int i;
public String s;

NonComparable(int i, String s){
this.i = i;
this.s = s;
}
}
}