Skip to content

Commit 9f190d2

Browse files
committed
Support remote endpoint.
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
1 parent d34bcb8 commit 9f190d2

File tree

10 files changed

+288
-126
lines changed

10 files changed

+288
-126
lines changed

sdk/src/main/java/io/dapr/client/DaprClientBuilder.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.LoggerFactory;
2525

2626
import java.io.Closeable;
27+
import java.net.URI;
2728

2829
/**
2930
* A builder for the DaprClient,
@@ -162,19 +163,28 @@ private DaprClient buildDaprClient(DaprApiProtocol protocol) {
162163
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
163164
*/
164165
private DaprClient buildDaprClientGrpc() {
166+
final ManagedChannel channel = buildGrpcManagedChanel();
167+
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
168+
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
169+
return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer);
170+
}
171+
172+
private ManagedChannel buildGrpcManagedChanel() {
173+
String host = Properties.SIDECAR_IP.get();
165174
int port = Properties.GRPC_PORT.get();
166-
if (port <= 0) {
167-
throw new IllegalArgumentException("Invalid port.");
175+
boolean insecure = true;
176+
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
177+
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
178+
URI uri = URI.create(grpcEndpoint);
179+
insecure = uri.getScheme().equalsIgnoreCase("http");
180+
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
168181
}
169-
ManagedChannel channel = ManagedChannelBuilder.forAddress(
170-
Properties.SIDECAR_IP.get(), port).usePlaintext().userAgent(Version.getSdkVersion()).build();
171-
Closeable closeableChannel = () -> {
172-
if (channel != null && !channel.isShutdown()) {
173-
channel.shutdown();
174-
}
175-
};
176-
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
177-
return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer);
182+
ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(host, port)
183+
.userAgent(Version.getSdkVersion());
184+
if (insecure) {
185+
builder = builder.usePlaintext();
186+
}
187+
return builder.build();
178188
}
179189

180190
/**

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

Lines changed: 55 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import io.dapr.serializer.DaprObjectSerializer;
5151
import io.dapr.serializer.DefaultObjectSerializer;
5252
import io.dapr.utils.DefaultContentTypeConverter;
53-
import io.dapr.utils.NetworkUtils;
5453
import io.dapr.utils.TypeRef;
5554
import io.dapr.v1.CommonProtos;
5655
import io.dapr.v1.DaprGrpc;
@@ -69,7 +68,6 @@
6968
import reactor.core.publisher.MonoSink;
7069
import reactor.util.context.ContextView;
7170

72-
import java.io.Closeable;
7371
import java.io.IOException;
7472
import java.util.ArrayList;
7573
import java.util.Collections;
@@ -92,7 +90,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
9290
/**
9391
* The GRPC managed channel to be used.
9492
*/
95-
private Closeable channel;
93+
private final GrpcChannelFacade channel;
9694

9795
/**
9896
* The async gRPC stub.
@@ -102,19 +100,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
102100
/**
103101
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
104102
*
105-
* @param closeableChannel A closeable for a Managed GRPC channel
103+
* @param channel Facade for the managed GRPC channel
106104
* @param asyncStub async gRPC stub
107105
* @param objectSerializer Serializer for transient request/response objects.
108106
* @param stateSerializer Serializer for state objects.
109107
* @see DaprClientBuilder
110108
*/
111109
DaprClientGrpc(
112-
Closeable closeableChannel,
110+
GrpcChannelFacade channel,
113111
DaprGrpc.DaprStub asyncStub,
114112
DaprObjectSerializer objectSerializer,
115113
DaprObjectSerializer stateSerializer) {
116114
super(objectSerializer, stateSerializer);
117-
this.channel = closeableChannel;
115+
this.channel = channel;
118116
this.asyncStub = intercept(asyncStub);
119117
}
120118

@@ -145,13 +143,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State
145143
*/
146144
@Override
147145
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
148-
return Mono.fromRunnable(() -> {
149-
try {
150-
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
151-
} catch (InterruptedException e) {
152-
throw new RuntimeException(e);
153-
}
154-
});
146+
return this.channel.waitForChannelReady(timeoutInMilliseconds);
155147
}
156148

