Skip to content

Commit

Permalink
adds a config property to control the use of http watches
Browse files Browse the repository at this point in the history
Closes #4624
  • Loading branch information
shawkins authored and manusa committed Sep 21, 2023
1 parent ba7d7ba commit 783fafb
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Fix #5432: [java-generator] Add the possibility to always emit `additionalProperties` on generated POJOs
* Fix #5368: added support for additional ListOptions fields
* Fix #5377: added a createOr and unlock function to provide a straight-forward replacement for createOrReplace.
* Fix #4624: added Config.onlyHttpWatches to control whether watches should only use regular HTTP requests, and not attempt WebSocket connections.
* Fix #5388: [crd-generator] Generate deterministic CRDs
* Fix #5135: added per instance methods to create a builder - instead on new PodBuilder(pod), you may use pod.toBuilder()
* Fix #5257: Add ErrorStreamMessage and StatusStreamMessage to ease mocking of pods/exec requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ public class Config {
private String userAgent = "fabric8-kubernetes-client/" + Version.clientVersion();
private TlsVersion[] tlsVersions = new TlsVersion[] { TlsVersion.TLS_1_3, TlsVersion.TLS_1_2 };

private boolean onlyHttpWatches;

/**
* @deprecated Use Kubernetes Status directly for extracting error messages.
*/
Expand Down Expand Up @@ -337,7 +339,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
errorMessages, userAgent, tlsVersions, websocketPingInterval, proxyUsername, proxyPassword,
trustStoreFile, trustStorePassphrase, keyStoreFile, keyStorePassphrase, impersonateUsername, impersonateGroups,
impersonateExtras, null, null, DEFAULT_REQUEST_RETRY_BACKOFFLIMIT, DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL,
DEFAULT_UPLOAD_REQUEST_TIMEOUT);
DEFAULT_UPLOAD_REQUEST_TIMEOUT, false);
}

