Skip to content

Commit 7e609de

Browse files
committed
DelegateSubscriber
1 parent c9fad93 commit 7e609de

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package concurrency.reactive;
2+
3+
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
abstract class DelegateSubscriber<T> implements Subscriber<T> {
8+
9+
private final Subscriber<T> subscriber;
10+
11+
DelegateSubscriber(Subscriber<T> subscriber) {
12+
this.subscriber = subscriber;
13+
}
14+
15+
@Override
16+
public void onSubscribe(Subscription s) {
17+
subscriber.onSubscribe(s);
18+
}
19+
20+
@Override
21+
public void onError(Throwable t) {
22+
subscriber.onError(t);
23+
}
24+
25+
@Override
26+
public void onComplete() {
27+
subscriber.onComplete();
28+
}
29+
}

src/main/java/concurrency/reactive/PubSub.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,11 @@ private static Publisher<Integer> mapPub(Publisher<Integer> publisher, Function<
2323
return new Publisher<>() {
2424
@Override
2525
public void subscribe(Subscriber<? super Integer> subscriber) {
26-
publisher.subscribe(new Subscriber<Integer>() {
27-
@Override
28-
public void onSubscribe(Subscription subscription) {
29-
subscriber.onSubscribe(subscription);
30-
}
31-
26+
publisher.subscribe(new DelegateSubscriber<Integer>((Subscriber<Integer>) subscriber) {
3227
@Override
3328
public void onNext(Integer integer) {
3429
subscriber.onNext(mapFunction.apply(integer));
3530
}
36-
37-
@Override
38-
public void onError(Throwable t) {
39-
subscriber.onError(t);
40-
}
41-
42-
@Override
43-
public void onComplete() {
44-
subscriber.onComplete();
45-
}
4631
});
4732
}
4833
};

0 commit comments

Comments
 (0)