diff --git a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/WebSocketApi.java b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/WebSocketApi.java index d906a3d..03954ab 100644 --- a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/WebSocketApi.java +++ b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/WebSocketApi.java @@ -4,6 +4,7 @@ import de.dvdgeisler.iot.dirigera.client.api.model.events.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.scheduler.Schedulers; import java.util.ArrayList; import java.util.List; @@ -30,7 +31,6 @@ public void accept(final Event event) { } private final ClientApi api; - private final Thread thread; private final AtomicBoolean running; private final List> listeners; @@ -38,29 +38,23 @@ public WebSocketApi(final ClientApi api) { this.listeners = new ArrayList<>(); this.api = api; this.running = new AtomicBoolean(false); - this.thread = new Thread(this::run, "websocket"); - this.thread.start(); + + Schedulers.boundedElastic().schedule(this::run); } private void run() { - this.running.set(true); - do { - try { - this.loop(); - Thread.sleep(100); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - } while (this.running.get()); - } + final Thread thread; - private void loop() { - if (!this.api.oauth.isPaired()) - return; - this.api.websocket(this::onEvent).block(); + thread = Thread.currentThread(); + log.info("Start event handler thread: id={}, name={}", thread.getId(), thread.getName()); + this.running.set(true); + this.api.websocket(this::onEvent, this::isRunning).block(); + this.running.set(false); + log.info("Finish event handler thread: id={}, name={}", thread.getId(), thread.getName()); } private synchronized void onEvent(final Event event) { + log.debug("Received Dirigera event: type={}, id={}, source={}, time={}", event.id, event.type, event.source, event.time); this.listeners.forEach(c -> c.accept(event)); } @@ -80,12 +74,11 @@ public synchronized void removeListener(final Consumer list this.listeners.removeIf(l -> l instanceof FilteredEventListener && ((FilteredEventListener) l).listener.equals(listener)); } - public void stop() throws InterruptedException { + public boolean isRunning() { + return this.running.get(); + } + + public void stop() { this.running.set(false); - this.thread.interrupt(); - while (this.thread.isAlive()) { - Thread.sleep(100); - this.thread.interrupt(); - } } } diff --git a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java index 10b37f5..d1ac05f 100644 --- a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java +++ b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java @@ -1,8 +1,10 @@ package de.dvdgeisler.iot.dirigera.client.api.http; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.dvdgeisler.iot.dirigera.client.api.model.Home; import de.dvdgeisler.iot.dirigera.client.api.model.events.Event; +import de.dvdgeisler.iot.dirigera.client.api.model.events.PingEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -11,22 +13,31 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; -import org.springframework.web.reactive.socket.client.WebSocketClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.netty.http.client.HttpClient; +import reactor.core.scheduler.Schedulers; import javax.net.ssl.SSLException; import java.io.IOException; import java.net.URI; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; @Component public class ClientApi extends AbstractClientApi { private final static Logger log = LoggerFactory.getLogger(ClientApi.class); + private final static String WEBSOCKET_SPECVERSION = "1.0.0"; + private final static String WEBSOCKET_SOURCE_URN = String.format("urn:%s:%s", ClientApi.class.getPackageName(), ClientApi.class.getClass().getSimpleName()); + private final static Duration WEBSOCKET_PING_DELAY = Duration.ofSeconds(10); private final String hostname; private final short port; private final ObjectMapper objectMapper; @@ -96,22 +107,26 @@ public Mono dump() { .bodyToMono(Map.class); } - public Mono websocket(final Consumer consumer) { - final String token; - final String authorizationHeader; - final HttpClient httpClient; - final WebSocketClient client; + public Mono websocket(final WebSocketHandler consumer) { + final URI uri; + uri = URI.create(String.format("https://%s:%d/v1/", this.hostname, this.port)); + return this.oauth.pairIfRequired() + .map(token -> String.format("Bearer %s", token.access_token)) + .map(bearer -> this.httpClient + .headers(headers -> headers.add(HttpHeaders.AUTHORIZATION, bearer)) + .keepAlive(true)) + .map(ReactorNettyWebSocketClient::new) + .flatMap(client -> client.execute(uri, consumer)); + } - try { - token = this.tokenStore.getAccessToken(); - authorizationHeader = String.format("Bearer %s", token); - httpClient = this.httpClient - .headers(headers -> headers.add(HttpHeaders.AUTHORIZATION, authorizationHeader)) - .keepAlive(true); - client = new ReactorNettyWebSocketClient(httpClient); - return client.execute(URI.create(String.format("https://%s:%d/v1/", this.hostname, this.port)), session -> - session.receive() + public Mono websocket(final Consumer consumer, final BooleanSupplier run) { + return this.websocket(session -> { + Schedulers.boundedElastic().schedulePeriodically( + () -> session.send(this.buildPingMessage(session)).block(), + 0, WEBSOCKET_PING_DELAY.getSeconds(), TimeUnit.SECONDS); + return Mono.just(session) + .flatMapMany(WebSocketSession::receive) .map(WebSocketMessage::getPayload) .map(DataBuffer::asInputStream) .flatMap(i -> { @@ -122,12 +137,25 @@ public Mono websocket(final Consumer consumer) { } }) .doOnNext(consumer) - .repeat() - .then() - ); - } catch (IOException e) { - return Mono.error(e); - } + .repeat(run) + .then(); + }); + } + + private Mono buildPingMessage(final WebSocketSession session) { + return Mono.just(new PingEvent( + UUID.randomUUID().toString(), + LocalDateTime.now(), + WEBSOCKET_SPECVERSION, + WEBSOCKET_SOURCE_URN)) + .flatMap(pingEvent -> { + try { + return Mono.just(this.objectMapper.writeValueAsString(pingEvent)); + } catch (JsonProcessingException e) { + return Mono.error(e); + } + }) + .map(session::textMessage); } } diff --git a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/model/events/PingEvent.java b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/model/events/PingEvent.java new file mode 100644 index 0000000..b9178f0 --- /dev/null +++ b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/model/events/PingEvent.java @@ -0,0 +1,14 @@ +package de.dvdgeisler.iot.dirigera.client.api.model.events; + +import java.time.LocalDateTime; + +import static de.dvdgeisler.iot.dirigera.client.api.model.events.EventType.PING; + +public class PingEvent extends Event { + public PingEvent(final String id, final LocalDateTime time, final String specversion, final String source) { + super(id, time, specversion, source, PING, null); + } + + public PingEvent() { + } +} diff --git a/dirigera-client-mqtt/src/main/java/de/dvdgeisler/iot/dirigera/client/mqtt/DirigeraClientMqttApplication.java b/dirigera-client-mqtt/src/main/java/de/dvdgeisler/iot/dirigera/client/mqtt/DirigeraClientMqttApplication.java index 157d8b9..c5622a8 100644 --- a/dirigera-client-mqtt/src/main/java/de/dvdgeisler/iot/dirigera/client/mqtt/DirigeraClientMqttApplication.java +++ b/dirigera-client-mqtt/src/main/java/de/dvdgeisler/iot/dirigera/client/mqtt/DirigeraClientMqttApplication.java @@ -103,12 +103,8 @@ public static void main(String[] args) { public void exit(int status) { - try { - log.info("Close WebSocket"); - this.api.websocket.stop(); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } + log.info("Close WebSocket"); + this.api.websocket.stop(); if (context != null) { log.info("Close Spring Boot context");