Skip to content

Commit ceded83

Browse files
committed
Bi-di subscription.
Add bidi subscription to validate workflow. Signed-off-by: Artur Souza <asouza.pro@gmail.com>
1 parent 7490434 commit ceded83

File tree

19 files changed

+840
-613
lines changed

19 files changed

+840
-613
lines changed

.github/workflows/validate.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,7 @@ jobs:
160160
working-directory: ./examples
161161
run: |
162162
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
163+
- name: Validate streaming subscription example
164+
working-directory: ./examples
165+
run: |
166+
mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ Try the following examples to learn more about Dapr's Java SDK:
102102
* [Invoking a Grpc service](./examples/src/main/java/io/dapr/examples/invoke/grpc)
103103
* [State management](./examples/src/main/java/io/dapr/examples/state)
104104
* [PubSub with subscriber](./examples/src/main/java/io/dapr/examples/pubsub/)
105+
* [PubSub with streaming subscription](./examples/src/main/java/io/dapr/examples/pubsub/stream/)
105106
* [Binding with input over Http](./examples/src/main/java/io/dapr/examples/bindings/http)
106107
* [Actors](./examples/src/main/java/io/dapr/examples/actors/)
108+
* [Workflows](./examples/src/main/java/io/dapr/examples/workflows/)
107109
* [Secrets management](./examples/src/main/java/io/dapr/examples/secrets)
108110
* [Configuration](./examples/src/main/java/io/dapr/examples/configuration)
109111
* [Distributed tracing with OpenTelemetry SDK](./examples/src/main/java/io/dapr/examples/tracing)

examples/README.md

