Skip to content

Commit 5958266

Browse files
committed
Fix bi-di subscription to support dapr-api-token
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
1 parent cb552ba commit 5958266

File tree

9 files changed

+159
-153
lines changed

9 files changed

+159
-153
lines changed

sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) {
5959
* @param overrideProperties Override properties.
6060
*/
6161
public ActorClient(Properties overrideProperties) {
62-
this(buildManagedChannel(overrideProperties), null);
62+
this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
6363
}
6464

6565
/**
@@ -69,7 +69,7 @@ public ActorClient(Properties overrideProperties) {
6969
* @param resiliencyOptions Client resiliency options.
7070
*/
7171
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
72-
this(buildManagedChannel(overrideProperties), resiliencyOptions);
72+
this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
7373
}
7474

7575
/**
@@ -80,9 +80,10 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp
8080
*/
8181
private ActorClient(
8282
ManagedChannel grpcManagedChannel,
83-
ResiliencyOptions resiliencyOptions) {
83+
ResiliencyOptions resiliencyOptions,
84+
String daprApiToken) {
8485
this.grpcManagedChannel = grpcManagedChannel;
85-
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions);
86+
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
8687
}
8788

8889
/**
@@ -136,7 +137,11 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties)
136137
*/
137138
private static DaprClient buildDaprClient(
138139
Channel grpcManagedChannel,
139-
ResiliencyOptions resiliencyOptions) {
140-
return new DaprClientImpl(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
140+
ResiliencyOptions resiliencyOptions,
141+
String daprApiToken) {
142+
return new DaprClientImpl(
143+
DaprGrpc.newStub(grpcManagedChannel),
144+
resiliencyOptions,
145+
daprApiToken);
141146
}
142147
}

sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,6 @@
4242
*/
4343
class DaprClientImpl implements DaprClient {
4444

45-
/**
46-
* Timeout policy for SDK calls to Dapr API.
47-
*/
48-
private final TimeoutPolicy timeoutPolicy;
49-
5045
/**
5146
* Retry policy for SDK calls to Dapr API.
5247
*/
@@ -57,16 +52,22 @@ class DaprClientImpl implements DaprClient {
5752
*/
5853
private final DaprGrpc.DaprStub client;
5954

55+
/**
56+
* gRPC client interceptors.
57+
*/
58+
private final DaprClientGrpcInterceptors grpcInterceptors;
59+
6060
/**
6161
* Internal constructor.
6262
*
6363
* @param grpcClient Dapr's GRPC client.
64-
* @param resiliencyOptions Client resiliency options (optional)
64+
* @param resiliencyOptions Client resiliency options (optional).
65+
* @param daprApiToken Dapr API token (optional).
6566
*/
66-
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
67-
this.client = intercept(grpcClient);
68-
this.timeoutPolicy = new TimeoutPolicy(
69-
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
67+
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
68+
this.client = grpcClient;
69+
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
70+
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
7071
this.retryPolicy = new RetryPolicy(
7172
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
7273
}
@@ -85,54 +86,11 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
8586
.build();
8687
return Mono.deferContextual(
8788
context -> this.<DaprProtos.InvokeActorResponse>createMono(
88-
it -> intercept(context, this.timeoutPolicy, client).invokeActor(req, it)
89+
it -> this.grpcInterceptors.intercept(client, context).invokeActor(req, it)
8990
)
9091
).map(r -> r.getData().toByteArray());
9192
}
9293

93-
/**
94-
* Populates GRPC client with interceptors.
95-
*
96-
* @param client GRPC client for Dapr.
97-
* @return Client after adding interceptors.
98-
*/
99-
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
100-
ClientInterceptor interceptor = new ClientInterceptor() {
101-
@Override
102-
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
103-
MethodDescriptor<ReqT, RespT> methodDescriptor,
104-
CallOptions options,
105-
Channel channel) {
106-
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
107-
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
108-
@Override
109-
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
110-
String daprApiToken = Properties.API_TOKEN.get();
111-
if (daprApiToken != null) {
112-
metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
113-
}
114-
115-
super.start(responseListener, metadata);
116-
}
117-
};
118-
}
119-
};
120-
return client.withInterceptors(interceptor);
121-
}
122-
123-
/**
124-
* Populates GRPC client with interceptors for telemetry.
125-
*
126-
* @param context Reactor's context.
127-
* @param timeoutPolicy Timeout policy for gRPC call.
128-
* @param client GRPC client for Dapr.
129-
* @return Client after adding interceptors.
130-
*/
131-
private static DaprGrpc.DaprStub intercept(
132-
ContextView context, TimeoutPolicy timeoutPolicy, DaprGrpc.DaprStub client) {
133-
return DaprClientGrpcInterceptors.intercept(client, timeoutPolicy, context);
134-
}
135-
13694
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
13795
return retryPolicy.apply(
13896
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));

sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void setup() throws IOException {
106106
InProcessChannelBuilder.forName(serverName).directExecutor().build());
107107

108108
// Create a HelloWorldClient using the in-process channel;
109-
client = new DaprClientImpl(DaprGrpc.newStub(channel), null);
109+
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
110110
}
111111

112112
@Test

sdk-tests/src/test/java/io/dapr/it/DaprRun.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
import org.apache.commons.lang3.tuple.ImmutablePair;
3131

3232
import java.io.IOException;
33+
import java.util.Collections;
34+
import java.util.HashMap;
3335
import java.util.Map;
36+
import java.util.UUID;
3437
import java.util.concurrent.TimeUnit;
3538
import java.util.concurrent.atomic.AtomicBoolean;
3639
import java.util.function.Supplier;
@@ -68,19 +71,34 @@ public class DaprRun implements Stoppable {
6871

6972
private final boolean hasAppHealthCheck;
7073

74+
private final Map<Property<?>, String> propertyOverrides;
75+
7176
private DaprRun(String testName,
7277
DaprPorts ports,
7378
String successMessage,
7479
Class serviceClass,
7580
int maxWaitMilliseconds,
7681
AppRun.AppProtocol appProtocol) {
82+
this(testName, ports, successMessage, serviceClass, maxWaitMilliseconds, appProtocol, UUID.randomUUID().toString());
83+
}
84+
85+
private DaprRun(String testName,
86+
DaprPorts ports,
87+
String successMessage,
88+
Class serviceClass,
89+
int maxWaitMilliseconds,
90+
AppRun.AppProtocol appProtocol,
91+
String daprApiToken) {
7792
// The app name needs to be deterministic since we depend on it to kill previous runs.
7893
this.appName = serviceClass == null ?
7994
testName.toLowerCase() :
8095
String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase();
8196
this.appProtocol = appProtocol;
8297
this.startCommand =
83-
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol));
98+
new Command(
99+
successMessage,
100+
buildDaprCommand(this.appName, serviceClass, ports, appProtocol),
101+
Map.of("DAPR_API_TOKEN", daprApiToken));
84102
this.listCommand = new Command(
85103
this.appName,
86104
"dapr list");
@@ -91,6 +109,9 @@ private DaprRun(String testName,
91109
this.maxWaitMilliseconds = maxWaitMilliseconds;
92110
this.started = new AtomicBoolean(false);
93111
this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass);
112+
this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{
113+
put(Properties.API_TOKEN, daprApiToken);
114+
}});
94115
}
95116

