Skip to content

Commit

Permalink
Send periodic ping requests to dirigera to prevent connection timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdgeisler committed Dec 6, 2022
1 parent e16bead commit a1078b9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import de.dvdgeisler.iot.dirigera.client.api.model.events.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -30,37 +31,30 @@ public void accept(final Event event) {
}

private final ClientApi api;
private final Thread thread;
private final AtomicBoolean running;
private final List<Consumer<Event>> listeners;

public WebSocketApi(final ClientApi api) {
this.listeners = new ArrayList<>();
this.api = api;
this.running = new AtomicBoolean(false);
this.thread = new Thread(this::run, "websocket");
this.thread.start();

Schedulers.boundedElastic().schedule(this::run);
}

private void run() {
this.running.set(true);
do {
try {
this.loop();
Thread.sleep(100);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
} while (this.running.get());
}
final Thread thread;

private void loop() {
if (!this.api.oauth.isPaired())
return;
this.api.websocket(this::onEvent).block();
thread = Thread.currentThread();
log.info("Start event handler thread: id={}, name={}", thread.getId(), thread.getName());
this.running.set(true);
this.api.websocket(this::onEvent, this::isRunning).block();
this.running.set(false);
log.info("Finish event handler thread: id={}, name={}", thread.getId(), thread.getName());
}

private synchronized void onEvent(final Event event) {
log.debug("Received Dirigera event: type={}, id={}, source={}, time={}", event.id, event.type, event.source, event.time);
this.listeners.forEach(c -> c.accept(event));
}

Expand All @@ -80,12 +74,11 @@ public synchronized <E extends Event> void removeListener(final Consumer<E> list
this.listeners.removeIf(l -> l instanceof FilteredEventListener<?> && ((FilteredEventListener<?>) l).listener.equals(listener));
}

public void stop() throws InterruptedException {
public boolean isRunning() {
return this.running.get();
}

public void stop() {
this.running.set(false);
this.thread.interrupt();
while (this.thread.isAlive()) {
Thread.sleep(100);
this.thread.interrupt();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package de.dvdgeisler.iot.dirigera.client.api.http;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.dvdgeisler.iot.dirigera.client.api.model.Home;
import de.dvdgeisler.iot.dirigera.client.api.model.events.Event;
import de.dvdgeisler.iot.dirigera.client.api.model.events.PingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -11,22 +13,31 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.core.scheduler.Schedulers;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

@Component
public class ClientApi extends AbstractClientApi {
private final static Logger log = LoggerFactory.getLogger(ClientApi.class);
private final static String WEBSOCKET_SPECVERSION = "1.0.0";
private final static String WEBSOCKET_SOURCE_URN = String.format("urn:%s:%s", ClientApi.class.getPackageName(), ClientApi.class.getClass().getSimpleName());
private final static Duration WEBSOCKET_PING_DELAY = Duration.ofSeconds(10);
private final String hostname;
private final short port;
private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -96,22 +107,26 @@ public Mono<Map> dump() {
.bodyToMono(Map.class);
}

public Mono<Void> websocket(final Consumer<Event> consumer) {
final String token;
final String authorizationHeader;
final HttpClient httpClient;
final WebSocketClient client;
public Mono<Void> websocket(final WebSocketHandler consumer) {
final URI uri;

uri = URI.create(String.format("https://%s:%d/v1/", this.hostname, this.port));
return this.oauth.pairIfRequired()
.map(token -> String.format("Bearer %s", token.access_token))
.map(bearer -> this.httpClient
.headers(headers -> headers.add(HttpHeaders.AUTHORIZATION, bearer))
.keepAlive(true))
.map(ReactorNettyWebSocketClient::new)
.flatMap(client -> client.execute(uri, consumer));
}

try {
token = this.tokenStore.getAccessToken();
authorizationHeader = String.format("Bearer %s", token);
httpClient = this.httpClient
.headers(headers -> headers.add(HttpHeaders.AUTHORIZATION, authorizationHeader))
.keepAlive(true);
client = new ReactorNettyWebSocketClient(httpClient);
return client.execute(URI.create(String.format("https://%s:%d/v1/", this.hostname, this.port)), session ->
session.receive()
public Mono<Void> websocket(final Consumer<Event> consumer, final BooleanSupplier run) {
return this.websocket(session -> {
Schedulers.boundedElastic().schedulePeriodically(
() -> session.send(this.buildPingMessage(session)).block(),
0, WEBSOCKET_PING_DELAY.getSeconds(), TimeUnit.SECONDS);
return Mono.just(session)
.flatMapMany(WebSocketSession::receive)
.map(WebSocketMessage::getPayload)
.map(DataBuffer::asInputStream)
.flatMap(i -> {
Expand All @@ -122,12 +137,25 @@ public Mono<Void> websocket(final Consumer<Event> consumer) {
}
})
.doOnNext(consumer)
.repeat()
.then()
);
} catch (IOException e) {
return Mono.error(e);
}
.repeat(run)
.then();
});
}

private Mono<WebSocketMessage> buildPingMessage(final WebSocketSession session) {
return Mono.just(new PingEvent(
UUID.randomUUID().toString(),
LocalDateTime.now(),
WEBSOCKET_SPECVERSION,
WEBSOCKET_SOURCE_URN))
.flatMap(pingEvent -> {
try {
return Mono.just(this.objectMapper.writeValueAsString(pingEvent));
} catch (JsonProcessingException e) {
return Mono.error(e);
}
})
.map(session::textMessage);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package de.dvdgeisler.iot.dirigera.client.api.model.events;

import java.time.LocalDateTime;

import static de.dvdgeisler.iot.dirigera.client.api.model.events.EventType.PING;

public class PingEvent extends Event {
public PingEvent(final String id, final LocalDateTime time, final String specversion, final String source) {
super(id, time, specversion, source, PING, null);
}

public PingEvent() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,8 @@ public static void main(String[] args) {

public void exit(int status) {

try {
log.info("Close WebSocket");
this.api.websocket.stop();
} catch (InterruptedException e) {
log.error(e.getMessage());
}
log.info("Close WebSocket");
this.api.websocket.stop();

if (context != null) {
log.info("Close Spring Boot context");
Expand Down

0 comments on commit a1078b9

Please sign in to comment.