From ceded839d3c14d0cf187ed6bea9bdb0bfe16bfe1 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 11 Sep 2024 14:09:50 -0700 Subject: [PATCH] Bi-di subscription. Add bidi subscription to validate workflow. Signed-off-by: Artur Souza --- .github/workflows/validate.yml | 4 + README.md | 2 + examples/README.md | 541 ------------------ .../io/dapr/examples/pubsub/stream/README.md | 122 ++++ .../examples/pubsub/stream/Subscriber.java | 88 +++ .../java/io/dapr/examples/workflows/README.md | 57 +- pom.xml | 2 +- sdk-autogen/pom.xml | 1 + sdk-tests/pom.xml | 8 + .../src/test/java/io/dapr/it/DaprRun.java | 18 + .../dapr/it/pubsub/stream/PubSubStreamIT.java | 124 ++++ sdk/pom.xml | 6 + .../java/io/dapr/client/DaprClientImpl.java | 58 +- .../io/dapr/client/DaprPreviewClient.java | 12 + .../java/io/dapr/client/Subscription.java | 197 +++++++ .../io/dapr/client/SubscriptionListener.java | 45 ++ .../io/dapr/client/DaprClientBuilderTest.java | 4 +- .../client/DaprPreviewClientGrpcTest.java | 122 ++++ .../java/io/dapr/utils/NetworkUtilsTest.java | 42 +- 19 files changed, 840 insertions(+), 613 deletions(-) delete mode 100644 examples/README.md create mode 100644 examples/src/main/java/io/dapr/examples/pubsub/stream/README.md create mode 100644 examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java create mode 100644 sdk/src/main/java/io/dapr/client/Subscription.java create mode 100644 sdk/src/main/java/io/dapr/client/SubscriptionListener.java diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 58ea173f9..8d1e23187 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -160,3 +160,7 @@ jobs: working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/workflows/README.md + - name: Validate streaming subscription example + working-directory: ./examples + run: | + mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md diff --git a/README.md b/README.md index 71b47e3ba..a3f0dd593 100644 --- a/README.md +++ b/README.md @@ -102,8 +102,10 @@ Try the following examples to learn more about Dapr's Java SDK: * [Invoking a Grpc service](./examples/src/main/java/io/dapr/examples/invoke/grpc) * [State management](./examples/src/main/java/io/dapr/examples/state) * [PubSub with subscriber](./examples/src/main/java/io/dapr/examples/pubsub/) +* [PubSub with streaming subscription](./examples/src/main/java/io/dapr/examples/pubsub/stream/) * [Binding with input over Http](./examples/src/main/java/io/dapr/examples/bindings/http) * [Actors](./examples/src/main/java/io/dapr/examples/actors/) +* [Workflows](./examples/src/main/java/io/dapr/examples/workflows/) * [Secrets management](./examples/src/main/java/io/dapr/examples/secrets) * [Configuration](./examples/src/main/java/io/dapr/examples/configuration) * [Distributed tracing with OpenTelemetry SDK](./examples/src/main/java/io/dapr/examples/tracing) diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index fedee965e..000000000 --- a/examples/README.md +++ /dev/null @@ -1,541 +0,0 @@ -# Dapr Workflow Sample - -In this example, we'll use Dapr to test workflow features. - -Visit [the Workflow documentation landing page](https://docs.dapr.io/developing-applications/building-blocks/workflow) for more information. - -This example contains the follow classes: - -* DemoWorkflow: An example of a Dapr Workflow. -* DemoWorkflowClient: This application will start workflows using Dapr. -* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance. - -## Pre-requisites - -* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/). -* Java JDK 11 (or greater): - * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) - * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) - * [OpenJDK 11](https://jdk.java.net/11/) -* [Apache Maven](https://maven.apache.org/install.html) version 3.x. - -### Checking out the code - -Clone this repository: - -```sh -git clone https://github.com/dapr/java-sdk.git -cd java-sdk -``` - -Then build the Maven project: - -```sh -# make sure you are in the `java-sdk` directory. -mvn install -``` - -Get into the `examples` directory. -```sh -cd examples -``` - -### Initialize Dapr - -Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. - -## Patterns - -Those examples contain the following workflow patterns: -1. [Chaining Pattern](#chaining-pattern) -2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern) -3. [Continue As New Pattern](#continue-as-new-pattern) -4. [External Event Pattern](#external-event-pattern) -5. [Sub-workflow Pattern](#sub-workflow-pattern) - -### Chaining Pattern -In the chaining pattern, a sequence of activities executes in a specific order. -In this pattern, the output of one activity is applied to the input of another activity. -The chaining pattern is useful when you need to execute a sequence of activities in a specific order. - -The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below: -```java -public class DemoChainWorker { - /** - * The main method of this app. - * - * @param args The port the app will listen on. - * @throws Exception An Exception. - */ - public static void main(String[] args) throws Exception { - // Register the Workflow with the builder. - WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class); - builder.registerActivity(ToUpperCaseActivity.class); - - // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } - - System.exit(0); - } -} -``` - -The second Java class you want to look at is `DemoChainWorkflow`, it defines the workflow. In this example it chains the activites in order. See the code snippet below: -```java -public class DemoChainWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - - String result = ""; - result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", "; - result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", "; - result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await(); - - ctx.getLogger().info("Workflow finished with result: " + result); - ctx.complete(result); - }; - } -} -``` - -The next Java class you want to look at is `ToUpperCaseActivity`, it defines the logics for a single acitvity, in this case, it converts a string to upper case. See the code snippet below: -```java -public class ToUpperCaseActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); - logger.info("Starting Chaining Activity: " + ctx.getName()); - - var message = ctx.getInput(String.class); - var newMessage = message.toUpperCase(); - - logger.info("Message Received from input: " + message); - logger.info("Sending message to output: " + newMessage); - - logger.info("Activity returned: " + newMessage); - logger.info("Activity finished"); - - return newMessage; - } -} -``` - -Execute the following script in order to run DemoChainWorker: -```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker -``` - -Once running, the logs will start displaying the different steps: First, you can see workflow is starting: -```text -== APP == Start workflow runtime -== APP == Nov 07, 2023 11:03:07 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock -== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. -``` - -Then, execute the following script in order to run DemoChainClient: -```sh -java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient -``` - - - - -Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity: -```text -== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow -== APP == 2023-11-07 11:03:14,229 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity -== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Tokyo -== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: TOKYO -== APP == 2023-11-07 11:03:14,266 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity -== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: London -== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: LONDON -== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity -== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Seattle -== APP == 2023-11-07 11:03:14,283 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: SEATTLE -== APP == 2023-11-07 11:03:14,298 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: TOKYO, LONDON, SEATTLE -``` -and the client logs showing the workflow is started and finished with expected result: -```text -Started a new chaining model workflow with instance ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 -workflow instance with ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 completed with result: TOKYO, LONDON, SEATTLE -``` - -### Fan-out/Fan-in Pattern - -In the fan out/fan in pattern, you execute multiple activities in parallel and then wait for all activities to finish. Often, some aggregation work is done on the results that are returned from the activities. - -The `DemoFanInOutWorkflow` class defines the workflow. In this example it executes the activities in parallel and then sums the results. See the code snippet below: -```java -public class DemoFanInOutWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - - ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - - - // The input is a list of objects that need to be operated on. - // In this example, inputs are expected to be strings. - List inputs = ctx.getInput(List.class); - - // Fan-out to multiple concurrent activity invocations, each of which does a word count. - List> tasks = inputs.stream() - .map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class)) - .collect(Collectors.toList()); - - // Fan-in to get the total word count from all the individual activity results. - List allWordCountResults = ctx.allOf(tasks).await(); - int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum(); - - ctx.getLogger().info("Workflow finished with result: " + totalWordCount); - // Save the final result as the orchestration output. - ctx.complete(totalWordCount); - }; - } -} -``` - -The `CountWordsActivity` class defines the logics for a single acitvity, in this case, it counts the words in a string. See the code snippet below: -```java -public class CountWordsActivity implements WorkflowActivity { - @Override - public Object run(WorkflowActivityContext ctx) { - Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); - logger.info("Starting Activity: " + ctx.getName()); - - String input = ctx.getInput(String.class); - StringTokenizer tokenizer = new StringTokenizer(input); - int result = tokenizer.countTokens(); - - logger.info("Activity returned: " + result); - logger.info("Activity finished"); - - return result; - } -} -``` - - -Execute the following script in order to run DemoFanInOutWorker: -```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker -``` - -Execute the following script in order to run DemoFanInOutClient: - -```sh -java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient -``` - - -Now you can see the logs from worker: -```text -== APP == 2023-11-07 14:52:03,075 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.faninout.DemoFanInOutWorkflow -== APP == 2023-11-07 14:52:03,144 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity -== APP == 2023-11-07 14:52:03,147 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 2 -== APP == 2023-11-07 14:52:03,148 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished -== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity -== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 9 -== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished -== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity -== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 21 -== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished -== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity -== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 17 -== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished -== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity -== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 11 -== APP == 2023-11-07 14:52:03,174 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished -== APP == 2023-11-07 14:52:03,212 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: 60 -``` - -and the client: -```text -Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986 -workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60 -``` - -### Continue As New Pattern -`ContinueAsNew` API allows you to restart the workflow with a new input. - -The `DemoContinueAsNewWorkflow` class defines the workflow. It simulates periodic cleanup work that happen every 10 seconds, after previous cleanup has finished. See the code snippet below: -```java -public class DemoContinueAsNewWorkflow extends Workflow { - /* - Compared with a CRON schedule, this periodic workflow example will never overlap. - For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc. - and could potentially run into overlap issues if the cleanup takes longer than an hour. - In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups, - then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap. - */ - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - - ctx.getLogger().info("call CleanUpActivity to do the clean up"); - ctx.callActivity(CleanUpActivity.class.getName()).await(); - ctx.getLogger().info("CleanUpActivity finished"); - - ctx.getLogger().info("wait 10 seconds for next clean up"); - ctx.createTimer(Duration.ofSeconds(10)).await(); - - // continue the workflow. - ctx.continueAsNew(null); - }; - } -} -``` - -The `CleanUpActivity` class defines the logics for a single acitvity, in this case, it simulates a clean up work. See the code snippet below: -```java -public class CleanUpActivity implements WorkflowActivity { - @Override - public Object run(WorkflowActivityContext ctx) { - Logger logger = LoggerFactory.getLogger(CleanUpActivity.class); - logger.info("Starting Activity: " + ctx.getName()); - - logger.info("start clean up work, it may take few seconds to finish..."); - - //Sleeping for 5 seconds to simulate long running operation - try { - TimeUnit.SECONDS.sleep(5); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return "clean up finish."; - } -} -``` - -Once you start the workflow and client using the following commands: -```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker -``` -```sh -java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient -```` - -You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished: -```text -== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow -== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up -== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity -== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... -== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow -== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up -== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished -== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up -== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity -== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... -== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished -== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up -== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow -== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up -== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity -== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... -== APP == 2023-11-07 14:45:02,017 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow -== APP == 2023-11-07 14:45:02,020 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up -== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished -== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up -... -``` - -and the client: -```text -Started a new continue-as-new model workflow with instance ID: c853fb93-f0e7-4ad7-ad41-385732386f94 -``` -It will continue to run until you stop the worker. - -### External Event Pattern -In the external event pattern, a workflow is started by an external event. The workflow can then wait for other external events to occur before completing. - -The `DemoExternalEventWorkflow` class defines the workflow. It waits for an external event `Approval` to run the corresponding activity. See the code snippet below: -```java -public class DemoExternalEventWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - - Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await(); - if (approved) { - ctx.getLogger().info("approval granted - do the approved action"); - ctx.callActivity(ApproveActivity.class.getName()).await(); - ctx.getLogger().info("approval-activity finished"); - } else { - ctx.getLogger().info("approval denied - send a notification"); - ctx.callActivity(DenyActivity.class.getName()).await(); - ctx.getLogger().info("denied-activity finished"); - } - }; - } -} -``` - -In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity. -```java -client.raiseEvent(instanceId, "Approval", true); -``` - -Start the workflow and client using the following commands: - -ex -```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker -``` - -```sh -java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient -``` - -The worker logs: -```text -== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow -== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval... -== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action -== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity -== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity... -== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished -``` - -The client log: -```text -Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 -workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. -``` - -### Sub-workflow Pattern -The sub-workflow pattern allows you to call a workflow from another workflow. - -The `DemoWorkflow` class defines the workflow. It calls a sub-workflow `DemoSubWorkflow` to do the work. See the code snippet below: -```java -public class DemoWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - - var subWorkflowInput = "Hello Dapr Workflow!"; - ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput); - - var subWorkflowOutput = - ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await(); - - ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput); - }; - } -} -``` - -The `DemoSubWorkflow` class defines the sub-workflow. It call the activity to do the work and returns the result. See the code snippet below: -```java -public class DemoSubWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName()); - - var subWorkflowInput = ctx.getInput(String.class); - ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput); - - ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName()); - String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await(); - - ctx.getLogger().info("SubWorkflow finished with: " + result); - ctx.complete(result); - }; - } -} -``` - -The `ReverseActivity` class defines the logics for a single acitvity, in this case, it reverses a string. See the code snippet below: -```java -public class ReverseActivity implements WorkflowActivity { - @Override - public Object run(WorkflowActivityContext ctx) { - Logger logger = LoggerFactory.getLogger(ReverseActivity.class); - logger.info("Starting Activity: " + ctx.getName()); - - var message = ctx.getInput(String.class); - var newMessage = new StringBuilder(message).reverse().toString(); - - logger.info("Message Received from input: " + message); - logger.info("Sending message to output: " + newMessage); - - logger.info("Activity returned: " + newMessage); - logger.info("Activity finished"); - - return newMessage; - } -} -``` - -Start the workflow and client using the following commands: - -ex -```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker -``` - -```sh -java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient -``` - -The log from worker: -```text -== APP == 2023-11-07 20:08:52,521 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.subworkflow.DemoWorkflow -== APP == 2023-11-07 20:08:52,523 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - calling subworkflow with input: Hello Dapr Workflow! -== APP == 2023-11-07 20:08:52,561 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting SubWorkflow: io.dapr.examples.workflows.subworkflow.DemoSubWorkflow -== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow received input: Hello Dapr Workflow! -== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow is calling Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity -== APP == 2023-11-07 20:08:52,576 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Starting Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity -== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Message Received from input: Hello Dapr Workflow! -== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Sending message to output: !wolfkroW rpaD olleH -== APP == 2023-11-07 20:08:52,596 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow finished with: !wolfkroW rpaD olleH -== APP == 2023-11-07 20:08:52,611 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - subworkflow finished with: !wolfkroW rpaD olleH -``` - -The log from client: -```text -Started a new sub-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb -workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH -``` \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md new file mode 100644 index 000000000..67f6ffcbb --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -0,0 +1,122 @@ +# Dapr Streaming Subscription Sample + +In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture. + +Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub. + +## Pub-Sub Sample using the Java-SDK + +This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5). +## Pre-requisites + +* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/). +* Java JDK 11 (or greater): + * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + * [OpenJDK 11](https://jdk.java.net/11/) +* [Apache Maven](https://maven.apache.org/install.html) version 3.x. + +### Checking out the code + +Clone this repository: + +```sh +git clone https://github.com/dapr/java-sdk.git +cd java-sdk +``` + +Then build the Maven project: + +```sh +# make sure you are in the `java-sdk` directory. +mvn install +``` + +Then get into the examples directory: + +```sh +cd examples +``` +### Initialize Dapr + +Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. + +### Running the subscriber + +The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface. + + + +The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. + +In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription. + +```java +public class Subscriber { + + // ... + + public static void main(String[] args) throws Exception { + String topicName = getTopicName(args); + try (var client = new DaprClientBuilder().buildPreviewClient()) { + var subscription = client.subscribeToEvents( + PUBSUB_NAME, + topicName, + new SubscriptionListener<>() { + + @Override + public Status onEvent(CloudEvent event) { + System.out.println("Subscriber got: " + event.getData()); + return Status.SUCCESS; + } + + @Override + public void onError(RuntimeException exception) { + System.out.println("Subscriber got exception: " + exception.getMessage()); + } + }, + TypeRef.STRING); + + subscription.awaitTermination(); + } + } + + // ... +} +``` + +Execute the following command to run the Subscriber example: + + + +```bash +dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber +``` + + + +Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side: + + + +```bash +dapr run --resources-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher +``` + + + + diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java new file mode 100644 index 000000000..508b85d74 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java @@ -0,0 +1,88 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.pubsub.stream; + +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.SubscriptionListener; +import io.dapr.client.domain.CloudEvent; +import io.dapr.examples.DaprApplication; +import io.dapr.utils.TypeRef; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; + +/** + * Subscriber using bi-directional gRPC streaming, which does not require an app port. + * 1. Build and install jars: + * mvn clean install + * 2. cd [repo root]/examples + * 3. Run the subscriber: + * dapr run --resources-path ./components/pubsub --app-id subscriber -- \ + * java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber + */ +public class Subscriber { + + //The title of the topic to be used for publishing + private static final String DEFAULT_TOPIC_NAME = "testingtopic"; + + //The name of the pubsub + private static final String PUBSUB_NAME = "messagebus"; + + /** + * This is the entry point for this example app, which subscribes to a topic. + * @param args Used to optionally pass a topic name. + * @throws Exception An Exception on startup. + */ + public static void main(String[] args) throws Exception { + String topicName = getTopicName(args); + try (var client = new DaprClientBuilder().buildPreviewClient()) { + var subscription = client.subscribeToEvents( + PUBSUB_NAME, + topicName, + new SubscriptionListener<>() { + + @Override + public Status onEvent(CloudEvent event) { + System.out.println("Subscriber got: " + event.getData()); + return Status.SUCCESS; + } + + @Override + public void onError(RuntimeException exception) { + System.out.println("Subscriber got exception: " + exception.getMessage()); + } + }, + TypeRef.STRING); + + subscription.awaitTermination(); + } + } + + /** + * If a topic is specified in args, use that. + * Else, fallback to the default topic. + * @param args program arguments + * @return name of the topic to publish messages to. + */ + private static String getTopicName(String[] args) { + if (args.length >= 1) { + return args[0]; + } + return DEFAULT_TOPIC_NAME; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md index 0dac908e3..7dcc0b65d 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/README.md +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -9,14 +9,14 @@ This example contains the follow classes: * DemoWorkflow: An example of a Dapr Workflow. * DemoWorkflowClient: This application will start workflows using Dapr. * DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance. - + ## Pre-requisites * [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/). * Java JDK 11 (or greater): - * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) - * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) - * [OpenJDK 11](https://jdk.java.net/11/) + * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + * [OpenJDK 11](https://jdk.java.net/11/) * [Apache Maven](https://maven.apache.org/install.html) version 3.x. ### Checking out the code @@ -54,8 +54,8 @@ Those examples contain the following workflow patterns: 5. [Sub-workflow Pattern](#sub-workflow-pattern) ### Chaining Pattern -In the chaining pattern, a sequence of activities executes in a specific order. -In this pattern, the output of one activity is applied to the input of another activity. +In the chaining pattern, a sequence of activities executes in a specific order. +In this pattern, the output of one activity is applied to the input of another activity. The chaining pattern is useful when you need to execute a sequence of activities in a specific order. The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below: @@ -149,6 +149,7 @@ Execute the following script in order to run DemoChainWorker: ```sh dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker ``` + Once running, the logs will start displaying the different steps: First, you can see workflow is starting: ```text == APP == Start workflow runtime @@ -162,6 +163,8 @@ java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chai ``` + + Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity: ```text == APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow @@ -237,7 +240,7 @@ public class CountWordsActivity implements WorkflowActivity { } ``` +ex ```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker +dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker ``` ```sh java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient ``` - The worker logs: ```text @@ -521,25 +511,14 @@ public class ReverseActivity implements WorkflowActivity { Start the workflow and client using the following commands: - +ex ```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker +dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker ``` ```sh java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient ``` - The log from worker: ```text diff --git a/pom.xml b/pom.xml index 17573f976..1eb230961 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 1.64.0 3.25.0 protoc - https://raw.githubusercontent.com/dapr/dapr/v1.14.0-rc.2/dapr/proto + https://raw.githubusercontent.com/dapr/dapr/v1.14.4/dapr/proto 1.13.0-SNAPSHOT 0.13.0-SNAPSHOT 1.7.1 diff --git a/sdk-autogen/pom.xml b/sdk-autogen/pom.xml index a159b6b79..5d65d2e10 100644 --- a/sdk-autogen/pom.xml +++ b/sdk-autogen/pom.xml @@ -17,6 +17,7 @@ Auto-generated SDK for Dapr + 17 ${project.build.directory}/generated-sources ${project.build.directory}/proto false diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index ce60ce241..6ead6c307 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -17,6 +17,8 @@ UTF-8 + 17 + 17 17 17 true @@ -145,6 +147,12 @@ ${dapr.sdk.version} test + + io.dapr + dapr-sdk-springboot + ${dapr.sdk.version} + test + io.dapr dapr-sdk-workflows diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index fce5ac976..8ea32b368 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -15,7 +15,9 @@ import com.google.protobuf.Empty; import io.dapr.actors.client.ActorClient; +import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.config.Property; @@ -235,6 +237,22 @@ public String getAppName() { return appName; } + public DaprClient newDaprClient() { + return new DaprClientBuilder() + .withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) + .withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString()) + .withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1") + .build(); + } + + public DaprPreviewClient newDaprPreviewClient() { + return new DaprClientBuilder() + .withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) + .withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString()) + .withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1") + .buildPreviewClient(); + } + public void checkRunState(long timeout, boolean shouldBeRunning) throws InterruptedException { callWithRetry(() -> { try { diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java new file mode 100644 index 000000000..f737edc85 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java @@ -0,0 +1,124 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.pubsub.stream; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.SubscriptionListener; +import io.dapr.client.domain.CloudEvent; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.utils.TypeRef; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +import static io.dapr.it.Retry.callWithRetry; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +public class PubSubStreamIT extends BaseIT { + + // Must be a large enough number, so we validate that we get more than the initial batch + // sent by the runtime. When this was first added, the batch size in runtime was set to 10. + private static final int NUM_MESSAGES = 100; + private static final String TOPIC_NAME = "stream-topic"; + private static final String PUBSUB_NAME = "messagebus"; + + private final List runs = new ArrayList<>(); + + private DaprRun closeLater(DaprRun run) { + this.runs.add(run); + return run; + } + + @AfterEach + public void tearDown() throws Exception { + for (DaprRun run : runs) { + run.stop(); + } + } + + @Test + public void testPubSub() throws Exception { + final DaprRun daprRun = closeLater(startDaprApp( + this.getClass().getSimpleName(), + 60000)); + + var runId = UUID.randomUUID().toString(); + try (DaprClient client = daprRun.newDaprClient(); + DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s for run %s", i, TOPIC_NAME, runId); + //Publishing messages + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); + System.out.println( + String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME)); + } + + System.out.println("Starting subscription for " + TOPIC_NAME); + + Set messages = Collections.synchronizedSet(new HashSet<>()); + Set errors = Collections.synchronizedSet(new HashSet<>()); + + var random = new Random(37); // predictable random. + var listener = new SubscriptionListener() { + @Override + public Status onEvent(CloudEvent event) { + // Useful to avoid false negatives running locally multiple times. + if (event.getData().contains(runId)) { + // 5% failure rate. + var decision = random.nextInt(100); + if (decision < 5) { + if (decision % 2 == 0) { + throw new RuntimeException("artificial exception on message " + event.getId()); + } + return Status.RETRY; + } + + messages.add(event.getId()); + return Status.SUCCESS; + } + + return Status.DROP; + } + + @Override + public void onError(RuntimeException exception) { + errors.add(exception.getMessage()); + } + + }; + try(var subscription = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, listener, TypeRef.STRING)) { + callWithRetry(() -> { + var messageCount = messages.size(); + System.out.println( + String.format("Got %d messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); + assertEquals(NUM_MESSAGES, messages.size()); + assertEquals(4, errors.size()); + }, 120000); // Time for runtime to retry messages. + + subscription.close(); + subscription.awaitTermination(); + } + } + } +} diff --git a/sdk/pom.xml b/sdk/pom.xml index efa79cd6c..6548c6ae3 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -137,6 +137,12 @@ grpc-inprocess test + + io.dapr + dapr-sdk-autogen + 1.13.0-SNAPSHOT + compile + diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index dc0f75210..9eccc633b 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -23,6 +23,7 @@ import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.ComponentMetadata; import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.DaprMetadata; @@ -75,11 +76,11 @@ import io.dapr.v1.DaprProtos.PubsubSubscription; import io.dapr.v1.DaprProtos.PubsubSubscriptionRule; import io.dapr.v1.DaprProtos.RegisteredComponents; -import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.Metadata; import io.grpc.stub.AbstractStub; import io.grpc.stub.StreamObserver; +import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -141,7 +142,7 @@ public class DaprClientImpl extends AbstractDaprClient { private final DaprHttp httpClient; /** - * Default access level constructor, in order to create an instance of this + * Default access level constructor, in order to create an instance of this * class use io.dapr.client.DaprClientBuilder * * @param channel Facade for the managed GRPC channel @@ -401,6 +402,59 @@ public Mono> publishEvents(BulkPublishRequest requ } } + /** + * {@inheritDoc} + */ + @Override + public Subscription subscribeToEvents( + String pubsubName, String topic, SubscriptionListener listener, TypeRef type) { + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() + .setTopic(topic) + .setPubsubName(pubsubName) + .build(); + DaprProtos.SubscribeTopicEventsRequestAlpha1 request = + DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setInitialRequest(initialRequest) + .build(); + return buildSubscription(listener, type, request); + } + + @NotNull + private Subscription buildSubscription( + SubscriptionListener listener, + TypeRef type, + DaprProtos.SubscribeTopicEventsRequestAlpha1 request) { + Subscription subscription = new Subscription<>(this.asyncStub, request, listener, response -> { + if (response.getEventMessage() == null) { + return null; + } + + var message = response.getEventMessage(); + if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) { + return null; + } + + try { + CloudEvent cloudEvent = new CloudEvent<>(); + var object = + DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type); + cloudEvent.setData(object); + cloudEvent.setDatacontenttype(message.getDataContentType()); + cloudEvent.setId(message.getId()); + cloudEvent.setTopic(message.getTopic()); + cloudEvent.setSpecversion(message.getSpecVersion()); + cloudEvent.setType(message.getType()); + cloudEvent.setPubsubName(message.getPubsubName()); + return cloudEvent; + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + subscription.start(); + return subscription; + } + @Override public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { try { diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 0e10ea7fd..95911efc2 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -256,4 +256,16 @@ Mono> publishEvents(String pubsubName, String topicNa * @return Unlock result */ Mono unlock(UnlockRequest request); + + /** + * Subscribe to pubsub via streaming. + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param listener Callback methods to process events. + * @param type Type for object deserialization. + * @return An active subscription. + * @param Type of object deserialization. + */ + Subscription subscribeToEvents( + String pubsubName, String topic, SubscriptionListener listener, TypeRef type); } diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java new file mode 100644 index 000000000..c38de7f4a --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/Subscription.java @@ -0,0 +1,197 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client; + +import io.dapr.client.domain.CloudEvent; +import io.dapr.exceptions.DaprException; +import io.dapr.v1.DaprAppCallbackProtos; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import io.grpc.stub.StreamObserver; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +/** + * Streaming subscription of events for Dapr's pubsub. + * @param Application's object type. + */ +public class Subscription implements Closeable { + + private final BlockingQueue ackQueue = new LinkedBlockingQueue<>(50); + + private final AtomicBoolean running = new AtomicBoolean(true); + + private final Semaphore receiverStateChange = new Semaphore(0); + + private Thread acker; + + private Thread receiver; + + Subscription(DaprGrpc.DaprStub asyncStub, + DaprProtos.SubscribeTopicEventsRequestAlpha1 request, + SubscriptionListener listener, + Function> cloudEventConverter) { + final AtomicReference> streamRef = + new AtomicReference<>(); + + this.acker = new Thread(() -> { + while (running.get()) { + try { + var ackResponse = ackQueue.take(); + if (ackResponse == null) { + continue; + } + + var stream = streamRef.get(); + if (stream == null) { + Thread.sleep(1000); + // stream not ready yet + continue; + } + + stream.onNext(ackResponse); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + // No-op and continue after waiting for some time. + // This is useful when there is a reconnection, for example. + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + } + } + }); + + this.receiver = new Thread(() -> { + while (running.get()) { + var stream = asyncStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { + @Override + public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 topicEventRequest) { + try { + var stream = streamRef.get(); + if (stream == null) { + throw new RuntimeException("Cannot receive event: streaming subscription is not initialized."); + } + + CloudEvent cloudEvent = cloudEventConverter.apply(topicEventRequest); + if (cloudEvent == null) { + return; + } + + var id = cloudEvent.getId(); + if ((id == null) || id.isEmpty()) { + return; + } + + var status = onEvent(listener, cloudEvent); + if (status == null) { + status = SubscriptionListener.Status.RETRY; + } + + var ack = buildAckRequest(id, status); + ackQueue.put(ack); + } catch (Exception e) { + this.onError(DaprException.propagate(e)); + } + } + + @Override + public void onError(Throwable throwable) { + listener.onError(DaprException.propagate(throwable)); + } + + @Override + public void onCompleted() { + receiverStateChange.release(); + } + }); + + streamRef.set(stream); + stream.onNext(request); + + // Keep the client running + try { + receiverStateChange.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + running.set(false); + } + } + }); + } + + private static SubscriptionListener.Status onEvent(SubscriptionListener listener, CloudEvent cloudEvent) { + try { + return listener.onEvent(cloudEvent); + } catch (Exception e) { + listener.onError(DaprException.propagate(e)); + return SubscriptionListener.Status.RETRY; + } + } + + @NotNull + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( + String id, SubscriptionListener.Status status) { + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder() + .setId(id) + .setStatus( + DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf( + status.name())) + .build()) + .build(); + DaprProtos.SubscribeTopicEventsRequestAlpha1 ack = + DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setEventProcessed(eventProcessed) + .build(); + return ack; + } + + void start() { + this.receiver.start(); + this.acker.start(); + } + + /** + * Stops the subscription. + */ + @Override + public void close() { + running.set(false); + receiverStateChange.release(); + this.acker.interrupt(); + } + + /** + * Awaits (blocks) for subscription to end. + * @throws InterruptedException Exception if interrupted while awaiting. + */ + public void awaitTermination() throws InterruptedException { + this.receiver.join(); + this.acker.join(); + } +} diff --git a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java new file mode 100644 index 000000000..fe2f9bbc5 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client; + +import io.dapr.client.domain.CloudEvent; + +/** + * Callback interface to receive events from a streaming subscription of events. + * @param Object type for deserialization. + */ +public interface SubscriptionListener { + + /** + * Callback status response for acknowledging a message. + */ + enum Status { + SUCCESS, + RETRY, + DROP + } + + /** + * Processes an event from streaming subscription. + * @param event Event received. + * @return Acknowledgement status. + */ + Status onEvent(CloudEvent event); + + /** + * Processes an exception during streaming subscription. + * @param exception Exception to be processed. + */ + void onError(RuntimeException exception); +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientBuilderTest.java b/sdk/src/test/java/io/dapr/client/DaprClientBuilderTest.java index dbc8ffae1..f5116e36a 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientBuilderTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientBuilderTest.java @@ -42,11 +42,11 @@ public void build() { @Test public void buildWithOverrideSidecarIP() { DaprClientBuilder daprClientBuilder = new DaprClientBuilder(); - daprClientBuilder.withPropertyOverride(Properties.SIDECAR_IP, "unknown-host"); + daprClientBuilder.withPropertyOverride(Properties.SIDECAR_IP, "unknownhost"); DaprClient daprClient = daprClientBuilder.build(); assertNotNull(daprClient); DaprException thrown = assertThrows(DaprException.class, () -> { daprClient.getMetadata().block(); }); - assertTrue(thrown.toString().contains("UNAVAILABLE")); + assertTrue(thrown.toString().contains("UNAVAILABLE"), thrown.toString()); } diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index 200b4cae6..f6ba48a0c 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -20,6 +20,7 @@ import io.dapr.client.domain.BulkPublishEntry; import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; +import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; @@ -27,6 +28,8 @@ import io.dapr.client.domain.query.Query; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.utils.TypeRef; +import io.dapr.v1.DaprAppCallbackProtos; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.Status; @@ -44,9 +47,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import static io.dapr.utils.TestUtils.assertThrowsDaprException; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -417,6 +425,120 @@ public void unLock() { assertEquals(UnlockResponseStatus.SUCCESS, result); } + @Test + public void subscribeEventTest() throws Exception { + var numEvents = 100; + var numErrors = 3; + var numDrops = 2; + + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + + var started = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + var emitterThread = new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + for (int i = 0; i < numEvents; i++) { + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build()); + } + + for (int i = 0; i < numDrops; i++) { + // Bad messages + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(UUID.randomUUID().toString()) + .setPubsubName("bad pubsub") + .setTopic("bad topic") + .setData(ByteString.copyFromUtf8("\"\"")) + .setDataContentType("application/json") + .build()) + .build()); + } + observer.onCompleted(); + }); + emitterThread.start(); + return new StreamObserver<>() { + + @Override + public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final Set success = Collections.synchronizedSet(new HashSet<>()); + final Set errors = Collections.synchronizedSet(new HashSet<>()); + final AtomicInteger dropCounter = new AtomicInteger(); + final Semaphore gotAll = new Semaphore(0); + + final AtomicInteger errorsToBeEmitted = new AtomicInteger(numErrors); + + var subscription = previewClient.subscribeToEvents( + "pubsubname", + "topic", + new SubscriptionListener<>() { + @Override + public Status onEvent(CloudEvent event) { + if (event.getPubsubName().equals(pubsubName) && + event.getTopic().equals(topicName) && + event.getData().equals(data)) { + + // Simulate an error + if ((success.size() == 4 /* some random entry */) && errorsToBeEmitted.decrementAndGet() >= 0) { + throw new RuntimeException("simulated exception on event " + event.getId()); + } + + success.add(event.getId()); + if (success.size() >= numEvents) { + gotAll.release(); + } + return Status.SUCCESS; + } + + dropCounter.incrementAndGet(); + return Status.DROP; + } + + @Override + public void onError(RuntimeException exception) { + errors.add(exception.getMessage()); + } + + }, + TypeRef.STRING); + + gotAll.acquire(); + subscription.close(); + + assertEquals(numEvents, success.size()); + assertEquals(numDrops, dropCounter.get()); + assertEquals(numErrors, errors.size()); + } private DaprProtos.QueryStateResponse buildQueryStateResponse(List> resp,String token) throws JsonProcessingException { List items = new ArrayList<>(); diff --git a/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java b/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java index d90697f28..f044cd728 100644 --- a/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java +++ b/sdk/src/test/java/io/dapr/utils/NetworkUtilsTest.java @@ -5,29 +5,15 @@ import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.HashMap; import java.util.Map; public class NetworkUtilsTest { - private final int defaultGrpcPort = 4000; + private final int defaultGrpcPort = 50001; private final String defaultSidecarIP = "127.0.0.1"; - private ManagedChannel channel; - private Map propertiesOverride; - - @BeforeEach - public void setUp() { - // Must be mutable for some test scenarios here. - propertiesOverride = new HashMap<>(Map.of( - Properties.GRPC_PORT.getName(), Integer.toString(defaultGrpcPort), - Properties.SIDECAR_IP.getName(), defaultSidecarIP, - Properties.GRPC_ENDPOINT.getName(), "" - )); - } @AfterEach public void tearDown() { @@ -38,7 +24,7 @@ public void tearDown() { @Test public void testBuildGrpcManagedChannel() { - channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride)); + channel = NetworkUtils.buildGrpcManagedChannel(new Properties()); String expectedAuthority = String.format("%s:%s", defaultSidecarIP, defaultGrpcPort); Assertions.assertEquals(expectedAuthority, channel.authority()); @@ -46,8 +32,8 @@ public void testBuildGrpcManagedChannel() { @Test public void testBuildGrpcManagedChannel_httpEndpointNoPort() { - propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "http://example.com"); - channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride)); + var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "http://example.com")); + channel = NetworkUtils.buildGrpcManagedChannel(properties); String expectedAuthority = "example.com:80"; Assertions.assertEquals(expectedAuthority, channel.authority()); @@ -55,8 +41,8 @@ public void testBuildGrpcManagedChannel_httpEndpointNoPort() { @Test public void testBuildGrpcManagedChannel_httpEndpointWithPort() { - propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000"); - channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride)); + var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000")); + channel = NetworkUtils.buildGrpcManagedChannel(properties); String expectedAuthority = "example.com:3000"; Assertions.assertEquals(expectedAuthority, channel.authority()); @@ -64,8 +50,8 @@ public void testBuildGrpcManagedChannel_httpEndpointWithPort() { @Test public void testBuildGrpcManagedChannel_httpsEndpointNoPort() { - propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "https://example.com"); - channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride)); + var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "https://example.com")); + channel = NetworkUtils.buildGrpcManagedChannel(properties); String expectedAuthority = "example.com:443"; Assertions.assertEquals(expectedAuthority, channel.authority()); @@ -73,8 +59,8 @@ public void testBuildGrpcManagedChannel_httpsEndpointNoPort() { @Test public void testBuildGrpcManagedChannel_httpsEndpointWithPort() { - propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000"); - channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride)); + var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000")); + channel = NetworkUtils.buildGrpcManagedChannel(properties); String expectedAuthority = "example.com:3000"; Assertions.assertEquals(expectedAuthority, channel.authority()); @@ -144,8 +130,8 @@ private static void testGrpcEndpointParsingScenario( String expectedEndpoint, boolean expectSecure ) { - var override = Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue); - var settings = NetworkUtils.GrpcEndpointSettings.parse(new Properties(override)); + var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue)); + var settings = NetworkUtils.GrpcEndpointSettings.parse(properties); Assertions.assertEquals(expectedEndpoint, settings.endpoint); Assertions.assertEquals(expectSecure, settings.secure); @@ -153,8 +139,8 @@ private static void testGrpcEndpointParsingScenario( private static void testGrpcEndpointParsingErrorScenario(String grpcEndpointEnvValue) { try { - var override = Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue); - NetworkUtils.GrpcEndpointSettings.parse(new Properties(override)); + var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue)); + NetworkUtils.GrpcEndpointSettings.parse(properties); Assert.fail(); } catch (IllegalArgumentException e) { // Expected