96117
public void start() throws InterruptedException, IOException {
@@ -149,7 +170,7 @@ public void stop() throws InterruptedException, IOException {
149170
}
150171

151172
public Map<Property<?>, String> getPropertyOverrides() {
152-
return this.ports.getPropertyOverrides();
173+
return this.propertyOverrides;
153174
}
154175

155176
public DaprClientBuilder newDaprClientBuilder() {
@@ -239,17 +260,13 @@ public String getAppName() {
239260

240261
public DaprClient newDaprClient() {
241262
return new DaprClientBuilder()
242-
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString())
243-
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
244-
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
263+
.withPropertyOverrides(this.getPropertyOverrides())
245264
.build();
246265
}
247266

248267
public DaprPreviewClient newDaprPreviewClient() {
249268
return new DaprClientBuilder()
250-
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString())
251-
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
252-
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
269+
.withPropertyOverrides(this.getPropertyOverrides())
253270
.buildPreviewClient();
254271
}
255272

sdk/src/main/java/io/dapr/client/DaprClientBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ private DaprClientImpl buildDaprClient() {
173173
daprHttp,
174174
this.objectSerializer,
175175
this.stateSerializer,
176-
this.resiliencyOptions);
176+
this.resiliencyOptions,
177+
properties.getValue(Properties.API_TOKEN));
177178
}
178179
}

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,6 @@ public class DaprClientImpl extends AbstractDaprClient {
119119
*/
120120
private final GrpcChannelFacade channel;
121121

122-
/**
123-
* The timeout policy.
124-
*/
125-
private final TimeoutPolicy timeoutPolicy;
126-
127122
/**
128123
* The retry policy.
129124
*/
@@ -141,9 +136,10 @@ public class DaprClientImpl extends AbstractDaprClient {
141136
*/
142137
private final DaprHttp httpClient;
143138

139+
private final DaprClientGrpcInterceptors grpcInterceptors;
140+
144141
/**
145-
* Default access level constructor, in order to create an instance of this
146-
* class use io.dapr.client.DaprClientBuilder
142+
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
147143
*
148144
* @param channel Facade for the managed GRPC channel
149145
* @param asyncStub async gRPC stub
@@ -157,7 +153,27 @@ public class DaprClientImpl extends AbstractDaprClient {
157153
DaprHttp httpClient,
158154
DaprObjectSerializer objectSerializer,
159155
DaprObjectSerializer stateSerializer) {
160-
this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null);
156+
this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, null);
157+
}
158+
159+
/**
160+
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
161+
*
162+
* @param channel Facade for the managed GRPC channel
163+
* @param asyncStub async gRPC stub
164+
* @param objectSerializer Serializer for transient request/response objects.
165+
* @param stateSerializer Serializer for state objects.
166+
* @param daprApiToken Dapr API Token.
167+
* @see DaprClientBuilder
168+
*/
169+
DaprClientImpl(
170+
GrpcChannelFacade channel,
171+
DaprGrpc.DaprStub asyncStub,
172+
DaprHttp httpClient,
173+
DaprObjectSerializer objectSerializer,
174+
DaprObjectSerializer stateSerializer,
175+
String daprApiToken) {
176+
this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, daprApiToken);
161177
}
162178

163179
/**
@@ -169,6 +185,7 @@ public class DaprClientImpl extends AbstractDaprClient {
169185
* @param objectSerializer Serializer for transient request/response objects.
170186
* @param stateSerializer Serializer for state objects.
171187
* @param resiliencyOptions Client-level override for resiliency options.
188+
* @param daprApiToken Dapr API Token.
172189
* @see DaprClientBuilder
173190
*/
174191
DaprClientImpl(
@@ -177,15 +194,47 @@ public class DaprClientImpl extends AbstractDaprClient {
177194
DaprHttp httpClient,
178195
DaprObjectSerializer objectSerializer,
179196
DaprObjectSerializer stateSerializer,
180-
ResiliencyOptions resiliencyOptions) {
197+
ResiliencyOptions resiliencyOptions,
198+
String daprApiToken) {
199+
this(
200+
channel,
201+
asyncStub,
202+
httpClient,
203+
objectSerializer,
204+
stateSerializer,
205+
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()),
206+
new RetryPolicy(resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()),
207+
daprApiToken);
208+
}
209+
210+
/**
211+
* Instantiates a new DaprClient.
212+
*
213+
* @param channel Facade for the managed GRPC channel
214+
* @param asyncStub async gRPC stub
215+
* @param httpClient client for http service invocation
216+
* @param objectSerializer Serializer for transient request/response objects.
217+
* @param stateSerializer Serializer for state objects.
218+
* @param timeoutPolicy Client-level timeout policy.
219+
* @param retryPolicy Client-level retry policy.
220+
* @param daprApiToken Dapr API Token.
221+
* @see DaprClientBuilder
222+
*/
223+
private DaprClientImpl(
224+
GrpcChannelFacade channel,
225+
DaprGrpc.DaprStub asyncStub,
226+
DaprHttp httpClient,
227+
DaprObjectSerializer objectSerializer,
228+
DaprObjectSerializer stateSerializer,
229+
TimeoutPolicy timeoutPolicy,
230+
RetryPolicy retryPolicy,
231+
String daprApiToken) {
181232
super(objectSerializer, stateSerializer);
182233
this.channel = channel;
183234
this.asyncStub = asyncStub;
184235
this.httpClient = httpClient;
185-
this.timeoutPolicy = new TimeoutPolicy(
186-
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
187-
this.retryPolicy = new RetryPolicy(
188-
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
236+
this.retryPolicy = retryPolicy;
237+
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy);
189238
}
190239

191240
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
@@ -215,7 +264,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State
215264
*/
216265
public <T extends AbstractStub<T>> T newGrpcStub(String appId, Function<Channel, T> stubBuilder) {
217266
// Adds Dapr interceptors to populate gRPC metadata automatically.
218-
return DaprClientGrpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel()), timeoutPolicy);
267+
return this.grpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel()));
219268
}
220269

221270
/**
@@ -425,7 +474,8 @@ private <T> Subscription<T> buildSubscription(
425474
SubscriptionListener<T> listener,
426475
TypeRef<T> type,
427476
DaprProtos.SubscribeTopicEventsRequestAlpha1 request) {
428-
Subscription<T> subscription = new Subscription<>(this.asyncStub, request, listener, response -> {
477+
var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
478+
Subscription<T> subscription = new Subscription<>(interceptedStub, request, listener, response -> {
429479
if (response.getEventMessage() == null) {
430480
return null;
431481
}
@@ -1268,7 +1318,7 @@ private ConfigurationItem buildConfigurationItem(
12681318
* @return Client after adding interceptors.
12691319
*/
12701320
private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
1271-
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context);
1321+
return this.grpcInterceptors.intercept(client, context);
12721322
}
12731323

12741324
/**
@@ -1281,7 +1331,7 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien
12811331
*/
12821332
private DaprGrpc.DaprStub intercept(
12831333
ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) {
1284-
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer);
1334+
return this.grpcInterceptors.intercept(client, context, metadataConsumer);
12851335
}
12861336

12871337
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {

0 commit comments

Comments
 (0)