Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix waitForSidecar to respect timeout. #1146

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ConfigurationClientIT extends BaseIT {
public static void init() throws Exception {
  // The app is registered under this test class's simple name.
  final String appName = ConfigurationClientIT.class.getSimpleName();
  daprRun = startDaprApp(appName, 5000);
  daprClient = daprRun.newDaprClientBuilder().build();

  // Block until the sidecar health check passes; the argument is the timeout in milliseconds.
  final int sidecarTimeoutInMillis = 10000;
  daprClient.waitForSidecar(sidecarTimeoutInMillis).block();
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.resiliency;

import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.ToxiProxyRun;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration tests verifying that {@code waitForSidecar(int)} respects its timeout.
 *
 * <p>Three clients are exercised: one built from a running {@link DaprRun}, one built from a
 * {@link ToxiProxyRun} configured with {@link #LATENCY}, and one built from a {@link DaprRun}
 * that was stopped right after being started.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()} because it is monotonic; wall-clock
 * time ({@link System#currentTimeMillis()}) may jump when the system clock is adjusted, which
 * would make the duration assertions unreliable.
 */
public class WaitForSidecarIT extends BaseIT {

  // Use a number large enough to make sure it will respect the entire timeout.
  private static final Duration LATENCY = Duration.ofSeconds(5);

  private static final Duration JITTER = Duration.ofSeconds(0);

  private static DaprRun daprRun;

  private static ToxiProxyRun toxiProxyRun;

  private static DaprRun daprNotRunning;

  /**
   * Starts the running app, starts and immediately stops a second app, and starts the proxy.
   *
   * @throws Exception if any of the runs fails to start or stop.
   */
  @BeforeAll
  public static void init() throws Exception {
    daprRun = startDaprApp(WaitForSidecarIT.class.getSimpleName(), 5000);
    daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName() + "NotRunning", 5000);
    daprNotRunning.stop();

    // NOTE(review): nothing in this class stops toxiProxyRun; confirm cleanup happens elsewhere.
    toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER);
    toxiProxyRun.start();
  }

  /**
   * Waiting on the directly connected, running sidecar completes without an error.
   */
  @Test
  public void waitSucceeds() throws Exception {
    try (var client = daprRun.newDaprClient()) {
      client.waitForSidecar(5000).block();
    }
  }

  /**
   * With a timeout slightly below the proxy latency, the wait fails, and only after the
   * whole timeout has elapsed.
   */
  @Test
  public void waitTimeout() {
    int timeoutInMillis = Math.toIntExact(LATENCY.minusMillis(100).toMillis());
    long startedNanos = System.nanoTime();
    assertThrows(RuntimeException.class, () -> {
      try (var client = toxiProxyRun.newDaprClientBuilder().build()) {
        client.waitForSidecar(timeoutInMillis).block();
      }
    });
    long elapsedMillis = elapsedMillisSince(startedNanos);
    assertTrue(
        elapsedMillis >= timeoutInMillis,
        "Waited " + elapsedMillis + "ms but expected at least " + timeoutInMillis + "ms");
  }

  /**
   * With a timeout slightly above the proxy latency, the wait succeeds, but not before the
   * latency has elapsed.
   */
  @Test
  public void waitSlow() throws Exception {
    int timeoutInMillis = Math.toIntExact(LATENCY.plusMillis(100).toMillis());
    long startedNanos = System.nanoTime();
    try (var client = toxiProxyRun.newDaprClientBuilder().build()) {
      client.waitForSidecar(timeoutInMillis).block();
    }
    long elapsedMillis = elapsedMillisSince(startedNanos);
    assertTrue(
        elapsedMillis >= LATENCY.toMillis(),
        "Waited " + elapsedMillis + "ms but expected at least " + LATENCY.toMillis() + "ms");
  }

  /**
   * Against a stopped sidecar, the wait fails, and only after the whole timeout has elapsed.
   */
  @Test
  public void waitNotRunningTimeout() {
    // Do not make this number much smaller: the bug does not reproduce when it is <= 2.5s.
    // This has to do with a previous bug in the implementation.
    int timeoutMilliseconds = 5000;
    long startedNanos = System.nanoTime();
    assertThrows(RuntimeException.class, () -> {
      try (var client = daprNotRunning.newDaprClientBuilder().build()) {
        client.waitForSidecar(timeoutMilliseconds).block();
      }
    });
    long elapsedMillis = elapsedMillisSince(startedNanos);
    assertTrue(
        elapsedMillis >= timeoutMilliseconds,
        "Waited " + elapsedMillis + "ms but expected at least " + timeoutMilliseconds + "ms");
  }

  /**
   * Returns the whole milliseconds elapsed since a previous {@link System#nanoTime()} reading.
   *
   * @param startedNanos value returned by an earlier call to {@link System#nanoTime()}.
   * @return elapsed time in milliseconds, truncated toward zero.
   */
  private static long elapsedMillisSince(long startedNanos) {
    return Duration.ofNanos(System.nanoTime() - startedNanos).toMillis();
  }
}
56 changes: 14 additions & 42 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
Expand All @@ -97,7 +99,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -114,6 +115,8 @@
*/
public class DaprClientImpl extends AbstractDaprClient {

private final Logger logger;

/**
* The GRPC managed channel to be used.
*/
Expand Down Expand Up @@ -235,6 +238,7 @@ private DaprClientImpl(
this.httpClient = httpClient;
this.retryPolicy = retryPolicy;
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy);
this.logger = LoggerFactory.getLogger(DaprClientImpl.class);
}

private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
Expand Down Expand Up @@ -273,53 +277,21 @@ public <T extends AbstractStub<T>> T newGrpcStub(String appId, Function<Channel,
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"};
int maxRetries = 5;

Retry retrySpec = Retry
.fixedDelay(maxRetries, Duration.ofMillis(500))
.doBeforeRetry(retrySignal -> {
System.out.println("Retrying component health check...");
});

/*
NOTE: (Cassie) Uncomment this once it actually gets implemented:
https://github.com/grpc/grpc-java/issues/4359

int maxChannelStateRetries = 5;

// Retry logic for checking the channel state
Retry channelStateRetrySpec = Retry
.fixedDelay(maxChannelStateRetries, Duration.ofMillis(500))
.doBeforeRetry(retrySignal -> {
System.out.println("Retrying channel state check...");
});
*/

// Do the Dapr Http endpoint check to have parity with Dotnet
Mono<DaprHttp.Response> responseMono = this.httpClient.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments,
null, "", null, null);

return responseMono
.retryWhen(retrySpec)
/*
NOTE: (Cassie) Uncomment this once it actually gets implemented:
https://github.com/grpc/grpc-java/issues/4359
.flatMap(response -> {
// Check the status code
int statusCode = response.getStatusCode();

// Check if the channel's state is READY
return Mono.defer(() -> {
if (this.channel.getState(true) == ConnectivityState.READY) {
// Return true if the status code is in the 2xx range
if (statusCode >= 200 && statusCode < 300) {
return Mono.empty(); // Continue with the flow
}
}
return Mono.error(new RuntimeException("Health check failed"));
}).retryWhen(channelStateRetrySpec);
})
*/
// No method to "retry forever every 500ms", so we make it practically forever.
// 9223372036854775807 * 500 ms = 1.46235604 x 10^11 years
// If anyone needs to wait for the sidecar for longer than that, sorry.
.retryWhen(
Retry
.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(500))
.doBeforeRetry(s -> {
this.logger.info("Retrying sidecar health check ...");
}))
.timeout(Duration.ofMillis(timeoutInMilliseconds))
.onErrorResume(DaprException.class, e ->
Mono.error(new RuntimeException(e)))
Expand Down
11 changes: 3 additions & 8 deletions sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,11 @@ public void waitForSidecarBadHealthCheck() throws Exception {
.times(6)
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));

// retry the max allowed retries (5 times)
// it will timeout.
StepVerifier.create(daprClientHttp.waitForSidecar(5000))
.expectSubscription()
.expectErrorMatches(throwable -> {
if (throwable instanceof RuntimeException) {
return "Retries exhausted: 5/5".equals(throwable.getMessage());
}
return false;
})
.verify(Duration.ofSeconds(20));
.expectError()
.verify(Duration.ofMillis(6000));
}

@Test
Expand Down
Loading