Lines changed: 0 additions & 541 deletions
This file was deleted.
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Dapr Streaming Subscription Sample
2+
3+
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture.
4+
5+
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.
6+
7+
## Pub-Sub Sample using the Java-SDK
8+
9+
This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5).
10+
## Pre-requisites
11+
12+
* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
13+
* Java JDK 11 (or greater):
14+
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
15+
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
16+
* [OpenJDK 11](https://jdk.java.net/11/)
17+
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
18+
19+
### Checking out the code
20+
21+
Clone this repository:
22+
23+
```sh
24+
git clone https://github.com/dapr/java-sdk.git
25+
cd java-sdk
26+
```
27+
28+
Then build the Maven project:
29+
30+
```sh
31+
# make sure you are in the `java-sdk` directory.
32+
mvn install
33+
```
34+
35+
Then get into the examples directory:
36+
37+
```sh
38+
cd examples
39+
```
40+
### Initialize Dapr
41+
42+
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
43+
44+
### Running the subscriber
45+
46+
The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface.
47+
48+
49+
50+
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
51+
52+
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
53+
54+
```java
55+
public class Subscriber {
56+
57+
// ...
58+
59+
public static void main(String[] args) throws Exception {
60+
String topicName = getTopicName(args);
61+
try (var client = new DaprClientBuilder().buildPreviewClient()) {
62+
var subscription = client.subscribeToEvents(
63+
PUBSUB_NAME,
64+
topicName,
65+
new SubscriptionListener<>() {
66+
67+
@Override
68+
public Status onEvent(CloudEvent<String> event) {
69+
System.out.println("Subscriber got: " + event.getData());
70+
return Status.SUCCESS;
71+
}
72+
73+
@Override
74+
public void onError(RuntimeException exception) {
75+
System.out.println("Subscriber got exception: " + exception.getMessage());
76+
}
77+
},
78+
TypeRef.STRING);
79+
80+
subscription.awaitTermination();
81+
}
82+
}
83+
84+
// ...
85+
}
86+
```
87+
88+
Execute the following command to run the Subscriber example:
89+
90+
<!-- STEP
91+
name: Run Subscriber
92+
expected_stdout_lines:
93+
- '== APP == Subscriber got: This is message #0'
94+
- '== APP == Subscriber got: This is message #1'
95+
background: true
96+
sleep: 30
97+
-->
98+
99+
```bash
100+
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
101+
```
102+
103+
<!-- END_STEP -->
104+
105+
Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:
106+
107+
<!-- STEP
108+
name: Run Publisher
109+
expected_stdout_lines:
110+
- '== APP == Published message: This is message #0'
111+
- '== APP == Published message: This is message #1'
112+
background: true
113+
sleep: 15
114+
-->
115+
116+
```bash
117+
dapr run --resources-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
118+
```
119+
120+
<!-- END_STEP -->
121+
122+
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.examples.pubsub.stream;
15+
16+
import io.dapr.client.DaprClientBuilder;
17+
import io.dapr.client.DaprPreviewClient;
18+
import io.dapr.client.SubscriptionListener;
19+
import io.dapr.client.domain.CloudEvent;
20+
import io.dapr.examples.DaprApplication;
21+
import io.dapr.utils.TypeRef;
22+
import io.grpc.Server;
23+
import io.grpc.ServerBuilder;
24+
import org.apache.commons.cli.CommandLine;
25+
import org.apache.commons.cli.CommandLineParser;
26+
import org.apache.commons.cli.DefaultParser;
27+
import org.apache.commons.cli.Options;
28+
29+
/**
30+
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
31+
* 1. Build and install jars:
32+
* mvn clean install
33+
* 2. cd [repo root]/examples
34+
* 3. Run the subscriber:
35+
* dapr run --resources-path ./components/pubsub --app-id subscriber -- \
36+
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
37+
*/
38+
public class Subscriber {
39+
40+
//The title of the topic to be used for publishing
41+
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
42+
43+
//The name of the pubsub
44+
private static final String PUBSUB_NAME = "messagebus";
45+
46+
/**
47+
* This is the entry point for this example app, which subscribes to a topic.
48+
* @param args Used to optionally pass a topic name.
49+
* @throws Exception An Exception on startup.
50+
*/
51+
public static void main(String[] args) throws Exception {
52+
String topicName = getTopicName(args);
53+
try (var client = new DaprClientBuilder().buildPreviewClient()) {
54+
var subscription = client.subscribeToEvents(
55+
PUBSUB_NAME,
56+
topicName,
57+
new SubscriptionListener<>() {
58+
59+
@Override
60+
public Status onEvent(CloudEvent<String> event) {
61+
System.out.println("Subscriber got: " + event.getData());
62+
return Status.SUCCESS;
63+
}
64+
65+
@Override
66+
public void onError(RuntimeException exception) {
67+
System.out.println("Subscriber got exception: " + exception.getMessage());
68+
}
69+
},
70+
TypeRef.STRING);
71+
72+
subscription.awaitTermination();
73+
}
74+
}
75+
76+
/**
77+
* If a topic is specified in args, use that.
78+
* Else, fallback to the default topic.
79+
* @param args program arguments
80+
* @return name of the topic to publish messages to.
81+
*/
82+
private static String getTopicName(String[] args) {
83+
if (args.length >= 1) {
84+
return args[0];
85+
}
86+
return DEFAULT_TOPIC_NAME;
87+
}
88+
}

examples/src/main/java/io/dapr/examples/workflows/README.md

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ This example contains the follow classes:
99
* DemoWorkflow: An example of a Dapr Workflow.
1010
* DemoWorkflowClient: This application will start workflows using Dapr.
1111
* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance.
12-
12+
1313
## Pre-requisites
1414

1515
* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
1616
* Java JDK 11 (or greater):
17-
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
18-
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
19-
* [OpenJDK 11](https://jdk.java.net/11/)
17+
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
18+
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
19+
* [OpenJDK 11](https://jdk.java.net/11/)
2020
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
2121

2222
### Checking out the code
@@ -54,8 +54,8 @@ Those examples contain the following workflow patterns:
5454
5. [Sub-workflow Pattern](#sub-workflow-pattern)
5555

5656
### Chaining Pattern
57-
In the chaining pattern, a sequence of activities executes in a specific order.
58-
In this pattern, the output of one activity is applied to the input of another activity.
57+
In the chaining pattern, a sequence of activities executes in a specific order.
58+
In this pattern, the output of one activity is applied to the input of another activity.
5959
The chaining pattern is useful when you need to execute a sequence of activities in a specific order.
6060

6161
The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below:
@@ -149,6 +149,7 @@ Execute the following script in order to run DemoChainWorker:
149149
```sh
150150
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker
151151
```
152+
152153
Once running, the logs will start displaying the different steps: First, you can see workflow is starting:
153154
```text
154155
== APP == Start workflow runtime
@@ -162,6 +163,8 @@ java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chai
162163
```
163164
<!-- END_STEP -->
164165

166+
167+
165168
Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity:
166169
```text
167170
== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow
@@ -237,7 +240,7 @@ public class CountWordsActivity implements WorkflowActivity {
237240
}
238241
```
239242
<!-- STEP
240-
name: Run Fan-in/Fan-out Pattern workflow
243+
name: Run Chaining Pattern workflow
241244
match_order: none
242245
output_match_mode: substring
243246
expected_stdout_lines:
@@ -255,7 +258,9 @@ Execute the following script in order to run DemoFanInOutWorker:
255258
```sh
256259
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker
257260
```
261+
258262
Execute the following script in order to run DemoFanInOutClient:
263+
259264
```sh
260265
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient
261266
```
@@ -343,7 +348,7 @@ public class CleanUpActivity implements WorkflowActivity {
343348

344349
Once you start the workflow and client using the following commands:
345350
```sh
346-
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
351+
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
347352
```
348353
```sh
349354
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient
@@ -406,36 +411,21 @@ public class DemoExternalEventWorkflow extends Workflow {
406411
}
407412
```
408413
409-
In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity.
414+
In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity.
410415
```java
411416
client.raiseEvent(instanceId, "Approval", true);
412417
```
413418
414419
Start the workflow and client using the following commands:
415420
416-
<!-- STEP
417-
name: Run Wait External Event Pattern workflow
418-
match_order: none
419-
output_match_mode: substring
420-
expected_stdout_lines:
421-
- 'Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow'
422-
- 'Waiting for approval...'
423-
- 'approval granted - do the approved action'
424-
- 'Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity'
425-
- 'Running approval activity...'
426-
- 'approval-activity finished'
427-
background: true
428-
sleep: 60
429-
timeout_seconds: 60
430-
-->
421+
ex
431422
```sh
432-
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
423+
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
433424
```
434425
435426
```sh
436427
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient
437428
```
438-
<!-- END_STEP -->
439429
440430
The worker logs:
441431
```text
@@ -521,25 +511,14 @@ public class ReverseActivity implements WorkflowActivity {
521511
522512
Start the workflow and client using the following commands:
523513
524-
<!-- STEP
525-
name: Run Sub-workflow Pattern workflow
526-
match_order: none
527-
output_match_mode: substring
528-
expected_stdout_lines:
529-
- 'calling subworkflow with input: Hello Dapr Workflow!'
530-
- 'SubWorkflow finished with: !wolfkroW rpaD olleH'
531-
background: true
532-
sleep: 60
533-
timeout_seconds: 60
534-
-->
514+
ex
535515
```sh
536-
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
516+
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
537517
```
538518
539519
```sh
540520
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient
541521
```
542-
<!-- END_STEP -->
543522
544523
The log from worker:
545524
```text

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<grpc.version>1.64.0</grpc.version>
1818
<protobuf.version>3.25.0</protobuf.version>
1919
<protocCommand>protoc</protocCommand>
20-
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.0-rc.2/dapr/proto</dapr.proto.baseurl>
20+
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.4/dapr/proto</dapr.proto.baseurl>
2121
<dapr.sdk.version>1.13.0-SNAPSHOT</dapr.sdk.version>
2222
<dapr.sdk.alpha.version>0.13.0-SNAPSHOT</dapr.sdk.alpha.version>
2323
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>

sdk-autogen/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
<description>Auto-generated SDK for Dapr</description>
1818

1919
<properties>
20+
<java.version>17</java.version>
2021
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
2122
<protobuf.input.directory>${project.build.directory}/proto</protobuf.input.directory>
2223
<maven.deploy.skip>false</maven.deploy.skip>

0 commit comments

Comments
 (0)