Skip to content

Commit 1565709

Browse files
committed
subscribeOn and publishOn
1 parent 07224b5 commit 1565709

File tree

1 file changed

+44
-2
lines changed

1 file changed

+44
-2
lines changed

src/main/java/concurrency/reactive/SchedulerEx.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
package concurrency.reactive;
22

3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
35
import org.reactivestreams.Publisher;
46
import org.reactivestreams.Subscriber;
57
import org.reactivestreams.Subscription;
68
import org.slf4j.Logger;
79
import org.slf4j.LoggerFactory;
810

911
public class SchedulerEx {
12+
1013
private static final Logger LOG = LoggerFactory.getLogger(SchedulerEx.class.getName());
1114

1215
public static void main(String[] args) {
16+
LOG.debug("enter");
17+
1318
Publisher<Integer> pub = subscriber ->
1419
subscriber.onSubscribe(new Subscription() {
1520
@Override
1621
public void request(long n) {
22+
LOG.debug("request");
1723
subscriber.onNext(1);
1824
subscriber.onNext(2);
1925
subscriber.onNext(3);
@@ -28,8 +34,44 @@ public void cancel() {
2834
}
2935
});
3036

37+
// subscribeOn
38+
Publisher<Integer> subOnPub = subscriber ->
39+
Executors.newSingleThreadExecutor().execute(() -> pub.subscribe(subscriber));
40+
41+
// publishOn
42+
Publisher<Integer> pubOnPub = sub ->
43+
pub.subscribe(new Subscriber<>() {
44+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
45+
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
sub.onSubscribe(s);
49+
}
50+
51+
@Override
52+
public void onNext(Integer integer) {
53+
executorService.execute(() -> sub.onNext(integer));
54+
}
55+
56+
@Override
57+
public void onError(Throwable t) {
58+
executorService.execute(() -> sub.onError(t));
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
executorService.execute(sub::onComplete);
64+
}
65+
});
66+
67+
subOnPub.subscribe(getSubscriber());
68+
pubOnPub.subscribe(getSubscriber());
69+
70+
LOG.debug("exit");
71+
}
3172

32-
pub.subscribe(new Subscriber<>() {
73+
private static Subscriber<Integer> getSubscriber() {
74+
return new Subscriber<>() {
3375
@Override
3476
public void onSubscribe(Subscription s) {
3577
LOG.debug("onSubscribe");
@@ -50,6 +92,6 @@ public void onError(Throwable t) {
5092
public void onComplete() {
5193
LOG.debug("onComplete");
5294
}
53-
});
95+
};
5496
}
5597
}

0 commit comments

Comments
 (0)