Skip to content

Commit 8180780

Browse files
committed
Add Observable filter operations
1 parent 99fdd54 commit 8180780

File tree

5 files changed

+155
-0
lines changed

5 files changed

+155
-0
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*emit only the first item (or the first item that meets some condition) emitted by an Observable*/
2+
/*If you are only interested in the first item emitted by an Observable, or the first item that meets
3+
some criteria, you can filter the Observable with the First operator.*/
4+
/*n some implementations there is also a Single operator. It behaves similarly to First except that it
5+
waits until the source Observable terminates in order to guarantee that it only emits a single item
6+
(otherwise, rather than emitting that item, it terminates with an error). You can use this to not only
7+
take the first item from the source Observable but to also guarantee that there was only one item.*/
8+
9+
package FilteringObservables;
10+
11+
import rx.Observable;
12+
13+
public class First {
14+
public static void main(String[] args) {
15+
Observable.just(8,4,5,6).first().subscribe(
16+
integer -> System.out.println(integer),
17+
throwable -> System.out.println("Error:"+throwable),
18+
() -> System.out.println("Completed First()")
19+
);
20+
21+
Observable.just(8, 2, 3, 4, 1, 15).first(integer -> integer>10).subscribe(
22+
integer -> System.out.println(integer),
23+
throwable -> System.out.println("Error:"+throwable),
24+
() -> System.out.println("Completed First(func)")
25+
);
26+
27+
Observable.just(8).single().subscribe(
28+
integer -> System.out.println(integer),
29+
throwable -> System.out.println("Error:"+throwable),
30+
() -> System.out.println("Completed Single")
31+
);
32+
33+
}
34+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*do not emit any items from an Observable but mirror its termination notification*/
2+
/*The IgnoreElements operator suppresses all of the items emitted by the source Observable,
3+
but allows its termination notification (either onError or onCompleted) to pass through unchanged.
4+
5+
If you do not care about the items being emitted by an Observable, but you do want to be notified
6+
when it completes or when it terminates with an error, you can apply the ignoreElements operator to
7+
the Observable, which will ensure that it will never call its observers’ onNext handlers*/
8+
package FilteringObservables;
9+
10+
import rx.Observable;
11+
12+
public class IgnoreElements {
13+
public static void main(String[] args) {
14+
Observable.just(8,4,5,6).ignoreElements().subscribe(
15+
integer -> System.out.println(integer),
16+
throwable -> System.out.println("It had an Error"),
17+
() -> System.out.println("It's Completed")
18+
);
19+
}
20+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*emit only the last item (or the last item that meets some condition) emitted by an Observable*/
2+
/*If you are only interested in the last item emitted by an Observable, or the last item that meets some criteria,
3+
you can filter the Observable with the Last operator.
4+
5+
In some implementations, Last is not implemented as a filtering operator that returns an Observable,
6+
but as a blocking function that returns a particular item when the source Observable terminates.
7+
In those implementations, if you instead want a filtering operator, you may have better luck with TakeLast(1).*/
8+
9+
/*The BlockingObservable methods do not transform an Observable into another, filtered Observable,
10+
but rather they break out of the Observable cascade, blocking until the Observable emits the desired
11+
item, and then return that item itself.*/
12+
package FilteringObservables;
13+
14+
import rx.Observable;
15+
16+
public class Last {
17+
public static void main(String[] args) {
18+
Observable.just(8,4,5,6,3).last(integer -> integer%2==0).subscribe(
19+
integer -> System.out.println(integer),
20+
throwable -> System.out.println("Error"),
21+
() -> System.out.println("Completed divisible by 2")
22+
);
23+
24+
Observable.just(8,4,5,6,3).last().subscribe(
25+
integer -> System.out.println(integer),
26+
throwable -> System.out.println("Error"),
27+
() -> System.out.println("Completed last")
28+
);
29+
}
30+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*emit the most recent items emitted by an Observable within periodic time intervals*/
2+
/*The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling.
3+
4+
In some implementations, there is also a ThrottleFirst operator that is similar,
5+
but emits not the most-recently emitted item in the sample period, but the first item that was emitted during that period.*/
6+
package FilteringObservables;
7+
8+
import rx.Observable;
9+
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.TimeUnit;
12+
13+
public class Sample {
14+
public static void main(String[] args) {
15+
CountDownLatch latch = new CountDownLatch(1);
16+
Observable.interval(1, TimeUnit.MILLISECONDS).sample(2,TimeUnit.SECONDS).subscribe(
17+
integer -> System.out.println(integer),
18+
throwable -> System.out.println("Error!"),
19+
() -> {
20+
latch.countDown();
21+
System.out.println("Completed");
22+
}
23+
);
24+
25+
try {
26+
latch.await();
27+
} catch (InterruptedException e) {
28+
e.printStackTrace();
29+
}
30+
}
31+
32+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*You can ignore the first n items emitted by an Observable and attend only to those items that come after,
2+
by modifying the Observable with the Skip operator.*/
3+
package FilteringObservables;
4+
5+
import rx.Observable;
6+
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class Skip {
11+
public static void main(String[] args) throws InterruptedException {
12+
CountDownLatch latch = new CountDownLatch(1);
13+
Observable.interval(1, TimeUnit.SECONDS).take(5).skip(2).subscribe(
14+
integer -> System.out.println(integer),
15+
throwable -> System.out.println("Error!"),
16+
() -> {
17+
latch.countDown();
18+
System.out.println("Completed");
19+
}
20+
);
21+
22+
latch.await();
23+
24+
//todo check
25+
CountDownLatch latch2 = new CountDownLatch(1);
26+
Observable.interval(1, TimeUnit.SECONDS).take(2).skip(2,TimeUnit.SECONDS).subscribe(
27+
integer -> System.out.println(integer),
28+
throwable -> System.out.println("Error!"),
29+
() -> {
30+
latch2.countDown();
31+
System.out.println("Completed skip seconds");
32+
}
33+
);
34+
35+
latch2.await();
36+
37+
}
38+
39+
}

0 commit comments

Comments
 (0)