Skip to content

Commit

Permalink
Implement retry and timeout policy for gRPC client.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza committed Jul 29, 2023
1 parent 9dc842f commit 9407693
Show file tree
Hide file tree
Showing 22 changed files with 987 additions and 94 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -101,14 +102,18 @@ jobs:
docker stop dapr_placement
cd dapr
./dist/linux_amd64/release/placement &
- name: Install Local kafka using docker-compose
- name: Install local Kafka using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d
docker ps
- name: Install Local mongo database using docker-compose
- name: Install local Mongo database using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-mongo.yml up -d
docker ps
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
run: |
echo $PATH
wget -q ${{ env.TOXIPROXY_URL }} -O $PWD/toxiproxy-server
- name: Clean up files
run: mvn clean -B
- name: Build sdk
Expand Down
33 changes: 26 additions & 7 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
Expand Down Expand Up @@ -46,26 +47,41 @@ public class ActorClient implements AutoCloseable {
* Instantiates a new channel for Dapr sidecar communication.
*/
public ActorClient() {
this(Properties.API_PROTOCOL.get());
this(null);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(ResiliencyOptions resiliencyOptions) {
this(Properties.API_PROTOCOL.get(), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol) {
this(apiProtocol, buildManagedChannel(apiProtocol));
private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) {
this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param grpcManagedChannel gRPC channel.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) {
private ActorClient(
DaprApiProtocol apiProtocol,
ManagedChannel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel);
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions);
}

/**
Expand Down Expand Up @@ -119,9 +135,12 @@ private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) {
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
private static DaprClient buildDaprClient(
DaprApiProtocol apiProtocol,
Channel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
switch (apiProtocol) {
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
case HTTP: {
LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10.");
return new DaprHttpClient(new DaprHttpBuilder().build());
Expand Down
32 changes: 26 additions & 6 deletions sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package io.dapr.actors.client;

import com.google.protobuf.ByteString;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
Expand All @@ -31,6 +34,7 @@
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;

import java.sql.Time;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

Expand All @@ -39,18 +43,33 @@
*/
class DaprGrpcClient implements DaprClient {

/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;

/**
* Retry policy for SDK calls to Dapr API.
*/
private final RetryPolicy retryPolicy;

/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub client;
private final DaprGrpc.DaprStub client;

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param resiliencyOptions Client resiliency options (optional)
*/
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
this.client = intercept(grpcClient);
this.timeoutPolicy = new TimeoutPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
}

/**
Expand Down Expand Up @@ -78,14 +97,14 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
Expand Down Expand Up @@ -114,7 +133,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void setup() throws IOException {
InProcessChannelBuilder.forName(serverName).directExecutor().build());

// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
client = new DaprGrpcClient(DaprGrpc.newStub(channel), null);
}

@Test
Expand Down
5 changes: 5 additions & 0 deletions sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@
<version>1.3.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
<artifactId>toxiproxy-java</artifactId>
<version>2.1.7</version>
</dependency>
</dependencies>

<build>
Expand Down
9 changes: 7 additions & 2 deletions sdk-tests/src/test/java/io/dapr/it/BaseIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.actors.client.ActorClient;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.resiliency.ResiliencyOptions;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.AfterClass;

Expand Down Expand Up @@ -194,8 +195,12 @@ public static void cleanUp() throws Exception {
}
}

protected ActorClient newActorClient() {
ActorClient client = new ActorClient();
protected static ActorClient newActorClient() {
return newActorClient(null);
}

protected static ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
ActorClient client = new ActorClient(resiliencyOptions);
TO_BE_CLOSED.add(client);
return client;
}
Expand Down
18 changes: 18 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,25 @@ public void run() throws InterruptedException, IOException {
}
});

final Thread stderrReader = new Thread(() -> {
try {
try (InputStream stderr = this.process.getErrorStream()) {
try (InputStreamReader isr = new InputStreamReader(stderr)) {
try (BufferedReader br = new BufferedReader(isr)) {
String line;
while ((line = br.readLine()) != null) {
System.err.println(line);
}
}
}
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});

stdoutReader.start();
stderrReader.start();
// Waits for success to happen within 1 minute.
finished.tryAcquire(SUCCESS_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!success.get()) {
Expand Down
16 changes: 16 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/DaprPorts.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.it;

import io.dapr.config.Properties;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
Expand Down Expand Up @@ -46,6 +48,20 @@ public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPor
}
}

public void use() {
if (this.httpPort != null) {
System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.httpPort));
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.httpPort);
}

if (this.grpcPort != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort));
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort);
}
}

public Integer getGrpcPort() {
return grpcPort;
}
Expand Down
17 changes: 4 additions & 13 deletions sdk-tests/src/test/java/io/dapr/it/DaprRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,11 @@ public void stop() throws InterruptedException, IOException {
}

public void use() {
if (this.ports.getHttpPort() != null) {
System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.ports.getHttpPort()));
}
if (this.ports.getGrpcPort() != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort()));
}
this.ports.use();
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort());
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort());
}

public void switchToGRPC() {
Expand All @@ -165,15 +156,15 @@ public void switchToProtocol(DaprApiProtocol protocol) {
System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name());
}

public int getGrpcPort() {
public Integer getGrpcPort() {
return ports.getGrpcPort();
}

public int getHttpPort() {
public Integer getHttpPort() {
return ports.getHttpPort();
}

public int getAppPort() {
public Integer getAppPort() {
return ports.getAppPort();
}

Expand Down
Loading

0 comments on commit 9407693

Please sign in to comment.