Skip to content

Commit

Permalink
Implement retry and timeout policy for gRPC client.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza committed Jul 29, 2023
1 parent 9dc842f commit 73ad0a0
Show file tree
Hide file tree
Showing 18 changed files with 819 additions and 90 deletions.
30 changes: 23 additions & 7 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
Expand Down Expand Up @@ -46,26 +47,38 @@ public class ActorClient implements AutoCloseable {
* Instantiates a new channel for Dapr sidecar communication.
*/
public ActorClient() {
this(Properties.API_PROTOCOL.get());
this(null);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(ResiliencyOptions resiliencyOptions) {
this(Properties.API_PROTOCOL.get(), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol) {
this(apiProtocol, buildManagedChannel(apiProtocol));
private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) {
this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param grpcManagedChannel gRPC channel.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) {
private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel, ResiliencyOptions resiliencyOptions) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel);
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions);
}

/**
Expand Down Expand Up @@ -119,9 +132,12 @@ private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) {
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
private static DaprClient buildDaprClient(
DaprApiProtocol apiProtocol,
Channel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
switch (apiProtocol) {
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
case HTTP: {
LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10.");
return new DaprHttpClient(new DaprHttpBuilder().build());
Expand Down
32 changes: 26 additions & 6 deletions sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package io.dapr.actors.client;

import com.google.protobuf.ByteString;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
Expand All @@ -31,6 +34,7 @@
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;

import java.sql.Time;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

Expand All @@ -39,18 +43,33 @@
*/
class DaprGrpcClient implements DaprClient {

/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;

/**
* Retry policy for SDK calls to Dapr API.
*/
private final RetryPolicy retryPolicy;

/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub client;
private final DaprGrpc.DaprStub client;

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param resiliencyOptions Client resiliency options (optional)
*/
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
this.client = intercept(grpcClient);
this.timeoutPolicy = new TimeoutPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
}

/**
Expand Down Expand Up @@ -78,14 +97,14 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
Expand Down Expand Up @@ -114,7 +133,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void setup() throws IOException {
InProcessChannelBuilder.forName(serverName).directExecutor().build());

// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
client = new DaprGrpcClient(DaprGrpc.newStub(channel), null);
}

@Test
Expand Down
5 changes: 5 additions & 0 deletions sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@
<version>1.3.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
<artifactId>toxiproxy-java</artifactId>
<version>2.1.7</version>
</dependency>
</dependencies>

<build>
Expand Down
16 changes: 16 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/DaprPorts.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.it;

import io.dapr.config.Properties;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
Expand Down Expand Up @@ -46,6 +48,20 @@ public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPor
}
}

public void use() {
if (this.httpPort != null) {
System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.httpPort));
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.httpPort);
}

if (this.grpcPort != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort));
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort);
}
}

public Integer getGrpcPort() {
return grpcPort;
}
Expand Down
17 changes: 4 additions & 13 deletions sdk-tests/src/test/java/io/dapr/it/DaprRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,11 @@ public void stop() throws InterruptedException, IOException {
}

public void use() {
if (this.ports.getHttpPort() != null) {
System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.ports.getHttpPort()));
}
if (this.ports.getGrpcPort() != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort()));
}
this.ports.use();
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort());
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort());
}

public void switchToGRPC() {
Expand All @@ -165,15 +156,15 @@ public void switchToProtocol(DaprApiProtocol protocol) {
System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name());
}

public int getGrpcPort() {
public Integer getGrpcPort() {
return ports.getGrpcPort();
}

public int getHttpPort() {
public Integer getHttpPort() {
return ports.getHttpPort();
}

public int getAppPort() {
public Integer getAppPort() {
return ports.getAppPort();
}

Expand Down
91 changes: 91 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;

import java.io.IOException;
import java.time.Duration;


public class ToxiProxyRun implements Stoppable {

private final DaprRun daprRun;

private final Duration latency;

private final Duration jitter;

private Command toxiProxyServer;

private ToxiproxyClient toxiproxyClient;

private Proxy grpcProxy;

private Proxy httpProxy;

private DaprPorts toxiProxyPorts;

public ToxiProxyRun(DaprRun run, Duration latency, Duration jitter) {
this.daprRun = run;
this.latency = latency;
this.jitter = jitter;
this.toxiProxyPorts = DaprPorts.build(true, true, true);
// artursouza: we use the "appPort" for the ToxiProxy server.
// artursouza: this oneliner is much easier than using TestContainers (sorry).
this.toxiProxyServer = new Command(
"Starting HTTP server on endpoint",
"docker run --rm --net=host --entrypoint=\"/toxiproxy\" -it ghcr.io/shopify/toxiproxy --port "
+ this.toxiProxyPorts.getAppPort());
}

public void start() throws IOException, InterruptedException {
this.toxiProxyServer.run();
this.toxiproxyClient = new ToxiproxyClient("127.0.0.1", this.toxiProxyPorts.getAppPort());

if (this.daprRun.getGrpcPort() != null) {
this.grpcProxy = toxiproxyClient.createProxy(
"daprd_grpc",
"127.0.0.1:" + this.toxiProxyPorts.getGrpcPort(),
"127.0.0.1:" + this.daprRun.getGrpcPort());
this.grpcProxy.toxics()
.latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis())
.setJitter(this.jitter.toMillis());
}

if (this.daprRun.getHttpPort() != null) {
this.httpProxy = toxiproxyClient.createProxy(
"daprd_http",
"127.0.0.1:" + this.toxiProxyPorts.getHttpPort(),
"127.0.0.1:" + this.daprRun.getHttpPort());
this.httpProxy.toxics()
.latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis())
.setJitter(this.jitter.toMillis());
}
}

public void use() {
this.toxiProxyPorts.use();
}

@Override
public void stop() throws InterruptedException, IOException {
this.toxiProxyServer.stop();
this.toxiproxyClient = null;
this.grpcProxy = null;
this.httpProxy = null;
}
}
Loading

0 comments on commit 73ad0a0

Please sign in to comment.