@@ -13,45 +13,46 @@ public class PubSub {
1313 pub -> [Data1] -> mapPub (operator) -> [Data2] -> sub
1414 */
1515 public static void main (String [] args ) {
16- Publisher <Integer > pub = getIntegerPublisherOneTo ( );
16+ Publisher <Integer > pub = streamPublisher ( Stream . iterate ( 1 , i -> i + 1 ) );
1717 Publisher <Integer > mapPub = mapPub (pub , i -> i * 10 );
18- Publisher <Integer > sumPub = reducePub (mapPub , Integer ::sum );
18+ Publisher <Integer > sumPub = reducePub (mapPub , 0 , Integer ::sum );
1919
20- Subscriber <Integer > sub = getIntegerPrintSubscriber (3 );
20+ Subscriber <Integer > sub = getIntegerPrintSubscriber (10 );
2121 sumPub .subscribe (sub );
2222 }
2323
24- private static Publisher <Integer > reducePub (Publisher <Integer > mapPub , BiFunction <Integer , Integer , Integer > reduceFunction ) {
25- return subscriber -> mapPub .subscribe (new DelegateSubscriber <Integer >((Subscriber <Integer >) subscriber ) {
26- int sum = 0 ;
24+ private static <T > Publisher <T > reducePub (Publisher <T > mapPub , T init , BiFunction <T , T , T > reduceFunction ) {
25+ return subscriber -> mapPub .subscribe (new DelegateSubscriber <>((Subscriber <T >) subscriber ) {
26+ T result = init ;
27+
2728 @ Override
28- public void onNext (Integer integer ) {
29- sum += integer ;
29+ public void onNext (T integer ) {
30+ result = reduceFunction . apply ( result , integer ) ;
3031 }
3132
3233 @ Override
3334 public void onComplete () {
34- subscriber .onNext (sum );
35+ subscriber .onNext (result );
3536 subscriber .onComplete ();
3637 }
3738 });
3839 }
3940
40- private static Publisher <Integer > mapPub (Publisher <Integer > publisher , Function <Integer , Integer > mapFunction ) {
41+ private static < T > Publisher <T > mapPub (Publisher <T > publisher , Function <T , T > mapFunction ) {
4142 return new Publisher <>() {
4243 @ Override
43- public void subscribe (Subscriber <? super Integer > subscriber ) {
44- publisher .subscribe (new DelegateSubscriber <Integer >((Subscriber <Integer >) subscriber ) {
44+ public void subscribe (Subscriber <? super T > subscriber ) {
45+ publisher .subscribe (new DelegateSubscriber <>((Subscriber <T >) subscriber ) {
4546 @ Override
46- public void onNext (Integer integer ) {
47- subscriber .onNext (mapFunction .apply (integer ));
47+ public void onNext (T t ) {
48+ subscriber .onNext (mapFunction .apply (t ));
4849 }
4950 });
5051 }
5152 };
5253 }
5354
54- private static Subscriber <Integer > getIntegerPrintSubscriber (int n ) {
55+ private static < T > Subscriber <T > getIntegerPrintSubscriber (int n ) {
5556 return new Subscriber <>() {
5657 @ Override
5758 public void onSubscribe (Subscription subscription ) {
@@ -60,7 +61,7 @@ public void onSubscribe(Subscription subscription) {
6061 }
6162
6263 @ Override
63- public void onNext (Integer integer ) {
64+ public void onNext (T integer ) {
6465 System .out .println ("onNext: " + integer );
6566 }
6667
@@ -76,17 +77,15 @@ public void onComplete() {
7677 };
7778 }
7879
79- private static Publisher <Integer > getIntegerPublisherOneTo ( ) {
80+ private static < T > Publisher <T > streamPublisher ( Stream < T > stream ) {
8081 return new Publisher <>() {
81- final Stream <Integer > intStream = Stream .iterate (1 , i -> i + 1 );
82-
8382 @ Override
84- public void subscribe (Subscriber <? super Integer > subscriber ) {
83+ public void subscribe (Subscriber <? super T > subscriber ) {
8584 subscriber .onSubscribe (new Subscription () {
8685 @ Override
8786 public void request (long n ) {
8887 try {
89- intStream .limit (n )
88+ stream .limit (n )
9089 .forEach (subscriber ::onNext );
9190 } catch (Exception e ) {
9291 subscriber .onError (e );
0 commit comments