diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java index c2d7a4444..877b27574 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java @@ -59,7 +59,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) { * @param overrideProperties Override properties. */ public ActorClient(Properties overrideProperties) { - this(buildManagedChannel(overrideProperties), null); + this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN)); } /** @@ -69,7 +69,7 @@ public ActorClient(Properties overrideProperties) { * @param resiliencyOptions Client resiliency options. */ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) { - this(buildManagedChannel(overrideProperties), resiliencyOptions); + this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN)); } /** @@ -80,9 +80,10 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp */ private ActorClient( ManagedChannel grpcManagedChannel, - ResiliencyOptions resiliencyOptions) { + ResiliencyOptions resiliencyOptions, + String daprApiToken) { this.grpcManagedChannel = grpcManagedChannel; - this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions); + this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken); } /** @@ -136,7 +137,11 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties) */ private static DaprClient buildDaprClient( Channel grpcManagedChannel, - ResiliencyOptions resiliencyOptions) { - return new DaprClientImpl(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions); + ResiliencyOptions resiliencyOptions, + String daprApiToken) { + return new DaprClientImpl( + DaprGrpc.newStub(grpcManagedChannel), + resiliencyOptions, + daprApiToken); } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java index c060e01c5..b415e812e 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java @@ -42,11 +42,6 @@ */ class DaprClientImpl implements DaprClient { - /** - * Timeout policy for SDK calls to Dapr API. - */ - private final TimeoutPolicy timeoutPolicy; - /** * Retry policy for SDK calls to Dapr API. */ @@ -57,16 +52,22 @@ class DaprClientImpl implements DaprClient { */ private final DaprGrpc.DaprStub client; + /** + * gRPC client interceptors. + */ + private final DaprClientGrpcInterceptors grpcInterceptors; + /** * Internal constructor. * * @param grpcClient Dapr's GRPC client. - * @param resiliencyOptions Client resiliency options (optional) + * @param resiliencyOptions Client resiliency options (optional). + * @param daprApiToken Dapr API token (optional). */ - DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) { - this.client = intercept(grpcClient); - this.timeoutPolicy = new TimeoutPolicy( - resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); + DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) { + this.client = grpcClient; + this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, + new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout())); this.retryPolicy = new RetryPolicy( resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); } @@ -85,54 +86,11 @@ public Mono invoke(String actorType, String actorId, String methodName, .build(); return Mono.deferContextual( context -> this.createMono( - it -> intercept(context, this.timeoutPolicy, client).invokeActor(req, it) + it -> this.grpcInterceptors.intercept(client, context).invokeActor(req, it) ) ).map(r -> r.getData().toByteArray()); } - /** - * Populates GRPC client with interceptors. - * - * @param client GRPC client for Dapr. - * @return Client after adding interceptors. - */ - private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { - ClientInterceptor interceptor = new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions options, - Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options)); - return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { - @Override - public void start(final Listener responseListener, final Metadata metadata) { - String daprApiToken = Properties.API_TOKEN.get(); - if (daprApiToken != null) { - metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken); - } - - super.start(responseListener, metadata); - } - }; - } - }; - return client.withInterceptors(interceptor); - } - - /** - * Populates GRPC client with interceptors for telemetry. - * - * @param context Reactor's context. - * @param timeoutPolicy Timeout policy for gRPC call. - * @param client GRPC client for Dapr. - * @return Client after adding interceptors. - */ - private static DaprGrpc.DaprStub intercept( - ContextView context, TimeoutPolicy timeoutPolicy, DaprGrpc.DaprStub client) { - return DaprClientGrpcInterceptors.intercept(client, timeoutPolicy, context); - } - private Mono createMono(Consumer> consumer) { return retryPolicy.apply( Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index 313eca5a7..0876a73cc 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -106,7 +106,7 @@ public void setup() throws IOException { InProcessChannelBuilder.forName(serverName).directExecutor().build()); // Create a HelloWorldClient using the in-process channel; - client = new DaprClientImpl(DaprGrpc.newStub(channel), null); + client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null); } @Test diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 8ea32b368..07a2a4b1e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -30,7 +30,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -68,19 +71,34 @@ public class DaprRun implements Stoppable { private final boolean hasAppHealthCheck; + private final Map, String> propertyOverrides; + private DaprRun(String testName, DaprPorts ports, String successMessage, Class serviceClass, int maxWaitMilliseconds, AppRun.AppProtocol appProtocol) { + this(testName, ports, successMessage, serviceClass, maxWaitMilliseconds, appProtocol, UUID.randomUUID().toString()); + } + + private DaprRun(String testName, + DaprPorts ports, + String successMessage, + Class serviceClass, + int maxWaitMilliseconds, + AppRun.AppProtocol appProtocol, + String daprApiToken) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = serviceClass == null ? testName.toLowerCase() : String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase(); this.appProtocol = appProtocol; this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol)); + new Command( + successMessage, + buildDaprCommand(this.appName, serviceClass, ports, appProtocol), + Map.of("DAPR_API_TOKEN", daprApiToken)); this.listCommand = new Command( this.appName, "dapr list"); @@ -91,6 +109,9 @@ private DaprRun(String testName, this.maxWaitMilliseconds = maxWaitMilliseconds; this.started = new AtomicBoolean(false); this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass); + this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{ + put(Properties.API_TOKEN, daprApiToken); + }}); } public void start() throws InterruptedException, IOException { @@ -149,7 +170,7 @@ public void stop() throws InterruptedException, IOException { } public Map, String> getPropertyOverrides() { - return this.ports.getPropertyOverrides(); + return this.propertyOverrides; } public DaprClientBuilder newDaprClientBuilder() { @@ -239,17 +260,13 @@ public String getAppName() { public DaprClient newDaprClient() { return new DaprClientBuilder() - .withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) - .withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString()) - .withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1") + .withPropertyOverrides(this.getPropertyOverrides()) .build(); } public DaprPreviewClient newDaprPreviewClient() { return new DaprClientBuilder() - .withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) - .withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString()) - .withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1") + .withPropertyOverrides(this.getPropertyOverrides()) .buildPreviewClient(); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index ad67eed8c..964b28e1d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -173,6 +173,7 @@ private DaprClientImpl buildDaprClient() { daprHttp, this.objectSerializer, this.stateSerializer, - this.resiliencyOptions); + this.resiliencyOptions, + properties.getValue(Properties.API_TOKEN)); } } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 9eccc633b..803488c0a 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -119,11 +119,6 @@ public class DaprClientImpl extends AbstractDaprClient { */ private final GrpcChannelFacade channel; - /** - * The timeout policy. - */ - private final TimeoutPolicy timeoutPolicy; - /** * The retry policy. */ @@ -141,9 +136,10 @@ public class DaprClientImpl extends AbstractDaprClient { */ private final DaprHttp httpClient; + private final DaprClientGrpcInterceptors grpcInterceptors; + /** - * Default access level constructor, in order to create an instance of this - * class use io.dapr.client.DaprClientBuilder + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * * @param channel Facade for the managed GRPC channel * @param asyncStub async gRPC stub @@ -157,7 +153,27 @@ public class DaprClientImpl extends AbstractDaprClient { DaprHttp httpClient, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { - this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null); + this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, null); + } + + /** + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder + * + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @param daprApiToken Dapr API Token. + * @see DaprClientBuilder + */ + DaprClientImpl( + GrpcChannelFacade channel, + DaprGrpc.DaprStub asyncStub, + DaprHttp httpClient, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer, + String daprApiToken) { + this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, daprApiToken); } /** @@ -169,6 +185,7 @@ public class DaprClientImpl extends AbstractDaprClient { * @param objectSerializer Serializer for transient request/response objects. * @param stateSerializer Serializer for state objects. * @param resiliencyOptions Client-level override for resiliency options. + * @param daprApiToken Dapr API Token. * @see DaprClientBuilder */ DaprClientImpl( @@ -177,15 +194,47 @@ public class DaprClientImpl extends AbstractDaprClient { DaprHttp httpClient, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer, - ResiliencyOptions resiliencyOptions) { + ResiliencyOptions resiliencyOptions, + String daprApiToken) { + this( + channel, + asyncStub, + httpClient, + objectSerializer, + stateSerializer, + new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()), + new RetryPolicy(resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()), + daprApiToken); + } + + /** + * Instantiates a new DaprClient. + * + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param httpClient client for http service invocation + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @param timeoutPolicy Client-level timeout policy. + * @param retryPolicy Client-level retry policy. + * @param daprApiToken Dapr API Token. + * @see DaprClientBuilder + */ + private DaprClientImpl( + GrpcChannelFacade channel, + DaprGrpc.DaprStub asyncStub, + DaprHttp httpClient, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer, + TimeoutPolicy timeoutPolicy, + RetryPolicy retryPolicy, + String daprApiToken) { super(objectSerializer, stateSerializer); this.channel = channel; this.asyncStub = asyncStub; this.httpClient = httpClient; - this.timeoutPolicy = new TimeoutPolicy( - resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); - this.retryPolicy = new RetryPolicy( - resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); + this.retryPolicy = retryPolicy; + this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -215,7 +264,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State */ public > T newGrpcStub(String appId, Function stubBuilder) { // Adds Dapr interceptors to populate gRPC metadata automatically. - return DaprClientGrpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel()), timeoutPolicy); + return this.grpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel())); } /** @@ -425,7 +474,8 @@ private Subscription buildSubscription( SubscriptionListener listener, TypeRef type, DaprProtos.SubscribeTopicEventsRequestAlpha1 request) { - Subscription subscription = new Subscription<>(this.asyncStub, request, listener, response -> { + var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); + Subscription subscription = new Subscription<>(interceptedStub, request, listener, response -> { if (response.getEventMessage() == null) { return null; } @@ -1268,7 +1318,7 @@ private ConfigurationItem buildConfigurationItem( * @return Client after adding interceptors. */ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { - return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context); + return this.grpcInterceptors.intercept(client, context); } /** @@ -1281,7 +1331,7 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien */ private DaprGrpc.DaprStub intercept( ContextView context, DaprGrpc.DaprStub client, Consumer metadataConsumer) { - return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer); + return this.grpcInterceptors.intercept(client, context, metadataConsumer); } private Mono createMono(Consumer> consumer) { diff --git a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java index 5241e1d5f..a4e5c2324 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java @@ -31,121 +31,82 @@ */ public class DaprClientGrpcInterceptors { - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked - * @param client gRPC client - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept(final String appId, final T client) { - return intercept(appId, client, null, null, null); - } + private final String daprApiToken; - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param client gRPC client - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept(final T client) { - return intercept(null, client, null, null, null); - } + private final TimeoutPolicy timeoutPolicy; - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked - * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept( - final String appId, final T client, final TimeoutPolicy timeoutPolicy) { - return intercept(appId, client, timeoutPolicy, null, null); + public DaprClientGrpcInterceptors() { + this(null, null); } - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept(final T client, final TimeoutPolicy timeoutPolicy) { - return intercept(null, client, timeoutPolicy, null, null); + public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) { + this.daprApiToken = daprApiToken; + this.timeoutPolicy = timeoutPolicy; } /** * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked * @param client gRPC client - * @param context Reactor context for tracing * @param async client type * @return async client instance with interceptors */ - public static > T intercept( - final String appId, final T client, final ContextView context) { - return intercept(appId, client, null, context, null); + public > T intercept(final T client) { + return intercept(null, client, null, null); } /** * Adds all Dapr interceptors to a gRPC async stub. + * @param appId Application ID to invoke. * @param client gRPC client - * @param context Reactor context for tracing * @param async client type * @return async client instance with interceptors */ - public static > T intercept(final T client, final ContextView context) { - return intercept(null, client, null, context, null); + public > T intercept( + final String appId, + final T client) { + return this.intercept(appId, client, null, null); } /** * Adds all Dapr interceptors to a gRPC async stub. * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing * @param async client type * @return async client instance with interceptors */ - public static > T intercept( + public > T intercept( final T client, - final TimeoutPolicy timeoutPolicy, final ContextView context) { - return intercept(null, client, timeoutPolicy, context, null); + return intercept(null, client, context, null); } /** * Adds all Dapr interceptors to a gRPC async stub. * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing * @param metadataConsumer Consumer of the gRPC metadata * @param async client type * @return async client instance with interceptors */ - public static > T intercept( + public > T intercept( final T client, - final TimeoutPolicy timeoutPolicy, final ContextView context, final Consumer metadataConsumer) { - return intercept(null, client, timeoutPolicy, context, metadataConsumer); + return this.intercept(null, client, context, metadataConsumer); } /** * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked + * @param appId Application ID to invoke. * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing * @param metadataConsumer Consumer of the gRPC metadata * @param async client type * @return async client instance with interceptors */ - public static > T intercept( + public > T intercept( final String appId, final T client, - final TimeoutPolicy timeoutPolicy, final ContextView context, final Consumer metadataConsumer) { if (client == null) { @@ -154,8 +115,8 @@ public static > T intercept( return client.withInterceptors( new DaprAppIdInterceptor(appId), - new DaprApiTokenInterceptor(), - new DaprTimeoutInterceptor(timeoutPolicy), + new DaprApiTokenInterceptor(this.daprApiToken), + new DaprTimeoutInterceptor(this.timeoutPolicy), new DaprTracingInterceptor(context), new DaprMetadataInterceptor(metadataConsumer)); } diff --git a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java index cda6e896b..f7cf415d8 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java @@ -24,10 +24,23 @@ import io.grpc.MethodDescriptor; /** - * Class to be used as part of your service's client stub interceptor to include Dapr tokens. + * Class to be used as part of your service's client stub interceptor to include the Dapr API token. */ public class DaprApiTokenInterceptor implements ClientInterceptor { + /** + * Dapr API Token. + */ + private final String token; + + /** + * Instantiates an interceptor to inject the Dapr API Token. + * @param token Dapr API Token. + */ + public DaprApiTokenInterceptor(String token) { + this.token = token; + } + @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, @@ -37,9 +50,10 @@ public ClientCall interceptCall( return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) { @Override public void start(final Listener responseListener, final Metadata metadata) { - String daprApiToken = Properties.API_TOKEN.get(); - if (daprApiToken != null) { - metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); + if (DaprApiTokenInterceptor.this.token != null) { + metadata.put( + Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), + DaprApiTokenInterceptor.this.token); } super.start(responseListener, metadata); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index 4b8b80317..dc7ed58ed 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -238,7 +238,7 @@ private Mono invoke() { .build(); return Mono.deferContextual( context -> this.createMono( - it -> DaprClientGrpcInterceptors.intercept(daprStub, context).invokeService(req, it) + it -> new DaprClientGrpcInterceptors().intercept(daprStub, context).invokeService(req, it) ) ).then(); }