157149
/**
@@ -193,7 +185,6 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
193185
}
194186

195187
/**
196-
*
197188
* {@inheritDoc}
198189
*/
199190
@Override
@@ -209,7 +200,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
209200
throw new IllegalArgumentException("pubsubName and topic name cannot be null or empty");
210201
}
211202

212-
for (BulkPublishEntry<?> entry: request.getEntries()) {
203+
for (BulkPublishEntry<?> entry : request.getEntries()) {
213204
Object event = entry.getEvent();
214205
byte[] data;
215206
String contentType = entry.getContentType();
@@ -251,7 +242,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
251242
}
252243

253244
Map<String, BulkPublishEntry<T>> entryMap = new HashMap<>();
254-
for (BulkPublishEntry<T> entry: request.getEntries()) {
245+
for (BulkPublishEntry<T> entry : request.getEntries()) {
255246
entryMap.put(entry.getEntryId(), entry);
256247
}
257248
return Mono.deferContextual(
@@ -299,17 +290,17 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
299290
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342
300291

301292
return Mono.deferContextual(
302-
context -> this.<CommonProtos.InvokeResponse>createMono(
303-
it -> intercept(context, asyncStub).invokeService(envelope, it)
304-
)
305-
).flatMap(
306-
it -> {
307-
try {
308-
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
309-
} catch (IOException e) {
310-
throw DaprException.propagate(e);
311-
}
312-
}
293+
context -> this.<CommonProtos.InvokeResponse>createMono(
294+
it -> intercept(context, asyncStub).invokeService(envelope, it)
295+
)
296+
).flatMap(
297+
it -> {
298+
try {
299+
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
300+
} catch (IOException e) {
301+
throw DaprException.propagate(e);
302+
}
303+
}
313304
);
314305
} catch (Exception ex) {
315306
return DaprException.wrapMono(ex);
@@ -346,17 +337,17 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
346337
DaprProtos.InvokeBindingRequest envelope = builder.build();
347338

348339
return Mono.deferContextual(
349-
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
350-
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
351-
)
352-
).flatMap(
353-
it -> {
354-
try {
355-
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
356-
} catch (IOException e) {
357-
throw DaprException.propagate(e);
358-
}
359-
}
340+
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
341+
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
342+
)
343+
).flatMap(
344+
it -> {
345+
try {
346+
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
347+
} catch (IOException e) {
348+
throw DaprException.propagate(e);
349+
}
350+
}
360351
);
361352
} catch (Exception ex) {
362353
return DaprException.wrapMono(ex);
@@ -442,12 +433,12 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe
442433
DaprProtos.GetBulkStateRequest envelope = builder.build();
443434

444435
return Mono.deferContextual(
445-
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
446-
.getBulkState(envelope, it)
447-
)
448-
).map(
449-
it ->
450-
it
436+
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
437+
.getBulkState(envelope, it)
438+
)
439+
).map(
440+
it ->
441+
it
451442
.getItemsList()
452443
.stream()
453444
.map(b -> {
@@ -705,8 +696,8 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
705696
}
706697

707698
DaprProtos.GetSecretRequest.Builder requestBuilder = DaprProtos.GetSecretRequest.newBuilder()
708-
.setStoreName(secretStoreName)
709-
.setKey(key);
699+
.setStoreName(secretStoreName)
700+
.setKey(key);
710701

711702
if (metadata != null) {
712703
requestBuilder.putAllMetadata(metadata);
@@ -740,18 +731,18 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest
740731

741732
return Mono.deferContextual(
742733
context ->
743-
this.<DaprProtos.GetBulkSecretResponse>createMono(
744-
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
745-
)
734+
this.<DaprProtos.GetBulkSecretResponse>createMono(
735+
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
736+
)
746737
).map(it -> {
747738
Map<String, DaprProtos.SecretResponse> secretsMap = it.getDataMap();
748739
if (secretsMap == null) {
749740
return Collections.emptyMap();
750741
}
751742
return secretsMap
752-
.entrySet()
753-
.stream()
754-
.collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().getSecretsMap()));
743+
.entrySet()
744+
.stream()
745+
.collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().getSecretsMap()));
755746
});
756747
} catch (Exception ex) {
757748
return DaprException.wrapMono(ex);
@@ -805,7 +796,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
805796
try {
806797
return buildQueryStateKeyValue(v, type);
807798
} catch (Exception e) {
808-
throw DaprException.propagate(e);
799+
throw DaprException.propagate(e);
809800
}
810801
})
811802
.collect(Collectors.toList());
@@ -900,7 +891,7 @@ private Mono<Map<String, ConfigurationItem>> getConfiguration(DaprProtos.GetConf
900891
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItems().entrySet().iterator();
901892
while (itr.hasNext()) {
902893
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
903-
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
894+
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
904895
}
905896
return Collections.unmodifiableMap(configMap);
906897
}
@@ -934,15 +925,15 @@ public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConf
934925
return this.<DaprProtos.SubscribeConfigurationResponse>createFlux(
935926
it -> intercept(asyncStub).subscribeConfiguration(envelope, it)
936927
).map(
937-
it -> {
938-
Map<String, ConfigurationItem> configMap = new HashMap<>();
939-
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItemsMap().entrySet().iterator();
940-
while (itr.hasNext()) {
941-
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
942-
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
928+
it -> {
929+
Map<String, ConfigurationItem> configMap = new HashMap<>();
930+
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItemsMap().entrySet().iterator();
931+
while (itr.hasNext()) {
932+
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
933+
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
934+
}
935+
return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap));
943936
}
944-
return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap));
945-
}
946937
);
947938
} catch (Exception ex) {
948939
return DaprException.wrapFlux(ex);
@@ -990,8 +981,8 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
990981
private ConfigurationItem buildConfigurationItem(
991982
CommonProtos.ConfigurationItem configurationItem, String key) {
992983
return new ConfigurationItem(
993-
key,
994-
configurationItem.getValue(),
984+
key,
985+
configurationItem.getValue(),
995986
configurationItem.getVersion(),
996987
configurationItem.getMetadataMap()
997988
);

sdk/src/main/java/io/dapr/client/DaprHttp.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import reactor.util.context.ContextView;
3333

3434
import java.io.IOException;
35+
import java.net.URI;
3536
import java.nio.charset.StandardCharsets;
3637
import java.util.Arrays;
3738
import java.util.Collections;
@@ -141,14 +142,9 @@ public int getStatusCode() {
141142
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
142143

143144
/**
144-
* Hostname used to communicate to Dapr's HTTP endpoint.
145+
* Endpoint used to communicate to Dapr's HTTP endpoint.
145146
*/
146-
private final String hostname;
147-
148-
/**
149-
* Port used to communicate to Dapr's HTTP endpoint.
150-
*/
151-
private final int port;
147+
private final URI uri;
152148

153149
/**
154150
* Http client used for all API calls.
@@ -163,8 +159,18 @@ public int getStatusCode() {
163159
* @param httpClient RestClient used for all API calls in this new instance.
164160
*/
165161
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
166-
this.hostname = hostname;
167-
this.port = port;
162+
this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port);
163+
this.httpClient = httpClient;
164+
}
165+
166+
/**
167+
* Creates a new instance of {@link DaprHttp}.
168+
*
169+
* @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com")
170+
* @param httpClient RestClient used for all API calls in this new instance.
171+
*/
172+
DaprHttp(String uri, OkHttpClient httpClient) {
173+
this.uri = URI.create(uri);
168174
this.httpClient = httpClient;
169175
}
170176

@@ -273,9 +279,14 @@ private CompletableFuture<Response> doInvokeApi(String method,
273279
body = RequestBody.Companion.create(content, mediaType);
274280
}
275281
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
276-
urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
277-
.host(this.hostname)
278-
.port(this.port);
282+
urlBuilder.scheme(uri.getScheme())
283+
.host(uri.getHost());
284+
if (uri.getPort() > 0) {
285+
urlBuilder.port(uri.getPort());
286+
}
287+
if (uri.getPath() != null) {
288+
urlBuilder.addPathSegments(uri.getPath());
289+
}
279290
for (String pathSegment : pathSegments) {
280291
urlBuilder.addPathSegment(pathSegment);
281292
}

sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ private DaprHttp buildDaprHttp() {
8585
}
8686
}
8787

88+
String endpoint = Properties.HTTP_ENDPOINT.get();
89+
if ((endpoint != null) && !endpoint.isEmpty()) {
90+
return new DaprHttp(endpoint, OK_HTTP_CLIENT);
91+
}
92+
8893
return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
8994
}
9095
}

0 commit comments

Comments
 (0)