Skip to content

Commit 966013f

Browse files
committed
generalize
1 parent 1d6d077 commit 966013f

File tree

2 files changed

+15
-14
lines changed

2 files changed

+15
-14
lines changed

src/main/java/concurrency/reactive/DelegateSubscriber.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66

7-
abstract class DelegateSubscriber<T> implements Subscriber<T> {
7+
abstract class DelegateSubscriber<T, R> implements Subscriber<T> {
88

9-
private final Subscriber<T> subscriber;
9+
private final Subscriber<?> subscriber;
1010

11-
DelegateSubscriber(Subscriber<T> subscriber) {
11+
DelegateSubscriber(Subscriber<? super R> subscriber) {
1212
this.subscriber = subscriber;
1313
}
1414

src/main/java/concurrency/reactive/PubSub.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@ public class PubSub {
1313
pub -> [Data1] -> mapPub (operator) -> [Data2] -> sub
1414
*/
1515
public static void main(String[] args) {
16-
Publisher<Integer> pub = streamPublisher(Stream.iterate(1, i -> i + 1));
17-
Publisher<Integer> mapPub = mapPub(pub, i -> i * 10);
18-
Publisher<Integer> sumPub = reducePub(mapPub, 0, Integer::sum);
16+
Publisher<Integer> pub = streamPublisher(Stream.iterate(1, i -> i + 1) );
17+
Publisher<Integer> pub1= mapPub(pub, i -> i * 10);
18+
// Publisher<Integer> pub2 = reducePub(pub1, 0, Integer::sum);
19+
Publisher<String> pub2 = reducePub(pub1, "", (s, i) -> s + i + ". ");
1920

20-
Subscriber<Integer> sub = getIntegerPrintSubscriber(10);
21-
sumPub.subscribe(sub);
21+
var sub = getIntegerPrintSubscriber(10);
22+
pub2.subscribe(sub);
2223
}
2324

24-
private static <T> Publisher<T> reducePub(Publisher<T> mapPub, T init , BiFunction<T, T, T> reduceFunction) {
25-
return subscriber -> mapPub.subscribe(new DelegateSubscriber<>((Subscriber<T>) subscriber) {
26-
T result = init;
25+
private static <T, R> Publisher<R> reducePub(Publisher<T> mapPub, R init , BiFunction<R, T, R> reduceFunction) {
26+
return subscriber -> mapPub.subscribe(new DelegateSubscriber<T, R>(subscriber) {
27+
R result = init;
2728

2829
@Override
2930
public void onNext(T integer) {
@@ -38,11 +39,11 @@ public void onComplete() {
3839
});
3940
}
4041

41-
private static <T> Publisher<T> mapPub(Publisher<T> publisher, Function<T, T> mapFunction) {
42+
private static <T, R> Publisher<R> mapPub(Publisher<T> publisher, Function<T, R> mapFunction) {
4243
return new Publisher<>() {
4344
@Override
44-
public void subscribe(Subscriber<? super T> subscriber) {
45-
publisher.subscribe(new DelegateSubscriber<>((Subscriber<T>) subscriber) {
45+
public void subscribe(Subscriber<? super R> subscriber) {
46+
publisher.subscribe(new DelegateSubscriber<T, R>(subscriber) {
4647
@Override
4748
public void onNext(T t) {
4849
subscriber.onNext(mapFunction.apply(t));

0 commit comments

Comments
 (0)