+++
+++
- Einführung
- Was ist Reaktive Programmierung?
- Reactive Streams Spezifikation
- Implementierungen
- Code-Beispiele - Reactor
- Reaktive Programmierung mit Spring
- Demo
- Voraussetzungen
- Fazit
+++
Reactive programming is oriented around data flows and the propagation of change. This means that the underlying execution model will automatically propagate changes through the data flow.
- Programmierparadigma
- Pionierarbeit: Reactive Extensions (Rx) für .NET
- Standardisierung für JVM durch Reactive Streams
Note: Programmierparadigma, welches sich an Datenflüsse orientiert. Änderungen in den Datenflüssen automatisch propagiert.
Beispiel: Excel - Wert in Zelle ändert sich und Summenzelle wird neu berechnet
Observer pattern + Iterator pattern + functional programming = reactive programming
Source: https://projectreactor.io/docs/core/release/reference/#intro-reactive
+++
Reactive Streams Spezifikation
+++
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Note:
- stellt eine Anzahl von (unbegrenzten) Elementen bereit |
- Subscriber können diese Elemente konsumieren |
- Subscriber können dynamisch und zu unterschiedlichen Zeiten Elemente erhalten |
Zusätzlich gibt es Implementierungsrichtlinien. RxJava, Reactor und Java 9 halten sich an der Spezifikation Beispiel Reactor: Flux, Mono
- subscribe() - Request to start streaming data +++
public interface Subscription {
public void request(long n);
public void cancel();
}
Note:
-
stellt 1:1 Lebenszyklus eines Subscriber dar
-
kann nur einmal von einem Subscriber verwendet werden
-
request(n) aktiviert den Publisher zum senden von Elementen an den Subscriber, Ende muss mit Aufruf von onComplete/onError signalisiert werden
-
cancel() Publisher wird benachrichtigt keine weiteren Elemente zu senden und die Resourcen aufzuräumen
+++
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Note:
- onSubscribe() wird ausgelöst nachdem Publisher#subscribe mit dem Subscriber aufgerufen wurden, es werden keine Elemente übertragen
- onNext() wird durch den Publisher ausgelöst (Notification), damit der Subscriber request() nutzt
- onError() Fehlerfall - keine Events folgen, Aufruf von request() ändert nichts daran
- onComplete() Erfolgreich - keine Events folgen, Aufruf von request() ändert nichts daran
+++
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {
}
Note: Verhält sich gleizeitig als Publisher und Subscriber und hat jeweils die Regeln einzuhalten. Beispiel Reactor:
+++
- Backpressure |
- hot & cold |
Note: Backpressure
- PUSH: Publisher schickt schneller Daten als der Subscriber Verarbeiten kann -> Subscriber informiert Publisher darüber hot vs cold
- cold: Eine 'cold'-Sequenz startet immer einen neuen Subscriber mit den Daten
- hot: Bei 'hot' erhölt man die Daten ab dem Punkt an dem man subscribed, man bekommt daher nicht alle Daten mit
+++
RxJava 2 | Java 9 Flow | Reactor 3 | |
---|---|---|---|
Java | 6+ | 9+ | 8+ |
Publisher | Flowable, Observable, Single, Maybe, Completable | selbst implementieren | Mono, Flux |
Subscriber | ... | selbst implementieren | viele vorhanden |
Note:
+++
Flux<String> flux = Flux.just("Java", "Go", "Assembler",
"php", "Ada", "Kotlin", "Clojure")
.doOnNext(System.out::println)
.filter(name -> name.startsWith("A"))
.map(String::toUpperCase);
- keine Ausgabe vorhanden |
+++
Flux<String> flux = Flux.just("Java", "Go", "Assembler",
"php", "Ada", "Kotlin", "Clojure")
.doOnNext(System.out::println)
.filter(name -> name.startsWith("A"))
.map(String::toUpperCase);
flux.subscribe(item -> System.out.println("Subscriber: " + item));
Java
Go
Assembler
Subscriber: ASSEMBLER
php
Ada
Subscriber: ADA
Kotlin
Clojure
+++
Flux<String> flux = Flux.just("Go", "ColdFusion",
"Java", "C", "JavaScript", "Clojure")
.doOnNext(System.out::println)
.filter(name -> name.startsWith("C"))
.map(String::toUpperCase);
flux.subscribe(name ->
System.out.println("subscribe 1: " + name));
flux.subscribe(name ->
System.out.println("subscribe 2: " + name));
Go
Java
Clojure
subscribe 1: CLOJURE
Go
Java
Clojure
subscribe 2: CLOJURE
+++
UnicastProcessor<String> hot = UnicastProcessor.create();
Flux<String> flux = hot.publish().autoConnect()
.map(String::toUpperCase);
flux.subscribe(name ->
System.out.println("subscribe 1: " + name));
hot.onNext("Go");
hot.onNext("Scala");
flux.subscribe(name ->
System.out.println("subscribe 2: " + name));
hot.onNext("Clojure");
hot.onNext("Java");
subscribe 1: GO
subscribe 1: SCALA
subscribe 1: CLOJURE
subscribe 2: CLOJURE
subscribe 1: JAVA
subscribe 2: JAVA
- Spring Framework von Pivotal |
- nutzt Reactor |
- Spring 5.0 GA -> 21. September |
- Spring Boot 2.0 -> 20. November |
+++
+++
ab Servlet Version 3.1
- Tomcat
- Jetty
- Netty
- Undertow
+++
- MongoDB
- Apache Cassandra
- Redis
Note: JDBC soll evtl. in einer reaktiven Version kommen
Note:
- Reaktive Programmierung ist nicht die Lösung für alles
- Kann die Programmierung angenehmer machen und dient als Alternative zu Parallelität (mehr Threads, mehr Hardware) und Asynchrone Programmierung -> Callbacks, Future
- Publisher (Flux, Mono)
- ohne subscribe() Aufruf passiert nichts
- Blockierender Code kann Gesamte Performance beeinträchtigen
+++
- eingesetzte Technologien
- Container |
- Datenbank |
- Skalierung wird benötigt durch blockierende Resourcen |
- Wissen der Mitarbeiter |
Note: Wenn an unterester Stelle eine reaktive Komponente (z. B. MongoDB) verwendet wird, dann sollte man darauf aufbauen und reaktiv Entwickeln
- External Service Calls -> REST
- Highly Concurrent Message Consumers
- Spreadsheets
+++