@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder", editableEnabled = false)
Expand All @@ -352,7 +354,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
String proxyPassword, String trustStoreFile, String trustStorePassphrase, String keyStoreFile, String keyStorePassphrase,
String impersonateUsername, String[] impersonateGroups, Map<String, List<String>> impersonateExtras,
OAuthTokenProvider oauthTokenProvider, Map<String, String> customHeaders, int requestRetryBackoffLimit,
int requestRetryBackoffInterval, int uploadRequestTimeout) {
int requestRetryBackoffInterval, int uploadRequestTimeout, boolean onlyHttpWatches) {
this.apiVersion = apiVersion;
this.namespace = namespace;
this.trustCerts = trustCerts;
Expand Down Expand Up @@ -401,6 +403,7 @@ public Config(String masterUrl, String apiVersion, String namespace, boolean tru
this.maxConcurrentRequests = maxConcurrentRequests;
this.maxConcurrentRequestsPerHost = maxConcurrentRequestsPerHost;
this.autoOAuthToken = autoOAuthToken;
this.onlyHttpWatches = onlyHttpWatches;
}

public static void configFromSysPropsOrEnvVars(Config config) {
Expand Down Expand Up @@ -1509,4 +1512,12 @@ public void setAutoOAuthToken(String autoOAuthToken) {
this.autoOAuthToken = autoOAuthToken;
}

public boolean isOnlyHttpWatches() {
return onlyHttpWatches;
}

public void setOnlyHttpWatches(boolean onlyHttpWatches) {
this.onlyHttpWatches = onlyHttpWatches;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.client.utils.internal.CreateOrReplaceHelper;
import io.fabric8.kubernetes.client.utils.internal.WatcherToggle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -100,6 +102,8 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
ExtensibleResource<T>,
ListerWatcher<T, L> {

static final Logger LOGGER = LoggerFactory.getLogger(BaseOperation.class);

private static final String WATCH = "watch";
private static final String READ_ONLY_UPDATE_EXCEPTION_MESSAGE = "Cannot update read-only resources";
private static final String READ_ONLY_EDIT_EXCEPTION_MESSAGE = "Cannot edit read-only resources";
Expand Down Expand Up @@ -640,9 +644,12 @@ public Watch watch(ListOptions options, final Watcher<T> watcher) {

@Override
public CompletableFuture<AbstractWatchManager<T>> submitWatch(ListOptions options, final Watcher<T> watcher) {
WatcherToggle<T> watcherToggle = new WatcherToggle<>(watcher, true);
ListOptions optionsToUse = defaultListOptions(options, true);
WatchConnectionManager<T, L> watch;
if (this.getConfig().isOnlyHttpWatches()) {
return CompletableFuture.completedFuture(httpWatch(watcher, optionsToUse));
}
WatcherToggle<T> watcherToggle = new WatcherToggle<>(watcher, true);
try {
watch = new WatchConnectionManager<>(
httpClient,
Expand All @@ -661,29 +668,32 @@ public CompletableFuture<AbstractWatchManager<T>> submitWatch(ListOptions option
if (t instanceof CompletionException) {
t = t.getCause();
}
boolean httpWatch = false;
if (t instanceof KubernetesClientException) {
KubernetesClientException ke = (KubernetesClientException) t;
// 503 will initially trigger re-tries, if it's "expected", we may need to short-circuit that
List<Integer> furtherProcessedCodes = Arrays.asList(200, 503);
if (furtherProcessedCodes.contains(ke.getCode())) {
//release the watch after disabling the watcher (to avoid premature call to onClose)
watcherToggle.disable();

// If the HTTP return code is 200 or 503, we retry the watch again using a persistent hanging
// HTTP GET. This is meant to handle cases like kubectl local proxy which does not support
// websockets. Issue: https://github.com/kubernetes/kubernetes/issues/25126
try {
return new WatchHTTPManager<>(
httpClient,
this,
optionsToUse,
watcher,
getRequestConfig().getWatchReconnectInterval(),
getRequestConfig().getWatchReconnectLimit());
} catch (MalformedURLException e) {
throw KubernetesClientException.launderThrowable(forOperationType(WATCH), e);
}
LOGGER.debug(
"Websocket hanshake failed with code {}, but an httpwatch may be possible. Use Config.onlyHttpWatches to disable websocket watches.",
ke.getCode());
httpWatch = true;
}
} else {
LOGGER.debug(
"Failed to establish a websocket watch, will try regular http instead. Use Config.onlyHttpWatches to disable websocket watches.",
t);
httpWatch = true;
}
if (httpWatch) {
//release the watch after disabling the watcher (to avoid premature call to onClose)
watcherToggle.disable();
return httpWatch(watcher, optionsToUse);
}

throw KubernetesClientException.launderThrowable(t);
} finally {
watch.close();
Expand All @@ -694,6 +704,20 @@ public CompletableFuture<AbstractWatchManager<T>> submitWatch(ListOptions option

}

private AbstractWatchManager<T> httpWatch(final Watcher<T> watcher, ListOptions optionsToUse) {
try {
return new WatchHTTPManager<>(
httpClient,
this,
optionsToUse,
watcher,
getRequestConfig().getWatchReconnectInterval(),
getRequestConfig().getWatchReconnectLimit());
} catch (MalformedURLException e) {
throw KubernetesClientException.launderThrowable(forOperationType(WATCH), e);
}
}

@Override
public T replace() {
throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,39 @@ public void onClose(WatcherException cause) {
assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
void testOnlyHttpWatch() throws InterruptedException {
// Given

String dummyEvent = Serialization.asJson(new WatchEventBuilder().withType("MODIFIED")
.withObject(new PodBuilder().withNewMetadata().endMetadata().build())
.build()) + "\n";

server.expect()
.withPath("/api/v1/namespaces/test/pods?allowWatchBookmarks=true&watch=true")
.andReturn(200, dummyEvent)
.once();

CountDownLatch latch = new CountDownLatch(1);

client.getConfiguration().setOnlyHttpWatches(true);

client.pods().watch(new Watcher<Pod>() {

@Override
public void eventReceived(Action action, Pod resource) {
latch.countDown();
}

@Override
public void onClose(WatcherException cause) {
}
});

// ensure that the exception does not inhibit further message processing
assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
void testErrorAfterReady() throws InterruptedException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public OpenShiftConfig(String openShiftUrl, String oapiVersion, String masterUrl
String trustStorePassphrase, String keyStoreFile, String keyStorePassphrase, String impersonateUsername,
String[] impersonateGroups, Map<String, List<String>> impersonateExtras, OAuthTokenProvider oauthTokenProvider,
Map<String, String> customHeaders, int requestRetryBackoffLimit, int requestRetryBackoffInterval,
int uploadRequestTimeout, long buildTimeout,
int uploadRequestTimeout, boolean onlyHttpWatches, long buildTimeout,
boolean disableApiGroupCheck) {
super(masterUrl, apiVersion, namespace, trustCerts, disableHostnameVerification, caCertFile, caCertData,
clientCertFile,
Expand All @@ -100,7 +100,7 @@ public OpenShiftConfig(String openShiftUrl, String oapiVersion, String masterUrl
impersonateExtras, oauthTokenProvider, customHeaders,
requestRetryBackoffLimit,
requestRetryBackoffInterval,
uploadRequestTimeout);
uploadRequestTimeout, onlyHttpWatches);
this.setOapiVersion(oapiVersion);
this.setBuildTimeout(buildTimeout);
this.setDisableApiGroupCheck(disableApiGroupCheck);
Expand Down Expand Up @@ -141,6 +141,7 @@ public OpenShiftConfig(Config kubernetesConfig, String openShiftUrl, String oapi
kubernetesConfig.getOauthTokenProvider(), kubernetesConfig.getCustomHeaders(),
kubernetesConfig.getRequestRetryBackoffLimit(), kubernetesConfig.getRequestRetryBackoffInterval(),
kubernetesConfig.getUploadRequestTimeout(),
kubernetesConfig.isOnlyHttpWatches(),
buildTimeout,
false);
}
Expand Down

0 comments on commit 783fafb

Please sign in to comment.