diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java index 80d86d0137..2b957110f7 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java @@ -15,6 +15,7 @@ import io.dapr.client.DaprApiProtocol; import io.dapr.client.DaprHttpBuilder; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.utils.Version; import io.dapr.v1.DaprGrpc; @@ -46,26 +47,38 @@ public class ActorClient implements AutoCloseable { * Instantiates a new channel for Dapr sidecar communication. */ public ActorClient() { - this(Properties.API_PROTOCOL.get()); + this(null); + } + + /** + * Instantiates a new channel for Dapr sidecar communication. + * + * @param resiliencyOptions Client resiliency options. + */ + public ActorClient(ResiliencyOptions resiliencyOptions) { + this(Properties.API_PROTOCOL.get(), resiliencyOptions); } /** * Instantiates a new channel for Dapr sidecar communication. * * @param apiProtocol Dapr's API protocol. + * @param resiliencyOptions Client resiliency options. */ - private ActorClient(DaprApiProtocol apiProtocol) { - this(apiProtocol, buildManagedChannel(apiProtocol)); + private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) { + this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions); } /** * Instantiates a new channel for Dapr sidecar communication. * * @param apiProtocol Dapr's API protocol. + * @param grpcManagedChannel gRPC channel. + * @param resiliencyOptions Client resiliency options. 
*/ - private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) { + private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel, ResiliencyOptions resiliencyOptions) { this.grpcManagedChannel = grpcManagedChannel; - this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel); + this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions); } /** @@ -119,9 +132,12 @@ private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) { * @return an instance of the setup Client * @throws java.lang.IllegalStateException if any required field is missing */ - private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) { + private static DaprClient buildDaprClient( + DaprApiProtocol apiProtocol, + Channel grpcManagedChannel, + ResiliencyOptions resiliencyOptions) { switch (apiProtocol) { - case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel)); + case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions); case HTTP: { LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10."); return new DaprHttpClient(new DaprHttpBuilder().build()); diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index 844d2ace36..cd56041ed4 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -14,9 +14,12 @@ package io.dapr.actors.client; import com.google.protobuf.ByteString; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.opencensus.GrpcWrapper; +import io.dapr.internal.resiliency.RetryPolicy; +import io.dapr.internal.resiliency.TimeoutPolicy; import io.dapr.v1.DaprGrpc; 
import io.dapr.v1.DaprProtos; import io.grpc.CallOptions; @@ -31,6 +34,7 @@ import reactor.core.publisher.MonoSink; import reactor.util.context.ContextView; +import java.sql.Time; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -39,18 +43,33 @@ */ class DaprGrpcClient implements DaprClient { + /** + * Timeout policy for SDK calls to Dapr API. + */ + private final TimeoutPolicy timeoutPolicy; + + /** + * Retry policy for SDK calls to Dapr API. + */ + private final RetryPolicy retryPolicy; + /** * The async gRPC stub. */ - private DaprGrpc.DaprStub client; + private final DaprGrpc.DaprStub client; /** * Internal constructor. * * @param grpcClient Dapr's GRPC client. + * @param resiliencyOptions Client resiliency options (optional) */ - DaprGrpcClient(DaprGrpc.DaprStub grpcClient) { + DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) { this.client = intercept(grpcClient); + this.timeoutPolicy = new TimeoutPolicy( + resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); + this.retryPolicy = new RetryPolicy( + resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); } /** @@ -78,14 +97,14 @@ public Mono invoke(String actorType, String actorId, String methodName, * @param client GRPC client for Dapr. * @return Client after adding interceptors. 
*/ - private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { + private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, - CallOptions callOptions, + CallOptions options, Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + ClientCall clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options)); return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { @Override public void start(final Listener responseListener, final Metadata metadata) { @@ -114,7 +133,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu } private Mono createMono(Consumer> consumer) { - return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); + return retryPolicy.apply( + Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); } private StreamObserver createStreamObserver(MonoSink sink) { diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index f3d42c8c2a..71360dc16e 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -105,7 +105,7 @@ public void setup() throws IOException { InProcessChannelBuilder.forName(serverName).directExecutor().build()); // Create a HelloWorldClient using the in-process channel; - client = new DaprGrpcClient(DaprGrpc.newStub(channel)); + client = new DaprGrpcClient(DaprGrpc.newStub(channel), null); } @Test diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 25b96f39f9..7238dfef67 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -146,6 +146,11 @@ 1.3.5 compile + + eu.rekawek.toxiproxy + 
toxiproxy-java + 2.1.7 + diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java index be81cb7456..5a24601b1d 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java @@ -13,6 +13,8 @@ package io.dapr.it; +import io.dapr.config.Properties; + import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; @@ -46,6 +48,20 @@ public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPor } } + public void use() { + if (this.httpPort != null) { + System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.httpPort)); + System.getProperties().setProperty( + Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.httpPort); + } + + if (this.grpcPort != null) { + System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort)); + System.getProperties().setProperty( + Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort); + } + } + public Integer getGrpcPort() { return grpcPort; } diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 36ce771829..6af7282326 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -130,20 +130,11 @@ public void stop() throws InterruptedException, IOException { } public void use() { - if (this.ports.getHttpPort() != null) { - System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.ports.getHttpPort())); - } - if (this.ports.getGrpcPort() != null) { - System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort())); - } + this.ports.use(); System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); System.getProperties().setProperty( 
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); - System.getProperties().setProperty( - Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort()); - System.getProperties().setProperty( - Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort()); } public void switchToGRPC() { @@ -165,15 +156,15 @@ public void switchToProtocol(DaprApiProtocol protocol) { System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name()); } - public int getGrpcPort() { + public Integer getGrpcPort() { return ports.getGrpcPort(); } - public int getHttpPort() { + public Integer getHttpPort() { return ports.getHttpPort(); } - public int getAppPort() { + public Integer getAppPort() { return ports.getAppPort(); } diff --git a/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java b/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java new file mode 100644 index 0000000000..ad572308f4 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java @@ -0,0 +1,91 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package io.dapr.it; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; +import eu.rekawek.toxiproxy.model.ToxicDirection; + +import java.io.IOException; +import java.time.Duration; + + +public class ToxiProxyRun implements Stoppable { + + private final DaprRun daprRun; + + private final Duration latency; + + private final Duration jitter; + + private Command toxiProxyServer; + + private ToxiproxyClient toxiproxyClient; + + private Proxy grpcProxy; + + private Proxy httpProxy; + + private DaprPorts toxiProxyPorts; + + public ToxiProxyRun(DaprRun run, Duration latency, Duration jitter) { + this.daprRun = run; + this.latency = latency; + this.jitter = jitter; + this.toxiProxyPorts = DaprPorts.build(true, true, true); + // artursouza: we use the "appPort" for the ToxiProxy server. + // artursouza: this oneliner is much easier than using TestContainers (sorry). + this.toxiProxyServer = new Command( + "Starting HTTP server on endpoint", + "docker run --rm --net=host --entrypoint=\"/toxiproxy\" -it ghcr.io/shopify/toxiproxy --port " + + this.toxiProxyPorts.getAppPort()); + } + + public void start() throws IOException, InterruptedException { + this.toxiProxyServer.run(); + this.toxiproxyClient = new ToxiproxyClient("127.0.0.1", this.toxiProxyPorts.getAppPort()); + + if (this.daprRun.getGrpcPort() != null) { + this.grpcProxy = toxiproxyClient.createProxy( + "daprd_grpc", + "127.0.0.1:" + this.toxiProxyPorts.getGrpcPort(), + "127.0.0.1:" + this.daprRun.getGrpcPort()); + this.grpcProxy.toxics() + .latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis()) + .setJitter(this.jitter.toMillis()); + } + + if (this.daprRun.getHttpPort() != null) { + this.httpProxy = toxiproxyClient.createProxy( + "daprd_http", + "127.0.0.1:" + this.toxiProxyPorts.getHttpPort(), + "127.0.0.1:" + this.daprRun.getHttpPort()); + this.httpProxy.toxics() + .latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis()) + 
.setJitter(this.jitter.toMillis()); + } + } + + public void use() { + this.toxiProxyPorts.use(); + } + + @Override + public void stop() throws InterruptedException, IOException { + this.toxiProxyServer.stop(); + this.toxiproxyClient = null; + this.grpcProxy = null; + this.httpProxy = null; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java new file mode 100644 index 0000000000..47651c942a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java @@ -0,0 +1,146 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.resiliency; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprClientGrpc; +import io.dapr.client.resiliency.ResiliencyOptions; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.ToxiProxyRun; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Base64; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test SDK resiliency. 
+ */ +public class SdkResiliencytIT extends BaseIT { + + private static final int NUM_ITERATIONS = 20; + + private static final Duration TIMEOUT = Duration.ofMillis(100); + + private static final Duration LATENCY = TIMEOUT.dividedBy(2); + + private static final Duration JITTER = TIMEOUT.multipliedBy(2); + + private static final int MAX_RETRIES = -1; // Infinity + + private static DaprRun daprRun; + + private static DaprClient daprClient; + + private static ToxiProxyRun toxiProxyRun; + + private static DaprClient daprToxiClient; + + private static DaprClient daprResilientClient; + + private static DaprClient daprRetriesOnceClient; + + private final String randomStateKeyPrefix = UUID.randomUUID().toString(); + + @BeforeClass + public static void init() throws Exception { + daprRun = startDaprApp(SdkResiliencytIT.class.getSimpleName(), 5000); + // HTTP client is deprecated, so SDK resiliency is for gRPC client only. + daprRun.switchToGRPC(); + daprClient = new DaprClientBuilder().build(); + + toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER); + toxiProxyRun.start(); + toxiProxyRun.use(); + daprToxiClient = new DaprClientBuilder() + .withResiliencyOptions( + new ResiliencyOptions().setTimeout(TIMEOUT)) + .build(); + daprResilientClient = new DaprClientBuilder() + .withResiliencyOptions( + new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(MAX_RETRIES)) + .build(); + daprRetriesOnceClient = new DaprClientBuilder() + .withResiliencyOptions( + new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(1)) + .build(); + + assertTrue(daprClient instanceof DaprClientGrpc); + assertTrue(daprToxiClient instanceof DaprClientGrpc); + assertTrue(daprResilientClient instanceof DaprClientGrpc); + assertTrue(daprRetriesOnceClient instanceof DaprClientGrpc); + } + + @AfterClass + public static void tearDown() throws Exception { + if (daprClient != null) { + daprClient.close(); + } + if (daprToxiClient != null) { + daprToxiClient.close(); + } + if (daprResilientClient != 
null) { + daprResilientClient.close(); + } + if (daprRetriesOnceClient != null) { + daprRetriesOnceClient.close(); + } + if (toxiProxyRun != null) { + toxiProxyRun.stop(); + } + } + + @Test + public void retryAndTimeout() { + AtomicInteger toxiClientErrorCount = new AtomicInteger(); + AtomicInteger retryOneClientErrorCount = new AtomicInteger(); + for (int i = 0; i < NUM_ITERATIONS; i++) { + String key = randomStateKeyPrefix + "_" + i; + String value = Base64.getEncoder().encodeToString(key.getBytes(StandardCharsets.UTF_8)); + try { + daprToxiClient.saveState(STATE_STORE_NAME, key, value).block(); + } catch (Exception e) { + // This call should fail sometimes. So, we count. + toxiClientErrorCount.incrementAndGet(); + } + try { + daprRetriesOnceClient.saveState(STATE_STORE_NAME, key, value).block(); + } catch (Exception e) { + // This call should fail sometimes. So, we count. + retryOneClientErrorCount.incrementAndGet(); + } + + // We retry forever so that the call below should always work. + daprResilientClient.saveState(STATE_STORE_NAME, key, value).block(); + // Makes sure the value was actually saved. + String savedValue = daprClient.getState(STATE_STORE_NAME, key, String.class).block().getValue(); + assertEquals(value, savedValue); + } + + // This assertion makes sure that toxicity is on + assertTrue(toxiClientErrorCount.get() > 0); + assertTrue(retryOneClientErrorCount.get() > 0); + // A client without retries should have more errors than a client with one retry. 
+ assertTrue(toxiClientErrorCount.get() > retryOneClientErrorCount.get()); + } +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 202a6c67e5..189dc9d64f 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -13,6 +13,7 @@ package io.dapr.client; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; @@ -23,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.net.URI; /** @@ -59,6 +59,11 @@ public class DaprClientBuilder { */ private DaprObjectSerializer stateSerializer; + /** + * Resiliency configuration for DaprClient. + */ + private ResiliencyOptions resiliencyOptions; + /** * Creates a constructor for DaprClient. * @@ -109,6 +114,17 @@ public DaprClientBuilder withStateSerializer(DaprObjectSerializer stateSerialize return this; } + /** + * Sets the resiliency options for DaprClient. + * + * @param options Resiliency options for DaprClient. + * @return This instance. + */ + public DaprClientBuilder withResiliencyOptions(ResiliencyOptions options) { + this.resiliencyOptions = options; + return this; + } + /** * Build an instance of the Client based on the provided setup. 
* @@ -166,7 +182,12 @@ private DaprClient buildDaprClientGrpc() { final ManagedChannel channel = buildGrpcManagedChanel(); final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); - return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer); + return new DaprClientGrpc( + channelFacade, + asyncStub, + this.objectSerializer, + this.stateSerializer, + resiliencyOptions); } private ManagedChannel buildGrpcManagedChanel() { diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 49cf4aca9e..9e0d9e5751 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -44,9 +44,12 @@ import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.client.domain.UnsubscribeConfigurationRequest; import io.dapr.client.domain.UnsubscribeConfigurationResponse; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.opencensus.GrpcWrapper; +import io.dapr.internal.resiliency.RetryPolicy; +import io.dapr.internal.resiliency.TimeoutPolicy; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.DefaultContentTypeConverter; @@ -92,18 +95,28 @@ public class DaprClientGrpc extends AbstractDaprClient { */ private final GrpcChannelFacade channel; + /** + * The timeout policy. + */ + private final TimeoutPolicy timeoutPolicy; + + /** + * The retry policy. + */ + private final RetryPolicy retryPolicy; + /** * The async gRPC stub. 
*/ - private DaprGrpc.DaprStub asyncStub; + private final DaprGrpc.DaprStub asyncStub; /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * - * @param channel Facade for the managed GRPC channel - * @param asyncStub async gRPC stub - * @param objectSerializer Serializer for transient request/response objects. - * @param stateSerializer Serializer for state objects. + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. * @see DaprClientBuilder */ DaprClientGrpc( @@ -111,9 +124,32 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprGrpc.DaprStub asyncStub, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { + this(channel, asyncStub, objectSerializer, stateSerializer, null); + } + + /** + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder + * + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @param resiliencyOptions Client-level override for resiliency options. + * @see DaprClientBuilder + */ + DaprClientGrpc( + GrpcChannelFacade channel, + DaprGrpc.DaprStub asyncStub, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer, + ResiliencyOptions resiliencyOptions) { super(objectSerializer, stateSerializer); this.channel = channel; this.asyncStub = intercept(asyncStub); + this.timeoutPolicy = new TimeoutPolicy( + resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); + this.retryPolicy = new RetryPolicy( + resiliencyOptions == null ? 
null : resiliencyOptions.getMaxRetries()); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -994,14 +1030,14 @@ private ConfigurationItem buildConfigurationItem( * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { + private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, - CallOptions callOptions, + CallOptions options, Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + ClientCall clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options)); return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { @Override public void start(final Listener responseListener, final Metadata metadata) { @@ -1009,7 +1045,6 @@ public void start(final Listener responseListener, final Metadata metadat if (daprApiToken != null) { metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); } - super.start(responseListener, metadata); } }; @@ -1030,11 +1065,13 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu } private Mono createMono(Consumer> consumer) { - return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); + return retryPolicy.apply( + Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); } private Flux createFlux(Consumer> consumer) { - return Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); + return retryPolicy.apply( + Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); } private StreamObserver createStreamObserver(MonoSink sink) { diff --git 
a/sdk/src/main/java/io/dapr/client/resiliency/ResiliencyOptions.java b/sdk/src/main/java/io/dapr/client/resiliency/ResiliencyOptions.java new file mode 100644 index 0000000000..5c60ceccc0 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/resiliency/ResiliencyOptions.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.resiliency; + +import java.time.Duration; + +/** + * Resiliency policy for SDK communication to Dapr API. + */ +public final class ResiliencyOptions { + + private Duration timeout; + + private Integer maxRetries; + + public Duration getTimeout() { + return timeout; + } + + public ResiliencyOptions setTimeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public Integer getMaxRetries() { + return maxRetries; + } + + public ResiliencyOptions setMaxRetries(Integer maxRetries) { + this.maxRetries = maxRetries; + return this; + } +} diff --git a/sdk/src/main/java/io/dapr/config/MillisecondsDurationProperty.java b/sdk/src/main/java/io/dapr/config/MillisecondsDurationProperty.java new file mode 100644 index 0000000000..88ed12c9e7 --- /dev/null +++ b/sdk/src/main/java/io/dapr/config/MillisecondsDurationProperty.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.config; + +import java.time.Duration; + +/** + * Duration configuration property, parsed from a value in milliseconds. + */ +public class MillisecondsDurationProperty extends Property { + + /** + * {@inheritDoc} + */ + MillisecondsDurationProperty(String name, String envName, Duration defaultValue) { + super(name, envName, defaultValue); + } + + /** + * {@inheritDoc} + */ + @Override + protected Duration parse(String value) { + long longValue = Long.parseLong(value); + return Duration.ofMillis(longValue); + } + +} diff --git a/sdk/src/main/java/io/dapr/config/Properties.java b/sdk/src/main/java/io/dapr/config/Properties.java index 08bd911d6b..f4c4ba6033 100644 --- a/sdk/src/main/java/io/dapr/config/Properties.java +++ b/sdk/src/main/java/io/dapr/config/Properties.java @@ -17,6 +17,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; /** * Global properties for Dapr's SDK, using Supplier so they are dynamically resolved. @@ -38,6 +39,16 @@ public class Properties { */ private static final Integer DEFAULT_GRPC_PORT = 50001; + /** + * Dapr's default max retries. + */ + private static final Integer DEFAULT_API_MAX_RETRIES = 0; + + /** + * Dapr's default timeout in milliseconds. + */ + private static final Duration DEFAULT_API_TIMEOUT = Duration.ofMillis(0L); + /** * Dapr's default use of gRPC or HTTP. */ @@ -115,6 +126,22 @@ public class Properties { "DAPR_HTTP_ENDPOINT", null); + /** + * Maximum number of retries for retriable exceptions. 
+ */ + public static final Property MAX_RETRIES = new IntegerProperty( + "dapr.api.maxRetries", + "DAPR_API_MAX_RETRIES", + DEFAULT_API_MAX_RETRIES); + + /** + * Timeout for API calls. + */ + public static final Property TIMEOUT = new MillisecondsDurationProperty( + "dapr.api.timeoutMilliseconds", + "DAPR_API_TIMEOUT_MILLISECONDS", + DEFAULT_API_TIMEOUT); + /** * Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car. * @deprecated This attribute will be deleted at SDK version 1.10. diff --git a/sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java b/sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java new file mode 100644 index 0000000000..bf0bfdb245 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java @@ -0,0 +1,127 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.resiliency; + +import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; + +/** + * Retry policy for SDK communication to Dapr API. 
+ */ +public final class RetryPolicy { + + private static final int MIN_BACKOFF_MILLIS = 500; + + private static final int MAX_BACKOFF_SECONDS = 5; + + private final Retry retrySpec; + + public RetryPolicy() { + this(null); + } + + public RetryPolicy(Integer maxRetries) { + this.retrySpec = buildRetrySpec(maxRetries != null ? maxRetries : Properties.MAX_RETRIES.get()); + } + + /** + * Applies the retry policy to an expected Mono action. + * @param response Response + * @param Type expected for the action's response + * @return action with retry + */ + public Mono apply(Mono response) { + if (this.retrySpec == null) { + return response; + } + + return response.retryWhen(retrySpec) + .onErrorMap(throwable -> findDaprException(throwable)); + } + + /** + * Applies the retry policy to an expected Flux action. + * @param response Response + * @param Type expected for the action's response + * @return action with retry + */ + public Flux apply(Flux response) { + if (this.retrySpec == null) { + return response; + } + + return response.retryWhen(retrySpec) + .onErrorMap(throwable -> findDaprException(throwable)); + } + + private static Retry buildRetrySpec(int maxRetries) { + if (maxRetries == 0) { + return null; + } + + if (maxRetries < 0) { + return Retry.indefinitely() + .filter(throwable -> isRetryableGrpcError(throwable)); + } + + return Retry.backoff(maxRetries, Duration.ofMillis(MIN_BACKOFF_MILLIS)) + .maxBackoff(Duration.ofSeconds(MAX_BACKOFF_SECONDS)) + .filter(throwable -> isRetryableGrpcError(throwable)); + } + + private static boolean isRetryableGrpcError(Throwable throwable) { + Status grpcStatus = findGrpcStatusCode(throwable); + if (grpcStatus == null) { + return false; + } + + switch (grpcStatus.getCode()) { + case DEADLINE_EXCEEDED: + case UNAVAILABLE: + return true; + default: + return false; + } + } + + private static Status findGrpcStatusCode(Throwable throwable) { + while (throwable != null) { + if (throwable instanceof StatusRuntimeException) { + 
return ((StatusRuntimeException) throwable).getStatus(); + } + + throwable = throwable.getCause(); + } + return null; + } + + private static Throwable findDaprException(Throwable throwable) { + Throwable original = throwable; + while (throwable != null) { + if (throwable instanceof DaprException) { + return throwable; + } + + throwable = throwable.getCause(); + } + return original; + } +} diff --git a/sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java b/sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java new file mode 100644 index 0000000000..38329d67c1 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.resiliency; + +import io.dapr.config.Properties; +import io.grpc.CallOptions; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * Timeout policy for SDK communication to Dapr API. + */ +public final class TimeoutPolicy { + + private final Duration timeout; + + /** + * Instantiates a new timeout policy with override value. + * @param timeout Override timeout value. + */ + public TimeoutPolicy(Duration timeout) { + this.timeout = timeout != null ? timeout : Properties.TIMEOUT.get(); + } + + /** + * Instantiates a new timeout policy with default value. 
+ */ + public TimeoutPolicy() { + this(null); + } + + /** + * Applies the timeout policy to gRPC call options. + * @param options Call options + * @return Call options with timeout policy applied + */ + public CallOptions apply(CallOptions options) { + if (this.timeout.isZero() || this.timeout.isNegative()) { + return options; + } + + return options.withDeadlineAfter(this.timeout.toMillis(), TimeUnit.MILLISECONDS); + } +} diff --git a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java index 840c67ceff..a13babd6bd 100644 --- a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java +++ b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java @@ -22,6 +22,8 @@ */ public final class NetworkUtils { + private static final long RETRY_WAIT_MILLISECONDS = 1000; + private NetworkUtils() { } @@ -34,7 +36,7 @@ private NetworkUtils() { */ public static void waitForSocket(String host, int port, int timeoutInMilliseconds) throws InterruptedException { long started = System.currentTimeMillis(); - Retry.callWithRetry(() -> { + callWithRetry(() -> { try { try (Socket socket = new Socket()) { // timeout cannot be negative. 
@@ -47,4 +49,31 @@ public static void waitForSocket(String host, int port, int timeoutInMillisecond } }, timeoutInMilliseconds); } + + private static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException { + long started = System.currentTimeMillis(); + while (true) { + Throwable exception; + try { + function.run(); + return; + } catch (Exception e) { + exception = e; + } catch (AssertionError e) { + exception = e; + } + + long elapsed = System.currentTimeMillis() - started; + if (elapsed >= retryTimeoutMilliseconds) { + if (exception instanceof RuntimeException) { + throw (RuntimeException)exception; + } + + throw new RuntimeException(exception); + } + + long remaining = retryTimeoutMilliseconds - elapsed; + Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS)); + } + } } diff --git a/sdk/src/main/java/io/dapr/utils/Retry.java b/sdk/src/main/java/io/dapr/utils/Retry.java deleted file mode 100644 index 0d181ff033..0000000000 --- a/sdk/src/main/java/io/dapr/utils/Retry.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package io.dapr.utils; - -class Retry { - - private static final long RETRY_WAIT_MILLISECONDS = 1000; - - private Retry() { - } - - static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException { - long started = System.currentTimeMillis(); - while (true) { - Throwable exception; - try { - function.run(); - return; - } catch (Exception e) { - exception = e; - } catch (AssertionError e) { - exception = e; - } - - long elapsed = System.currentTimeMillis() - started; - if (elapsed >= retryTimeoutMilliseconds) { - if (exception instanceof RuntimeException) { - throw (RuntimeException)exception; - } - - throw new RuntimeException(exception); - } - - long remaining = retryTimeoutMilliseconds - elapsed; - Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS)); - } - } -} diff --git a/sdk/src/test/java/io/dapr/resiliency/RetryPolicyTest.java b/sdk/src/test/java/io/dapr/resiliency/RetryPolicyTest.java new file mode 100644 index 0000000000..d495832c21 --- /dev/null +++ b/sdk/src/test/java/io/dapr/resiliency/RetryPolicyTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package io.dapr.resiliency; + +import io.dapr.internal.resiliency.RetryPolicy; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.junit.Test; +import reactor.core.Exceptions; +import reactor.core.publisher.Mono; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class RetryPolicyTest { + + private static final String SUCCESS_MESSAGE = "It worked!"; + + private static final RuntimeException RETRYABLE_EXCEPTION = + new StatusRuntimeException(Status.DEADLINE_EXCEEDED); + + @Test + public void zeroRetriesThenError() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(0); + Mono action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION); + + try { + policy.apply(action).block(); + fail("Exception expected"); + } catch (Exception e) { + assertSame(RETRYABLE_EXCEPTION, e); + } + assertEquals(1, callCounter.get()); + } + + @Test + public void zeroRetriesThenSuccess() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(0); + Mono action = createActionErrorAndReturn(callCounter, 0, RETRYABLE_EXCEPTION); + + String response = policy.apply(action).block(); + assertEquals(SUCCESS_MESSAGE, response); + assertEquals(1, callCounter.get()); + } + + @Test + public void twoRetriesThenSuccess() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(3); + Mono action = createActionErrorAndReturn(callCounter, 2, RETRYABLE_EXCEPTION); + + String response = policy.apply(action).block(); + assertEquals(SUCCESS_MESSAGE, response); + assertEquals(3, callCounter.get()); + } + + @Test + public void threeRetriesThenError() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(3); + Mono action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION); + + try { + policy.apply(action).block(); + fail("Exception 
expected"); + } catch (Exception e) { + assertTrue(Exceptions.isRetryExhausted(e)); + } + assertEquals(4, callCounter.get()); + } + + @Test + public void notRetryableException() { + AtomicInteger callCounter = new AtomicInteger(); + RuntimeException exception = new ArithmeticException(); + RetryPolicy policy = new RetryPolicy(3); + Mono action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, exception); + + assertThrows(ArithmeticException.class, () -> { + policy.apply(action).block(); + }); + assertEquals(1, callCounter.get()); + } + + + + private static Mono createActionErrorAndReturn( + AtomicInteger callCounter, + int firstErrors, + RuntimeException error) { + return Mono.fromCallable(() -> { + if (callCounter.incrementAndGet() <= firstErrors) { + throw error; + } + + return SUCCESS_MESSAGE; + }); + } +}