From 783fafb97da09ce34267096621d84eab2a00c2be Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 14 Sep 2023 07:16:29 -0400 Subject: [PATCH] adds a config property to control the use of http watches Closes #4624 --- CHANGELOG.md | 1 + .../io/fabric8/kubernetes/client/Config.java | 15 +++++- .../client/dsl/internal/BaseOperation.java | 54 +++++++++++++------ .../kubernetes/client/mock/WatchTest.java | 33 ++++++++++++ .../openshift/client/OpenShiftConfig.java | 5 +- 5 files changed, 89 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e092c05bdda..60f80987a0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * Fix #5432: [java-generator] Add the possibility to always emit `additionalProperties` on generated POJOs * Fix #5368: added support for additional ListOptions fields * Fix #5377: added a createOr and unlock function to provide a straight-forward replacement for createOrReplace. +* Fix #4624: added Config.onlyHttpWatches to control whether watches should only use regular HTTP requests, and not attempt WebSocket connections. * Fix #5388: [crd-generator] Generate deterministic CRDs * Fix #5135: added per instance methods to create a builder - instead on new PodBuilder(pod), you may use pod.toBuilder() * Fix #5257: Add ErrorStreamMessage and StatusStreamMessage to ease mocking of pods/exec requests diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Config.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Config.java index 14a8a43482a..89ce3f8b599 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Config.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Config.java @@ -220,6 +220,8 @@ public class Config { private String userAgent = "fabric8-kubernetes-client/" + Version.clientVersion(); private TlsVersion[] tlsVersions = new TlsVersion[] { TlsVersion.TLS_1_3, TlsVersion.TLS_1_2 }; + private boolean onlyHttpWatches; + /** * @deprecated Use Kubernetes Status directly for extracting error messages. */ @@ -337,7 +339,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru errorMessages, userAgent, tlsVersions, websocketPingInterval, proxyUsername, proxyPassword, trustStoreFile, trustStorePassphrase, keyStoreFile, keyStorePassphrase, impersonateUsername, impersonateGroups, impersonateExtras, null, null, DEFAULT_REQUEST_RETRY_BACKOFFLIMIT, DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL, - DEFAULT_UPLOAD_REQUEST_TIMEOUT); + DEFAULT_UPLOAD_REQUEST_TIMEOUT, false); } @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder", editableEnabled = false) @@ -352,7 +354,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru String proxyPassword, String trustStoreFile, String trustStorePassphrase, String keyStoreFile, String keyStorePassphrase, String impersonateUsername, String[] impersonateGroups, Map> impersonateExtras, OAuthTokenProvider oauthTokenProvider, Map customHeaders, int requestRetryBackoffLimit, - int requestRetryBackoffInterval, int uploadRequestTimeout) { + int requestRetryBackoffInterval, int uploadRequestTimeout, boolean onlyHttpWatches) { this.apiVersion = apiVersion; this.namespace = namespace; this.trustCerts = trustCerts; @@ -401,6 +403,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru this.maxConcurrentRequests = maxConcurrentRequests; this.maxConcurrentRequestsPerHost = maxConcurrentRequestsPerHost; this.autoOAuthToken = autoOAuthToken; + this.onlyHttpWatches = onlyHttpWatches; } public static void configFromSysPropsOrEnvVars(Config config) { @@ -1509,4 +1512,12 @@ public void setAutoOAuthToken(String autoOAuthToken) { this.autoOAuthToken = autoOAuthToken; } + public boolean isOnlyHttpWatches() { + return onlyHttpWatches; + } + + public void setOnlyHttpWatches(boolean onlyHttpWatches) { + this.onlyHttpWatches = onlyHttpWatches; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java index 8671305bea3..9106b39a09b 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java @@ -64,6 +64,8 @@ import io.fabric8.kubernetes.client.utils.Utils; import io.fabric8.kubernetes.client.utils.internal.CreateOrReplaceHelper; import io.fabric8.kubernetes.client.utils.internal.WatcherToggle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; @@ -100,6 +102,8 @@ public class BaseOperation, ListerWatcher { + static final Logger LOGGER = LoggerFactory.getLogger(BaseOperation.class); + private static final String WATCH = "watch"; private static final String READ_ONLY_UPDATE_EXCEPTION_MESSAGE = "Cannot update read-only resources"; private static final String READ_ONLY_EDIT_EXCEPTION_MESSAGE = "Cannot edit read-only resources"; @@ -640,9 +644,12 @@ public Watch watch(ListOptions options, final Watcher watcher) { @Override public CompletableFuture> submitWatch(ListOptions options, final Watcher watcher) { - WatcherToggle watcherToggle = new WatcherToggle<>(watcher, true); ListOptions optionsToUse = defaultListOptions(options, true); WatchConnectionManager watch; + if (this.getConfig().isOnlyHttpWatches()) { + return CompletableFuture.completedFuture(httpWatch(watcher, optionsToUse)); + } + WatcherToggle watcherToggle = new WatcherToggle<>(watcher, true); try { watch = new WatchConnectionManager<>( httpClient, @@ -661,29 +668,32 @@ public CompletableFuture> submitWatch(ListOptions option if (t instanceof CompletionException) { t = t.getCause(); } + boolean httpWatch = false; if (t instanceof KubernetesClientException) { KubernetesClientException ke = (KubernetesClientException) t; + // 503 will initially trigger re-tries, if it's "expected", we may need to short-circuit that List furtherProcessedCodes = Arrays.asList(200, 503); if (furtherProcessedCodes.contains(ke.getCode())) { - //release the watch after disabling the watcher (to avoid premature call to onClose) - watcherToggle.disable(); - // If the HTTP return code is 200 or 503, we retry the watch again using a persistent hanging // HTTP GET. This is meant to handle cases like kubectl local proxy which does not support // websockets. Issue: https://github.com/kubernetes/kubernetes/issues/25126 - try { - return new WatchHTTPManager<>( - httpClient, - this, - optionsToUse, - watcher, - getRequestConfig().getWatchReconnectInterval(), - getRequestConfig().getWatchReconnectLimit()); - } catch (MalformedURLException e) { - throw KubernetesClientException.launderThrowable(forOperationType(WATCH), e); - } + LOGGER.debug( + "Websocket hanshake failed with code {}, but an httpwatch may be possible. Use Config.onlyHttpWatches to disable websocket watches.", + ke.getCode()); + httpWatch = true; } + } else { + LOGGER.debug( + "Failed to establish a websocket watch, will try regular http instead. Use Config.onlyHttpWatches to disable websocket watches.", + t); + httpWatch = true; } + if (httpWatch) { + //release the watch after disabling the watcher (to avoid premature call to onClose) + watcherToggle.disable(); + return httpWatch(watcher, optionsToUse); + } + throw KubernetesClientException.launderThrowable(t); } finally { watch.close(); @@ -694,6 +704,20 @@ public CompletableFuture> submitWatch(ListOptions option } + private AbstractWatchManager httpWatch(final Watcher watcher, ListOptions optionsToUse) { + try { + return new WatchHTTPManager<>( + httpClient, + this, + optionsToUse, + watcher, + getRequestConfig().getWatchReconnectInterval(), + getRequestConfig().getWatchReconnectLimit()); + } catch (MalformedURLException e) { + throw KubernetesClientException.launderThrowable(forOperationType(WATCH), e); + } + } + @Override public T replace() { throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java index 68466ee5c40..c9f89f8abed 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java @@ -499,6 +499,39 @@ public void onClose(WatcherException cause) { assertTrue(latch.await(10, TimeUnit.SECONDS)); } + @Test + void testOnlyHttpWatch() throws InterruptedException { + // Given + + String dummyEvent = Serialization.asJson(new WatchEventBuilder().withType("MODIFIED") + .withObject(new PodBuilder().withNewMetadata().endMetadata().build()) + .build()) + "\n"; + + server.expect() + .withPath("/api/v1/namespaces/test/pods?allowWatchBookmarks=true&watch=true") + .andReturn(200, dummyEvent) + .once(); + + CountDownLatch latch = new CountDownLatch(1); + + client.getConfiguration().setOnlyHttpWatches(true); + + client.pods().watch(new Watcher() { + + @Override + public void eventReceived(Action action, Pod resource) { + latch.countDown(); + } + + @Override + public void onClose(WatcherException cause) { + } + }); + + // ensure that the exception does not inhibit further message processing + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + @Test void testErrorAfterReady() throws InterruptedException { // Given diff --git a/openshift-client-api/src/main/java/io/fabric8/openshift/client/OpenShiftConfig.java b/openshift-client-api/src/main/java/io/fabric8/openshift/client/OpenShiftConfig.java index c35e17ad42f..fcaa95b0669 100644 --- a/openshift-client-api/src/main/java/io/fabric8/openshift/client/OpenShiftConfig.java +++ b/openshift-client-api/src/main/java/io/fabric8/openshift/client/OpenShiftConfig.java @@ -86,7 +86,7 @@ public OpenShiftConfig(String openShiftUrl, String oapiVersion, String masterUrl String trustStorePassphrase, String keyStoreFile, String keyStorePassphrase, String impersonateUsername, String[] impersonateGroups, Map> impersonateExtras, OAuthTokenProvider oauthTokenProvider, Map customHeaders, int requestRetryBackoffLimit, int requestRetryBackoffInterval, - int uploadRequestTimeout, long buildTimeout, + int uploadRequestTimeout, boolean onlyHttpWatches, long buildTimeout, boolean disableApiGroupCheck) { super(masterUrl, apiVersion, namespace, trustCerts, disableHostnameVerification, caCertFile, caCertData, clientCertFile, @@ -100,7 +100,7 @@ public OpenShiftConfig(String openShiftUrl, String oapiVersion, String masterUrl impersonateExtras, oauthTokenProvider, customHeaders, requestRetryBackoffLimit, requestRetryBackoffInterval, - uploadRequestTimeout); + uploadRequestTimeout, onlyHttpWatches); this.setOapiVersion(oapiVersion); this.setBuildTimeout(buildTimeout); this.setDisableApiGroupCheck(disableApiGroupCheck); @@ -141,6 +141,7 @@ public OpenShiftConfig(Config kubernetesConfig, String openShiftUrl, String oapi kubernetesConfig.getOauthTokenProvider(), kubernetesConfig.getCustomHeaders(), kubernetesConfig.getRequestRetryBackoffLimit(), kubernetesConfig.getRequestRetryBackoffInterval(), kubernetesConfig.getUploadRequestTimeout(), + kubernetesConfig.isOnlyHttpWatches(), buildTimeout, false); }