Skip to content

Commit b18417c

Browse files
committed
add Person Observable
1 parent 6a77278 commit b18417c

File tree

2 files changed

+55
-11
lines changed

2 files changed

+55
-11
lines changed

pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@
3232
<artifactId>javatuples</artifactId>
3333
<version>1.2</version>
3434
</dependency>
35+
36+
<dependency>
37+
<groupId>org.projectlombok</groupId>
38+
<artifactId>lombok</artifactId>
39+
<version>1.18.24</version>
40+
<scope>provided</scope>
41+
</dependency>
42+
43+
<dependency>
44+
<groupId>org.reactivestreams</groupId>
45+
<artifactId>reactive-streams</artifactId>
46+
<version>1.0.4</version>
47+
</dependency>
48+
49+
3550
</dependencies>
3651

3752
</project>

src/main/java/concurrency/reactive/Ob.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,33 @@
22

33
import java.util.Observable;
44
import java.util.Observer;
5-
import java.util.concurrent.Executor;
65
import java.util.concurrent.ExecutorService;
76
import java.util.concurrent.Executors;
8-
import java.util.stream.IntStream;
97
import java.util.stream.Stream;
8+
import lombok.AllArgsConstructor;
9+
import lombok.ToString;
1010

1111
@SuppressWarnings("deprecation")
1212
public class Ob {
1313

14-
1514
/*
16-
* Observable : source
17-
* Observer :
18-
*
19-
*
20-
*/
15+
* Observable : Publisher - data source
16+
* Observer : Subscriber - consume data
17+
*
18+
* Limitation
19+
* 1. how to know it's completed - onComplete
20+
* 2. how to handle error - onError
21+
*/
2122
public static void main(String[] args) {
2223

24+
testObserver(new IntObservable());
25+
testObserver(new Person("Jinhoon", 29));
26+
}
27+
28+
private static void testObserver(RunnableObservable runnableObservable) {
2329
Observer ob1 = (o, arg) -> System.out.println(Thread.currentThread().getName() + " ob1 consume " + arg); // override `void update` method
2430
Observer ob2 = (o, arg) -> System.out.println(Thread.currentThread().getName() + " ob2 consume " + arg);
2531

26-
IntObservable runnableObservable = new IntObservable();
2732
runnableObservable.addObserver(ob1);
2833
runnableObservable.addObserver(ob2);
2934

@@ -34,11 +39,15 @@ public static void main(String[] args) {
3439
executorService.shutdown();
3540
}
3641

37-
static class IntObservable extends Observable implements Runnable {
42+
abstract static class RunnableObservable extends Observable implements Runnable {
43+
44+
}
45+
46+
static class IntObservable extends RunnableObservable {
3847

3948
// override from Runnable
4049
public void run() {
41-
Stream.iterate(0, i -> i+1).limit(10)
50+
Stream.iterate(1, i -> i + 1).limit(10)
4251
.forEach(
4352
i -> {
4453
setChanged();
@@ -57,4 +66,24 @@ public void run() {
5766
}
5867
}
5968

69+
@ToString
70+
@AllArgsConstructor
71+
static class Person extends RunnableObservable {
72+
73+
String name;
74+
int age;
75+
76+
void setAge(int newAge) {
77+
if (this.age != newAge) {
78+
this.age = newAge;
79+
setChanged();
80+
notifyObservers(this);
81+
}
82+
}
83+
84+
public void run() {
85+
setAge(30);
86+
setAge(31);
87+
}
88+
}
6089
}

0 commit comments

Comments
 (0)