From 8db9b4cf177781090baec976d924fc87533f82ec Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 13 Sep 2023 09:53:24 -0400 Subject: [PATCH] fix(registration): discovery plugin registration bugfixes and refactor (#193) * fix(registration): respond 401 on plugin pings if wrong credentials supplied * revamp registration flow logic * more error handling, attempt to query for previously-submitted matching credentials --- .../io/cryostat/agent/CryostatClient.java | 172 ++++++++++++++++-- .../java/io/cryostat/agent/HttpException.java | 4 + .../java/io/cryostat/agent/MainModule.java | 34 ++-- .../java/io/cryostat/agent/Registration.java | 100 +++++----- .../java/io/cryostat/agent/WebServer.java | 71 ++++---- 5 files changed, 270 insertions(+), 111 deletions(-) diff --git a/src/main/java/io/cryostat/agent/CryostatClient.java b/src/main/java/io/cryostat/agent/CryostatClient.java index ac0bc02f..8232f895 100644 --- a/src/main/java/io/cryostat/agent/CryostatClient.java +++ b/src/main/java/io/cryostat/agent/CryostatClient.java @@ -26,10 +26,13 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Function; @@ -39,8 +42,10 @@ import io.cryostat.agent.model.RegistrationInfo; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.io.FileUtils; import org.apache.commons.io.input.CountingInputStream; import org.apache.http.HttpHeaders; @@ -114,7 +119,8 @@ public CompletableFuture checkRegistration(PluginInfo pluginInfo) { return supply(req, (res) -> logResponse(req, res)).thenApply(this::isOkStatus); } - public CompletableFuture register(PluginInfo pluginInfo, URI callback) { + public CompletableFuture register( + int credentialId, PluginInfo pluginInfo, URI callback) { try { RegistrationInfo registrationInfo = new RegistrationInfo( @@ -126,7 +132,20 @@ public CompletableFuture register(PluginInfo pluginInfo, URI callbac mapper.writeValueAsString(registrationInfo), ContentType.APPLICATION_JSON)); return supply(req, (res) -> logResponse(req, res)) - .thenApply(res -> assertOkStatus(req, res)) + .handle( + (res, t) -> { + if (t != null) { + throw new CompletionException(t); + } + if (!isOkStatus(res)) { + try { + deleteCredentials(credentialId).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to delete previous credentials", e); + } + } + return assertOkStatus(req, res); + }) .thenApply( res -> { try (InputStream is = res.getEntity().getContent()) { @@ -155,22 +174,82 @@ public CompletableFuture register(PluginInfo pluginInfo, URI callbac public CompletableFuture submitCredentialsIfRequired( int prevId, Credentials credentials, URI callback) { if (prevId < 0) { - return submitCredentials(credentials, callback); + return queryExistingCredentials(callback) + .thenCompose( + id -> { + if (id >= 0) { + return CompletableFuture.completedFuture(id); + } + return submitCredentials(prevId, credentials, callback); + }); } HttpGet req = new HttpGet(baseUri.resolve(CREDENTIALS_API_PATH + "/" + prevId)); log.info("{}", req); return supply(req, (res) -> logResponse(req, res)) - .thenApply(this::isOkStatus) + .handle( + (v, t) -> { + if (t != null) { + log.error("Failed to get credentials with ID " + prevId, t); + throw new CompletionException(t); + } + return isOkStatus(v); + }) .thenCompose( exists -> { if (exists) { return CompletableFuture.completedFuture(prevId); } - return submitCredentials(credentials, callback); + return submitCredentials(prevId, credentials, callback); }); } - private CompletableFuture submitCredentials(Credentials credentials, URI callback) { + private CompletableFuture queryExistingCredentials(URI callback) { + HttpGet req = new HttpGet(baseUri.resolve(CREDENTIALS_API_PATH)); + log.info("{}", req); + return supply(req, (res) -> logResponse(req, res)) + .handle( + (res, t) -> { + if (t != null) { + log.error("Failed to get credentials", t); + throw new CompletionException(t); + } + return assertOkStatus(req, res); + }) + .thenApply( + res -> { + try (InputStream is = res.getEntity().getContent()) { + return mapper.readValue(is, ObjectNode.class); + } catch (IOException e) { + log.error("Unable to parse response as JSON", e); + throw new RegistrationException(e); + } + }) + .thenApply( + node -> { + try { + return mapper.readValue( + node.get("data").get("result").toString(), + new TypeReference>() {}); + } catch (IOException e) { + log.error("Unable to parse response as JSON", e); + throw new RegistrationException(e); + } + }) + .thenApply( + l -> + l.stream() + .filter( + sc -> + Objects.equals( + sc.matchExpression, + selfMatchExpression(callback))) + .map(sc -> sc.id) + .findFirst() + .orElse(-1)); + } + + private CompletableFuture submitCredentials( + int prevId, Credentials credentials, URI callback) { HttpPost req = new HttpPost(baseUri.resolve(CREDENTIALS_API_PATH)); MultipartEntityBuilder entityBuilder = MultipartEntityBuilder.create() @@ -198,10 +277,38 @@ private CompletableFuture submitCredentials(Credentials credentials, UR log.info("{}", req); req.setEntity(entityBuilder.build()); return supply(req, (res) -> logResponse(req, res)) - .thenApply(res -> assertOkStatus(req, res)) - .thenApply(res -> res.getFirstHeader(HttpHeaders.LOCATION).getValue()) - .thenApply(res -> res.substring(res.lastIndexOf('/') + 1, res.length())) - .thenApply(Integer::valueOf); + .thenApply( + res -> { + if (!isOkStatus(res)) { + try { + if (res.getStatusLine().getStatusCode() == 409) { + int queried = queryExistingCredentials(callback).get(); + if (queried >= 0) { + return queried; + } + } + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to query for existing credentials", e); + } + try { + deleteCredentials(prevId).get(); + } catch (InterruptedException | ExecutionException e) { + log.error( + "Failed to delete previous credentials with id " + + prevId, + e); + throw new RegistrationException(e); + } + } + String location = + assertOkStatus(req, res) + .getFirstHeader(HttpHeaders.LOCATION) + .getValue(); + String id = + location.substring( + location.lastIndexOf('/') + 1, location.length()); + return Integer.valueOf(id); + }); } public CompletableFuture deleteCredentials(int id) { @@ -210,9 +317,7 @@ public CompletableFuture deleteCredentials(int id) { } HttpDelete req = new HttpDelete(baseUri.resolve(CREDENTIALS_API_PATH + "/" + id)); log.info("{}", req); - return supply(req, (res) -> logResponse(req, res)) - .thenApply(res -> assertOkStatus(req, res)) - .thenApply(res -> null); + return supply(req, (res) -> logResponse(req, res)).thenApply(res -> null); } public CompletableFuture deregister(PluginInfo pluginInfo) { @@ -339,14 +444,15 @@ private CountingInputStream getRecordingInputStream(Path filePath) throws IOExce private String selfMatchExpression(URI callback) { return String.format( - "target.connectUrl == \"%s\" && target.jvmId == \"%s\" &&" - + " target.annotations.platform[\"INSTANCE_ID\"] == \"%s\"", - callback, jvmId, instanceId); + "target.connectUrl == \"%s\" && target.annotations.platform[\"INSTANCE_ID\"] ==" + + " \"%s\"", + callback, instanceId); } private boolean isOkStatus(HttpResponse res) { int sc = res.getStatusLine().getStatusCode(); - return 200 <= sc && sc < 300; + // 2xx is OK, 3xx is redirect range so allow those too + return 200 <= sc && sc < 400; } private HttpResponse assertOkStatus(HttpRequestBase req, HttpResponse res) { @@ -364,4 +470,36 @@ private HttpResponse assertOkStatus(HttpRequestBase req, HttpResponse res) { } return res; } + + @SuppressFBWarnings( + value = { + "URF_UNREAD_FIELD", + "UWF_UNWRITTEN_FIELD", + "UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD" + }) + public static class StoredCredential { + + public int id; + public String matchExpression; + + @Override + public int hashCode() { + return Objects.hash(id, matchExpression); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + StoredCredential other = (StoredCredential) obj; + return id == other.id && Objects.equals(matchExpression, other.matchExpression); + } + } } diff --git a/src/main/java/io/cryostat/agent/HttpException.java b/src/main/java/io/cryostat/agent/HttpException.java index abfc4563..8f90529d 100644 --- a/src/main/java/io/cryostat/agent/HttpException.java +++ b/src/main/java/io/cryostat/agent/HttpException.java @@ -24,4 +24,8 @@ public class HttpException extends RuntimeException { "Unexpected non-OK status code %d on API path %s", statusCode, uri.toString())); } + + HttpException(int statusCode, Throwable cause) { + super(String.format("HTTP %d", statusCode), cause); + } } diff --git a/src/main/java/io/cryostat/agent/MainModule.java b/src/main/java/io/cryostat/agent/MainModule.java index e963ab66..549996cb 100644 --- a/src/main/java/io/cryostat/agent/MainModule.java +++ b/src/main/java/io/cryostat/agent/MainModule.java @@ -41,10 +41,12 @@ import io.cryostat.core.sys.FileSystem; import io.cryostat.core.tui.ClientWriter; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import dagger.Lazy; import dagger.Module; import dagger.Provides; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.HttpClientBuilder; @@ -92,17 +94,9 @@ public static WebServer provideWebServer( @Named(ConfigModule.CRYOSTAT_AGENT_WEBSERVER_HOST) String host, @Named(ConfigModule.CRYOSTAT_AGENT_WEBSERVER_PORT) int port, @Named(ConfigModule.CRYOSTAT_AGENT_CALLBACK) URI callback, - Lazy registration, - @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_RETRY_MS) int registrationRetryMs) { + Lazy registration) { return new WebServer( - remoteContexts, - cryostat, - executor, - host, - port, - callback, - registration, - registrationRetryMs); + remoteContexts, cryostat, executor, host, port, callback, registration); } @Provides @@ -170,7 +164,8 @@ public static HttpClient provideHttpClient( @Provides public static ObjectMapper provideObjectMapper() { - return new ObjectMapper(); + return new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @Provides @@ -205,8 +200,23 @@ public static Registration provideRegistration( @Named(ConfigModule.CRYOSTAT_AGENT_APP_JMX_PORT) int jmxPort, @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_RETRY_MS) int registrationRetryMs, @Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_CHECK_MS) int registrationCheckMs) { + + Logger log = LoggerFactory.getLogger(Registration.class); return new Registration( - executor, + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("cryostat-agent-registration"); + t.setUncaughtExceptionHandler( + (thread, err) -> + log.error( + String.format( + "[%s] Uncaught exception: %s", + thread.getName(), + ExceptionUtils.getStackTrace(err)))); + return t; + }), cryostat, callback, webServer, diff --git a/src/main/java/io/cryostat/agent/Registration.java b/src/main/java/io/cryostat/agent/Registration.java index 7240ecd7..9699af81 100644 --- a/src/main/java/io/cryostat/agent/Registration.java +++ b/src/main/java/io/cryostat/agent/Registration.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -96,10 +97,49 @@ void start() { this.addRegistrationListener( evt -> { switch (evt.state) { + case UNREGISTERED: + if (this.registrationCheckTask != null) { + this.registrationCheckTask.cancel(false); + this.registrationCheckTask = null; + } + executor.submit( + () -> { + try { + webServer + .generateCredentials() + .handle( + (v, t) -> { + if (t != null) { + executor.schedule( + () -> + notify( + RegistrationEvent + .State + .UNREGISTERED), + registrationRetryMs, + TimeUnit.MILLISECONDS); + log.error( + "Failed to generate" + + " credentials", + t); + throw new CompletionException( + t); + } + ; + notify( + RegistrationEvent.State + .REFRESHING); + return v; + }); + } catch (NoSuchAlgorithmException nsae) { + log.error("Could not regenerate credentials", nsae); + } + }); + break; case REGISTERED: if (this.registrationCheckTask != null) { log.warn("Re-registered without previous de-registration"); - this.registrationCheckTask.cancel(true); + this.registrationCheckTask.cancel(false); } this.registrationCheckTask = executor.scheduleAtFixedRate( @@ -132,43 +172,8 @@ void start() { registrationCheckMs, TimeUnit.MILLISECONDS); break; - case UNREGISTERED: - if (this.registrationCheckTask != null) { - this.registrationCheckTask.cancel(true); - this.registrationCheckTask = null; - } - executor.submit( - () -> { - try { - webServer - .generateCredentials() - .handle( - (t, v) -> { - if (t != null) { - log.warn( - "Failed to generate" - + " credentials", - t); - executor.schedule( - () -> - notify( - RegistrationEvent - .State - .UNREGISTERED), - registrationRetryMs, - TimeUnit.MILLISECONDS); - } else { - executor.submit( - this::tryRegister); - } - return null; - }); - } catch (NoSuchAlgorithmException nsae) { - log.error("Could not regenerate credentials", nsae); - } - }); - break; case REFRESHING: + executor.submit(this::tryRegister); break; case REFRESHED: break; @@ -187,16 +192,18 @@ void addRegistrationListener(Consumer listener) { } void tryRegister() { - notify(RegistrationEvent.State.REFRESHING); + int credentialId = webServer.getCredentialId(); + if (credentialId < 0) { + notify(RegistrationEvent.State.UNREGISTERED); + return; + } try { URI credentialedCallback = new URIBuilder(callback) - .setUserInfo( - "storedcredentials", - String.valueOf(webServer.getCredentialId())) + .setUserInfo("storedcredentials", String.valueOf(credentialId)) .build(); CompletableFuture f = - cryostat.register(pluginInfo, credentialedCallback) + cryostat.register(credentialId, pluginInfo, credentialedCallback) .handle( (plugin, t) -> { if (plugin != null) { @@ -211,6 +218,7 @@ void tryRegister() { tryUpdate(); } } else if (t != null) { + this.webServer.resetCredentialId(); this.pluginInfo.clear(); throw new RegistrationException(t); } @@ -307,7 +315,7 @@ CompletableFuture deregister() { return CompletableFuture.completedFuture(null); } return cryostat.deleteCredentials(webServer.getCredentialId()) - .thenCompose(v -> cryostat.deregister(pluginInfo)) + .handle((v, t) -> cryostat.deregister(pluginInfo)) .handle( (n, t) -> { if (t != null) { @@ -325,17 +333,17 @@ CompletableFuture deregister() { }); } - private void notify(RegistrationEvent.State state) { + public void notify(RegistrationEvent.State state) { RegistrationEvent evt = new RegistrationEvent(state); - this.listeners.forEach(listener -> listener.accept(evt)); + executor.submit(() -> this.listeners.forEach(listener -> listener.accept(evt))); } static class RegistrationEvent { enum State { + UNREGISTERED, REGISTERED, PUBLISHED, - UNREGISTERED, REFRESHING, REFRESHED, } diff --git a/src/main/java/io/cryostat/agent/WebServer.java b/src/main/java/io/cryostat/agent/WebServer.java index 7c349f4d..b352557f 100644 --- a/src/main/java/io/cryostat/agent/WebServer.java +++ b/src/main/java/io/cryostat/agent/WebServer.java @@ -29,8 +29,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.zip.DeflaterOutputStream; @@ -58,7 +58,6 @@ class WebServer { private final Credentials credentials; private final URI callback; private final Lazy registration; - private final int registrationRetryMs; private HttpServer http; private volatile int credentialId = -1; @@ -73,8 +72,7 @@ class WebServer { String host, int port, URI callback, - Lazy registration, - int registrationRetryMs) { + Lazy registration) { this.remoteContexts = remoteContexts; this.cryostat = cryostat; this.executor = executor; @@ -83,7 +81,6 @@ class WebServer { this.credentials = new Credentials(); this.callback = callback; this.registration = registration; - this.registrationRetryMs = registrationRetryMs; this.agentAuthenticator = new AgentAuthenticator(); this.requestLoggingFilter = new RequestLoggingFilter(); @@ -99,7 +96,7 @@ void start() throws IOException, NoSuchAlgorithmException { this.http.setExecutor(executor); Set mergedContexts = new HashSet<>(remoteContexts.get()); - mergedContexts.add(new PingContext()); + mergedContexts.add(new PingContext(registration)); mergedContexts.forEach( rc -> { HttpContext ctx = this.http.createContext(rc.path(), rc::handle); @@ -122,40 +119,40 @@ int getCredentialId() { return credentialId; } + void resetCredentialId() { + this.credentialId = -1; + } + CompletableFuture generateCredentials() throws NoSuchAlgorithmException { - synchronized (this.credentials) { - this.credentials.regenerate(); - return this.cryostat - .get() - .submitCredentialsIfRequired(this.credentialId, this.credentials, this.callback) - .handle( - (v, t) -> { - if (t != null) { - log.error("Could not submit credentials", t); - executor.schedule( - () -> { - try { - this.generateCredentials(); - } catch (NoSuchAlgorithmException e) { - log.error("Cannot submit credentials", e); - } - }, - registrationRetryMs, - TimeUnit.MILLISECONDS); - } - return v; - }) - .thenAccept( - i -> { - this.credentialId = i; - log.info("Defined credentials with id {}", i); - }) - .thenRun(this.credentials::clear); - } + this.credentials.regenerate(); + return this.cryostat + .get() + .submitCredentialsIfRequired(this.credentialId, this.credentials, this.callback) + .handle( + (v, t) -> { + this.credentials.clear(); + if (t != null) { + this.resetCredentialId(); + log.error("Could not submit credentials", t); + throw new CompletionException("Could not submit credentials", t); + } + return v; + }) + .thenAccept( + i -> { + this.credentialId = i; + log.info("Defined credentials with id {}", i); + }); } private class PingContext implements RemoteContext { + private final Lazy registration; + + PingContext(Lazy registration) { + this.registration = registration; + } + @Override public String path() { return "/"; @@ -167,9 +164,11 @@ public void handle(HttpExchange exchange) throws IOException { switch (mtd) { case "POST": synchronized (WebServer.this.credentials) { - executor.execute(registration.get()::tryRegister); exchange.sendResponseHeaders(HttpStatus.SC_NO_CONTENT, -1); exchange.close(); + this.registration + .get() + .notify(Registration.RegistrationEvent.State.REFRESHING); } break; case "GET":