Skip to content

Fix 787 #832

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 77 additions & 77 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/DaprGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@
package io.dapr.actors.runtime;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.DurationUtils;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
* A DaprClient over HTTP for Actor's runtime.
Expand All @@ -48,44 +51,42 @@ class DaprGrpcClient implements DaprClient {
/**
* The GRPC client to be used.
*
* @see io.dapr.v1.DaprGrpc.DaprFutureStub
* @see io.dapr.v1.DaprGrpc.DaprStub
*/
private DaprGrpc.DaprFutureStub client;
private DaprGrpc.DaprStub client;

/**
* Internal constructor.
*
* @param channel channel (client needs to close channel after use).
*/
DaprGrpcClient(ManagedChannel channel) {
this(DaprGrpc.newFutureStub(channel));
this(DaprGrpc.newStub(channel));
}

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param daprStubClient Dapr's GRPC client.
*/
DaprGrpcClient(DaprGrpc.DaprFutureStub grpcClient) {
this.client = grpcClient;
DaprGrpcClient(DaprGrpc.DaprStub daprStubClient) {
this.client = daprStubClient;
}

/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
return Mono.fromCallable(() -> {
DaprProtos.GetActorStateRequest req =
DaprProtos.GetActorStateRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setKey(keyName)
.build();

ListenableFuture<DaprProtos.GetActorStateResponse> futureResponse = client.getActorState(req);
return futureResponse.get();
}).map(r -> r.getData().toByteArray());
DaprProtos.GetActorStateRequest req =
DaprProtos.GetActorStateRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setKey(keyName)
.build();

return Mono.<DaprProtos.GetActorStateResponse>create(it ->
client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray());
}

/**
Expand Down Expand Up @@ -132,10 +133,7 @@ public Mono<Void> saveStateTransactionally(
.addAllOperations(grpcOps)
.build();

return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureResponse = client.executeActorStateTransaction(req);
return futureResponse.get();
}).then();
return Mono.<Empty>create(it -> client.executeActorStateTransaction(req, createStreamObserver(it))).then();
}

/**
Expand All @@ -147,40 +145,31 @@ public Mono<Void> registerReminder(
String actorId,
String reminderName,
ActorReminderParams reminderParams) {
return Mono.fromCallable(() -> {
DaprProtos.RegisterActorReminderRequest req =
DaprProtos.RegisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.setData(ByteString.copyFrom(reminderParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()))
.build();

ListenableFuture<Empty> futureResponse = client.registerActorReminder(req);
futureResponse.get();
return null;
});
DaprProtos.RegisterActorReminderRequest req =
DaprProtos.RegisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.setData(ByteString.copyFrom(reminderParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(reminderParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(reminderParams.getPeriod()))
.build();
return Mono.<Empty>create(it -> client.registerActorReminder(req, createStreamObserver(it))).then().then();
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterReminder(String actorType, String actorId, String reminderName) {
return Mono.fromCallable(() -> {
DaprProtos.UnregisterActorReminderRequest req =
DaprProtos.UnregisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.build();

ListenableFuture<Empty> futureResponse = client.unregisterActorReminder(req);
futureResponse.get();
return null;
});
DaprProtos.UnregisterActorReminderRequest req =
DaprProtos.UnregisterActorReminderRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(reminderName)
.build();

return Mono.<Empty>create(it -> client.unregisterActorReminder(req, createStreamObserver(it))).then().then();
}

/**
Expand All @@ -192,41 +181,52 @@ public Mono<Void> registerTimer(
String actorId,
String timerName,
ActorTimerParams timerParams) {
return Mono.fromCallable(() -> {
DaprProtos.RegisterActorTimerRequest req =
DaprProtos.RegisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.setCallback(timerParams.getCallback())
.setData(ByteString.copyFrom(timerParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod()))
.build();

ListenableFuture<Empty> futureResponse = client.registerActorTimer(req);
futureResponse.get();
return null;
});
DaprProtos.RegisterActorTimerRequest req =
DaprProtos.RegisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.setCallback(timerParams.getCallback())
.setData(ByteString.copyFrom(timerParams.getData()))
.setDueTime(DurationUtils.convertDurationToDaprFormat(timerParams.getDueTime()))
.setPeriod(DurationUtils.convertDurationToDaprFormat(timerParams.getPeriod()))
.build();

return Mono.<Empty>create(it -> client.registerActorTimer(req, createStreamObserver(it))).then().then();
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterTimer(String actorType, String actorId, String timerName) {
return Mono.fromCallable(() -> {
DaprProtos.UnregisterActorTimerRequest req =
DaprProtos.UnregisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.build();

ListenableFuture<Empty> futureResponse = client.unregisterActorTimer(req);
futureResponse.get();
return null;
});
DaprProtos.UnregisterActorTimerRequest req =
DaprProtos.UnregisterActorTimerRequest.newBuilder()
.setActorType(actorType)
.setActorId(actorId)
.setName(timerName)
.build();

return Mono.<Empty>create(it -> client.unregisterActorTimer(req, createStreamObserver(it))).then().then();
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
sink.success(value);
}

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
}

@Override
public void onCompleted() {
sink.success();
}
};
}

}
Loading