Skip to content

Commit ff72c07

Browse files
committed
interval and take
1 parent a602f3c commit ff72c07

File tree

1 file changed

+105
-0
lines changed

1 file changed

+105
-0
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package concurrency.reactive;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.TimeUnit;
7+
import org.reactivestreams.Publisher;
8+
import org.reactivestreams.Subscriber;
9+
import org.reactivestreams.Subscription;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
public class IntervalEx {
14+
15+
private static final Logger LOG = LoggerFactory.getLogger(IntervalEx.class.getName());
16+
17+
public static void main(String[] args) {
18+
LOG.debug("enter");
19+
20+
Publisher<Integer> pub = intervalPublisher();
21+
Publisher<Integer> takePub = takePub(pub, 5);
22+
takePub.subscribe(getSubscriber());
23+
24+
LOG.debug("exit");
25+
}
26+
27+
private static Publisher<Integer> intervalPublisher() {
28+
return subscriber ->
29+
subscriber.onSubscribe(new Subscription() {
30+
final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
31+
int i = 0;
32+
33+
@Override
34+
public void request(long n) {
35+
LOG.debug("request");
36+
exec.scheduleAtFixedRate(
37+
() -> {
38+
subscriber.onNext(i++);
39+
}, 0, 300, TimeUnit.MILLISECONDS
40+
);
41+
}
42+
43+
@Override
44+
public void cancel() {
45+
LOG.debug("cancel");
46+
exec.shutdown();
47+
}
48+
});
49+
}
50+
51+
private static Publisher<Integer> takePub(Publisher<Integer> pub, int n) {
52+
return subscriber ->
53+
pub.subscribe(new Subscriber<Integer>() {
54+
int count = 0;
55+
private Subscription subscription;
56+
57+
@Override
58+
public void onSubscribe(Subscription s) {
59+
subscription = s;
60+
subscriber.onSubscribe(s);
61+
}
62+
63+
@Override
64+
public void onNext(Integer integer) {
65+
if (count++ >= n) subscription.cancel();
66+
subscriber.onNext(integer);
67+
}
68+
69+
@Override
70+
public void onError(Throwable t) {
71+
subscriber.onError(t);
72+
}
73+
74+
@Override
75+
public void onComplete() {
76+
subscriber.onComplete();
77+
}
78+
});
79+
}
80+
81+
private static Subscriber<Integer> getSubscriber() {
82+
return new Subscriber<>() {
83+
@Override
84+
public void onSubscribe(Subscription s) {
85+
LOG.debug("onSubscribe");
86+
s.request(Long.MAX_VALUE);
87+
}
88+
89+
@Override
90+
public void onNext(Integer integer) {
91+
LOG.debug("onNext: {}", integer);
92+
}
93+
94+
@Override
95+
public void onError(Throwable t) {
96+
LOG.debug("onError: {}", t.getMessage());
97+
}
98+
99+
@Override
100+
public void onComplete() {
101+
LOG.debug("onComplete");
102+
}
103+
};
104+
}
105+
}

0 commit comments

Comments
 (0)