Skip to content

Commit 1ddce9d

Browse files
committed
add code samples for Conditional and Boolean Operators
1 parent 1074f6a commit 1ddce9d

21 files changed

+469
-234
lines changed

.idea/workspace.xml

Lines changed: 228 additions & 234 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*determine whether all items emitted by an Observable meet some criteria*/
2+
3+
/*Pass a predicate function to the All operator that accepts an item emitted
4+
by the source Observable and returns a boolean value based on an evaluation of
5+
that item. All returns an Observable that emits a single boolean value: true if
6+
and only if the source Observable terminates normally and every item emitted by
7+
the source Observable evaluated as true according to this predicate; false if any
8+
item emitted by the source Observable evaluates as false according to this predicate.*/
9+
package ConditionalAndBooleanOperators;
10+
11+
import rx.Observable;
12+
13+
public class All {
14+
15+
public static void main(String[] args) {
16+
Observable.just(1, 2, 3)
17+
.all(integer -> integer < 4)
18+
.subscribe(
19+
aBoolean -> System.out.println(aBoolean),
20+
throwable -> System.out.println("Error"),
21+
() -> System.out.println("Completed")
22+
);
23+
}
24+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*given two or more source Observables, emit all of the items from only the first of these Observables to emit an item or notification*/
2+
3+
/*When you pass a number of source Observables to Amb, it will pass through the emissions and notifications of exactly one of these
4+
Observables: the first one that sends a notification to Amb, either by emitting an item or sending an onError or onCompleted notification.
5+
Amb will ignore and discard the emissions and notifications of all of the other source Observables.*/
6+
package ConditionalAndBooleanOperators;
7+
8+
import rx.Observable;
9+
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.TimeUnit;
12+
13+
public class Amb {
14+
public static void main(String[] args) throws InterruptedException {
15+
CountDownLatch latch = new CountDownLatch(1);
16+
Observable first = Observable.just(5, 5, 5).delay(5, TimeUnit.SECONDS);
17+
Observable second = Observable.just(2, 2, 2).delay(2, TimeUnit.SECONDS);
18+
19+
Observable.amb(first, second).subscribe(
20+
o -> System.out.println(o),
21+
throwable -> System.out.println("Error"),
22+
() -> {
23+
System.out.println("Completed");
24+
latch.countDown();
25+
}
26+
27+
);
28+
29+
latch.await();
30+
}
31+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*determine whether an Observable emits a particular item or not*/
2+
3+
/*Pass the Contains operator a particular item, and the Observable it returns will
4+
emit true if that item is emitted by the source Observable, or false if the source
5+
Observable terminates without emitting that item.
6+
7+
A related operator, IsEmpty returns an Observable that emits true if and only if the
8+
source Observable completes without emitting any items. It emits false if the source
9+
Observable emits an item.*/
10+
11+
package ConditionalAndBooleanOperators;
12+
13+
import rx.Observable;
14+
15+
public class Contains {
16+
public static void main(String[] args) {
17+
Observable.just(1, 2, 3).contains(3).subscribe(
18+
aBoolean -> System.out.println(aBoolean),
19+
throwable -> System.out.println("Error"),
20+
() -> System.out.println("Completed")
21+
);
22+
}
23+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*emit items from the source Observable, or a default item if the source Observable emits nothing*/
2+
3+
/*The DefaultIfEmpty operator simply mirrors the source Observable exactly if the source Observable
4+
emits any items. If the source Observable terminates normally (with an onComplete) without emitting any items,
5+
the Observable returned from DefaultIfEmpty will instead emit a default item of your choosing before it too completes.*/
6+
7+
package ConditionalAndBooleanOperators;
8+
9+
import rx.Observable;
10+
11+
public class DefaultIfEmpty {
12+
public static void main(String[] args) {
13+
Observable.empty().defaultIfEmpty(77).subscribe(
14+
o -> System.out.println(o),
15+
throwable -> System.out.println("Error"),
16+
() -> System.out.println("Completed")
17+
);
18+
}
19+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*determine whether two Observables emit the same sequence of items*/
2+
3+
/*Pass SequenceEqual two Observables, and it will compare the items
4+
emitted by each Observable, and the Observable it returns will emit
5+
true only if both sequences are the same (the same items, in the same
6+
order, with the same termination state).*/
7+
8+
package ConditionalAndBooleanOperators;
9+
10+
import rx.Observable;
11+
12+
public class SequenceEqual {
13+
public static void main(String[] args) {
14+
Observable first = Observable.just(1, 3, 2);
15+
Observable second = Observable.just(1, 2, 3);
16+
Observable third = Observable.just(1, 2, 3);
17+
18+
Observable.sequenceEqual(first, second).subscribe(
19+
o -> System.out.println(o),
20+
throwable -> System.out.println("Error"),
21+
() -> System.out.println("Completed not equal")
22+
);
23+
24+
Observable.sequenceEqual(second, third).subscribe(
25+
o -> System.out.println(o),
26+
throwable -> System.out.println("Error"),
27+
() -> System.out.println("Completed equal")
28+
);
29+
}
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*discard items emitted by an Observable until a second Observable emits an item*/
2+
3+
/*The SkipUntil subscribes to the source Observable, but ignores its emissions until
4+
such time as a second Observable emits an item, at which point SkipUntil begins to mirror the source Observable.*/
5+
package ConditionalAndBooleanOperators;
6+
7+
import rx.Observable;
8+
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class SkipUntil {
13+
public static void main(String[] args) throws InterruptedException {
14+
CountDownLatch latch = new CountDownLatch(1);
15+
16+
Observable first = Observable.interval(4, TimeUnit.SECONDS).take(1);
17+
18+
Observable second = Observable.interval(1, TimeUnit.SECONDS).take(7);
19+
20+
second.skipUntil(first).subscribe(o -> System.out.println(o),
21+
throwable -> System.out.println("Error"),
22+
() -> {
23+
System.out.println("Completed");
24+
latch.countDown();
25+
}
26+
);
27+
28+
latch.await();
29+
}
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*discard items emitted by an Observable until a specified condition becomes false*/
2+
3+
/*The SkipWhile subscribes to the source Observable, but ignores its emissions until
4+
such time as some condition you specify becomes false, at which point SkipWhile begins
5+
to mirror the source Observable.*/
6+
7+
package ConditionalAndBooleanOperators;
8+
9+
import rx.Observable;
10+
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.TimeUnit;
13+
14+
public class SkipWhile {
15+
public static void main(String[] args) throws InterruptedException {
16+
CountDownLatch latch = new CountDownLatch(1);
17+
Observable.interval(1, TimeUnit.SECONDS).skipWhile(i -> i < 5).take(7).subscribe(
18+
aLong -> System.out.println(aLong),
19+
throwable -> System.out.println("Error"),
20+
() -> {
21+
System.out.println("Completed");
22+
latch.countDown();
23+
}
24+
);
25+
26+
latch.await();
27+
}
28+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*discard any items emitted by an Observable after a second Observable emits an item or terminates*/
2+
3+
/*The TakeUntil subscribes and begins mirroring the source Observable. It also monitors a second
4+
Observable that you provide. If this second Observable emits an item or sends a termination notification,
5+
the Observable returned by TakeUntil stops mirroring the source Observable and terminates.*/
6+
7+
8+
package ConditionalAndBooleanOperators;
9+
10+
import rx.Observable;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
15+
public class TakeUntil {
16+
public static void main(String[] args) throws InterruptedException {
17+
CountDownLatch latch = new CountDownLatch(1);
18+
19+
Observable first = Observable.interval(4, TimeUnit.SECONDS).take(1);
20+
21+
Observable second = Observable.interval(1, TimeUnit.SECONDS).take(7);
22+
23+
second.takeUntil(first).subscribe(o -> System.out.println(o),
24+
throwable -> System.out.println("Error"),
25+
() -> {
26+
System.out.println("Completed");
27+
latch.countDown();
28+
}
29+
);
30+
31+
latch.await();
32+
}
33+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package ConditionalAndBooleanOperators;
2+
3+
import rx.Observable;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
8+
public class TakeWhile {
9+
public static void main(String[] args) throws InterruptedException {
10+
CountDownLatch latch = new CountDownLatch(1);
11+
12+
Observable.interval(1, TimeUnit.SECONDS).takeWhile(i -> i < 5).take(7).subscribe(
13+
aLong -> System.out.println(aLong),
14+
throwable -> System.out.println("Error"),
15+
() -> {
16+
System.out.println("Completed");
17+
latch.countDown();
18+
}
19+
);
20+
21+
latch.await();
22+
}
23+
}

0 commit comments

Comments
 (0)