Skip to content

Commit 9489b1f

Browse files
committed
sec 08 combining publishers
1 parent aac9cf4 commit 9489b1f

20 files changed

+333
-3
lines changed

src/main/java/chap6/Lec04PublishOn.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public static void main(String[] args) {
1919

2020
flux.publishOn(Schedulers.boundedElastic())
2121
.doOnNext(i -> printThreadName("next " + i))
22+
.publishOn(Schedulers.parallel())
2223
.subscribe(v -> printThreadName("sub " + v));
2324

2425
Util.sleepSeconds(5);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package chap6;
2+
3+
import reactor.core.publisher.Flux;
4+
import reactor.core.scheduler.Schedulers;
5+
import utility.Util;
6+
7+
public class Lec05PubSubOn {
8+
public static void main(String[] args) {
9+
10+
Flux<Object> flux = Flux.create(fluxSink -> {
11+
printThreadName("create");
12+
for (int i = 0; i < 4; i++) {
13+
fluxSink.next(i);
14+
}
15+
fluxSink.complete();
16+
})
17+
.doOnNext(i -> printThreadName("next " + i));
18+
19+
20+
flux
21+
.publishOn(Schedulers.parallel())
22+
.doOnNext(i -> printThreadName("next " + i))
23+
.subscribeOn(Schedulers.boundedElastic())
24+
.subscribe(v -> printThreadName("sub " + v));
25+
26+
27+
Util.sleepSeconds(5);
28+
29+
}
30+
31+
private static void printThreadName(String msg){
32+
System.out.println(msg + "\t\t: Thread : " + Thread.currentThread().getName());
33+
}
34+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package chap6;
2+
3+
import reactor.core.publisher.Flux;
4+
import reactor.core.scheduler.Schedulers;
5+
6+
public class Lec06Parallel {
7+
8+
public static void main(String[] args) {
9+
Flux.range(1, 10)
10+
.parallel()
11+
.runOn(Schedulers.parallel())
12+
.doOnNext(i -> printThreadName("next " + i))
13+
.subscribe(v -> printThreadName("next " + v));
14+
}
15+
16+
private static void printThreadName(String msg) {
17+
System.out.println(msg + "\t\t: Thread : " + Thread.currentThread().getName());
18+
}
19+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package chap6;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
import java.time.Duration;
7+
8+
public class Lec07FluxInterval {
9+
10+
public static void main(String[] args) {
11+
Flux.interval(Duration.ofSeconds(1)).subscribe(Util.subscriber());
12+
Util.sleepSeconds(1);
13+
}
14+
}

src/main/java/chap7/Lec01Demo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package chap7;
2+
3+
import reactor.core.publisher.Flux;
4+
import reactor.core.scheduler.Schedulers;
5+
import utility.Util;
6+
7+
public class Lec01Demo {
8+
public static void main(String[] args) {
9+
10+
Flux.create(fluxSink -> {
11+
for (int i = 0; i < 500; i++) {
12+
fluxSink.next(i);
13+
System.out.println("Pushed : " + i);
14+
}
15+
}).publishOn(Schedulers.boundedElastic())
16+
.doOnNext(i -> {
17+
Util.sleepMillis(10);
18+
}).subscribe(Util.subscriber());
19+
Util.sleepSeconds(60);
20+
}
21+
}

src/main/java/chap7/Lec02Drop.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package chap7;
2+
3+
import reactor.core.publisher.Flux;
4+
import reactor.core.scheduler.Schedulers;
5+
import utility.Util;
6+
7+
public class Lec02Drop {
8+
public static void main(String[] args) {
9+
10+
Flux.create(fluxSink -> {
11+
for (int i = 0; i < 500; i++) {
12+
fluxSink.next(i);
13+
System.out.println("Pushed : " + i);
14+
}
15+
}).onBackpressureBuffer()
16+
.publishOn(Schedulers.boundedElastic())
17+
.doOnNext(i -> {
18+
Util.sleepMillis(10);
19+
}).subscribe(Util.subscriber());
20+
Util.sleepSeconds(60);
21+
}
22+
}

src/main/java/chap7/Lec03latest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package chap7;
2+
3+
import reactor.core.publisher.Flux;
4+
import reactor.core.scheduler.Schedulers;
5+
import utility.Util;
6+
7+
public class Lec03latest {
8+
public static void main(String[] args) {
9+
10+
Flux.create(fluxSink -> {
11+
for (int i = 0; i < 500; i++) {
12+
fluxSink.next(i);
13+
System.out.println("Pushed : " + i);
14+
}
15+
}).onBackpressureLatest()
16+
.publishOn(Schedulers.boundedElastic())
17+
.doOnNext(i -> {
18+
Util.sleepMillis(10);
19+
}).subscribe(Util.subscriber());
20+
Util.sleepSeconds(60);
21+
}
22+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package chap8;
2+
3+
import chap8.helper.NameGenerator;
4+
import utility.Util;
5+
6+
public class Lec01StartWith {
7+
8+
public static void main(String[] args) {
9+
NameGenerator generator = new NameGenerator();
10+
generator.generateNames().take(2).subscribe(Util.subscriber("sam"));
11+
12+
13+
generator.generateNames().take(2).subscribe(Util.subscriber("mike"));
14+
15+
16+
generator.generateNames().filter(n -> n.startsWith("A")).take(1).subscribe(Util.subscriber("marshal"));
17+
18+
}
19+
}

src/main/java/chap8/Lec02Concat.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package chap8;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
public class Lec02Concat {
7+
8+
public static void main(String[] args) {
9+
Flux<String> flux1 = Flux.just("a","b","c");
10+
Flux<String> flux2 = Flux.just("d","e","f");
11+
12+
Flux<String> concatFlux = flux1.concatWith(flux2);
13+
Flux<String> concatFlux2 = Flux.concat(flux1,flux2);
14+
concatFlux.subscribe(System.out::println);
15+
16+
System.out.println("FLUXXXXXXX2");
17+
concatFlux2.subscribe(System.out::println);
18+
Flux<String> fluxa = Flux.just("1","2","3");
19+
Flux<String> fluxc = Flux.error(new RuntimeException("error"));
20+
Flux<String> fluxb = Flux.just("4","5","6");
21+
22+
Flux<String> fluxalphabet = Flux.concatDelayError(fluxa,fluxb,fluxc);
23+
fluxalphabet.subscribe(Util.subscriber());
24+
}
25+
}

src/main/java/chap8/Lec03Merge.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package chap8;
2+
3+
import chap8.helper.American;
4+
import chap8.helper.Emirate;
5+
import chap8.helper.Qatar;
6+
import reactor.core.publisher.Flux;
7+
import utility.Util;
8+
9+
public class Lec03Merge {
10+
11+
public static void main(String[] args) {
12+
13+
Flux<String> merge = Flux.merge(Qatar.getFlights(), Emirate.getFlights(), American.getFlights());
14+
merge.subscribe(Util.subscriber());
15+
Util.sleepSeconds(10);
16+
17+
}
18+
}

src/main/java/chap8/Lec04Zip.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package chap8;
2+
3+
import reactor.core.publisher.Flux;
4+
import reactor.util.function.Tuple3;
5+
import utility.Util;
6+
7+
public class Lec04Zip {
8+
9+
public static void main(String[] args) {
10+
Flux<Tuple3<String, String, String>> car = Flux.zip(getBody(),getEngine(),getTires());
11+
car.subscribe(Util.subscriber());
12+
}
13+
14+
public static Flux<String> getBody(){
15+
return Flux.range(1,2).map(integer -> "Body "+integer);
16+
}
17+
18+
public static Flux<String> getEngine(){
19+
return Flux.range(1,5).map(integer -> "Engine "+integer);
20+
21+
}
22+
23+
public static Flux<String> getTires(){
24+
return Flux.range(1,5).map(integer -> "Tire "+integer);
25+
26+
}
27+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package chap8;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
import java.time.Duration;
7+
8+
public class Lec05CombineLatest {
9+
10+
public static void main(String[] args) {
11+
Flux.combineLatest(getNumber(),getString(),(i,s)-> s + i).subscribe(Util.subscriber());
12+
13+
Util.sleepSeconds(10);
14+
15+
}
16+
17+
private static Flux<String> getString(){
18+
return Flux.just("A","B","C").delayElements(Duration.ofSeconds(1));
19+
}
20+
21+
22+
private static Flux<Integer> getNumber(){
23+
return Flux.just(1,2,3).delayElements(Duration.ofSeconds(3));
24+
}
25+
26+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package chap8.helper;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
import java.time.Duration;
7+
8+
public class American {
9+
10+
public static Flux<String> getFlights() {
11+
return Flux.range(1, Util.faker().random().nextInt(10, 20))
12+
.delayElements(Duration.ofSeconds(1))
13+
.map(i -> "AA " + Util.faker().random().nextInt(100, 999))
14+
.filter(i -> Util.faker().random().nextBoolean());
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package chap8.helper;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
import java.time.Duration;
7+
8+
public class Emirate {
9+
10+
public static Flux<String> getFlights() {
11+
return Flux.range(1, Util.faker().random().nextInt(1, 10))
12+
.delayElements(Duration.ofSeconds(1))
13+
.map(i -> "Emirate " + Util.faker().random().nextInt(100, 999))
14+
.filter(i -> Util.faker().random().nextBoolean());
15+
}
16+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package chap8.helper;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
public class NameGenerator {
10+
11+
12+
private List<String> list = new ArrayList<>();
13+
14+
public Flux<String> generateNames() {
15+
return Flux.generate(stringSynchronousSink -> {
16+
System.out.println("generated fresh");
17+
Util.sleepSeconds(1);
18+
String name = Util.faker().name().firstName();
19+
list.add(name);
20+
stringSynchronousSink.next(name);
21+
}).cast(String.class)
22+
.startWith(getFromCache());
23+
24+
}
25+
26+
private Flux<String> getFromCache() {
27+
System.out.println("from cache");
28+
return Flux.fromIterable(list);
29+
}
30+
}

src/main/java/chap8/helper/Qatar.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package chap8.helper;
2+
3+
import reactor.core.publisher.Flux;
4+
import utility.Util;
5+
6+
import java.time.Duration;
7+
8+
public class Qatar {
9+
10+
public static Flux<String> getFlights() {
11+
return Flux.range(1, Util.faker().random().nextInt(1, 5))
12+
.delayElements(Duration.ofSeconds(1))
13+
.map(i -> "Qatar " + Util.faker().random().nextInt(100, 999))
14+
.filter(i -> Util.faker().random().nextBoolean());
15+
}
16+
}

src/main/java/utility/DefaultSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public void onSubscribe(Subscription subscription) {
2222

2323
@Override
2424
public void onNext(Object o) {
25-
System.out.println(name +" Received :"+ o);
25+
System.out.println(name +"Received :"+ o);
2626
}
2727

2828
@Override
@@ -32,6 +32,6 @@ public void onError(Throwable throwable) {
3232

3333
@Override
3434
public void onComplete() {
35-
System.out.println(name + " Completed: ");
35+
System.out.println(name + "Completed: ");
3636
}
3737
}

src/main/java/utility/Util.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ public static Faker faker(){
2727
}
2828

2929
public static void sleepSeconds(int seconds) {
30+
sleepMillis(seconds * 1000);
31+
}
32+
33+
public static void sleepMillis(int milli) {
3034
try {
31-
Thread.sleep(seconds * 1000);
35+
Thread.sleep(milli );
3236
} catch (InterruptedException e) {
3337
e.printStackTrace();
3438
}
-2 Bytes
Binary file not shown.

target/classes/utility/Util.class

97 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)