Skip to content

Commit

Permalink
Adding Java examples (forcedotcom#20)
Browse files Browse the repository at this point in the history
* Adding Java examples

* Updating READMEs

* Adding more examples

* Updating README

* Bug fixes

* Bug fixes, nits
  • Loading branch information
sidd0610 authored and Marek Madejski committed Jan 24, 2023
1 parent 81a3cfb commit 0c44963
Show file tree
Hide file tree
Showing 23 changed files with 2,760 additions and 3 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ $RECYCLE.BIN/
.idea

# Dependency directory for Go examples
go/vendor/
go/vendor/

*/target/
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ officially supported languages have well-supported Avro libraries:
|C++|[Apache Avro C++](https://avro.apache.org/docs/current/api/cpp/html/index.html)|
|Dart|[avro-dart](https://github.com/sqs/avro-dart) (last updated 2012)|
|Go|[goavro](https://github.com/linkedin/goavro)|
|Java|[Apache Avro Java](https://avro.apache.org/docs/1.10.2/gettingstartedjava.html)|
|Java|[Apache Avro Java](https://avro.apache.org/docs/current/getting-started-java/)|
|Kotlin|[avro4k](https://github.com/avro-kotlin/avro4k)|
|Node|[avro-js](https://www.npmjs.com/package/avro-js)|
|Objective C|[ObjectiveAvro](https://github.com/jlawton/ObjectiveAvro) (but read [this](https://stackoverflow.com/questions/57216446/data-serialisation-in-objective-c-avro-alternative))|
|PHP|[avro-php](https://github.com/wikimedia/avro-php)|
|Python|[Apache Avro Python](https://avro.apache.org/docs/current/gettingstartedpython.html)|
|Python|[Apache Avro Python](https://avro.apache.org/docs/current/getting-started-python/)|
|Ruby|[AvroTurf](https://github.com/dasch/avro_turf)|

## Documentation, Blog Post and Videos
Expand All @@ -40,6 +40,7 @@ officially supported languages have well-supported Avro libraries:
- [Python Quick Start in the Developer Guide](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/qs-python-quick-start.html)
- [Python Code Examples](python/)
- [Go Code Examples](go/)
- [Java Code Examples](java/)
- [E-Bikes Sample Application](https://github.com/trailheadapps/ebikes-lwc)
- [Pub/Sub API Node Client](https://github.com/pozil/pub-sub-api-node-client)

62 changes: 62 additions & 0 deletions java/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Pub/Sub API Java Examples

## Overview
This directory contains some simple Java Examples that can be used with the Pub/Sub API. These examples range from generic Publish and Subscribe, processing CustomEventHeaders in change events and also a specific example of updating the Salesforce Account standard object. It is important to note that these examples are not performance tested nor are they production ready. They are meant to be used as a learning resource or a starting point to understand the flows of each of the Remote Procedure Calls (RPCs) of Pub/Sub API. There are some limitations to these examples as well mentioned below.

## Project Structure
In the `src/main` directory of the project, you will find several sub-directories as follows:
* `java/`: This directory contains the main source code for all the examples grouped into separate packages:
* `accountupdateapp/`: This package contains the examples for updating an Account standard object with an AccountNumber.
* `genericpubsub/`: This package contains the examples covering the general flows of all RPCs of Pub/Sub API.
* `processchangeeventheader/`: This package contains an example for extracting the changed fields from a bitmap value in a change event.
* `utility`: This package contains a list of utility classes used across all the examples.
* `proto/` - This directory contains the same `pubsub_api.proto` file found at the root of this repo. The plugin used to generate the sources requires for this proto file to be present in the `src` directory.
* `resources/` - This directory contains a list of resources needed for running the examples.

## Running the Examples
### Prerequisites
1. Install [Java 11](https://www.oracle.com/java/technologies/javase/jdk11-archive-downloads.html), [Maven](https://maven.apache.org/install.html).
2. Clone this project.
3. Run `mvn clean install` from the `java` directory to build the project and generate required sources from the proto file.
4. The `arguments.yaml` file in the `src/main/resources` sub-directory contains a list of required and optional configurations needed to run the examples. The file contains detailed comments on how to set the configurations.
5. Get the username, password, and login URL of the Salesforce org you wish to use.
6. For the examples in `genericpubsub` package, a custom **_CarMaintenance_** [platform event](https://developer.salesforce.com/docs/atlas.en-us.platform_events.meta/platform_events/platform_events_define_ui.htm) has to be created in the Salesforce org. Ensure your CarMaintenance platform event matches the following structure:
- Standard Fields
- Label: `CarMaintenance`
- Plural Label: `CarMaintenances`
- Custom Fields
- `Cost` (Number)
- `Mileage` (Number)
- `WorkDescription` (Text, 200)
7. For the examples in the `accountupdateapp` package, another custom **_NewAccount_** [platform event](https://developer.salesforce.com/docs/atlas.en-us.platform_events.meta/platform_events/platform_events_define_ui.htm) has to be created in the Salesforce org. [More info here](src/main/java/accountupdateapp/README.md).

### Execution
1. Update the configurations in the `src/main/resources/arguments.yaml` file. The required configurations will apply to all the examples and the optional ones depends on which example is being executed. The configurations include:
1. Required configurations:
* `PUBSUB_HOST`: Specify the Pub/Sub API endpoint to be used.
* `PUBSUB_PORT`: Specify the Pub/Sub API port to be used (usually 7443).
* `LOGIN_URL`: Specify the login url of the Salesforce org being used to run the examples.
* `USERNAME` & `PASSWORD`: For authentication via username and password, you will need to specify the username and password of the Salesforce org.
* `ACCESS_TOKEN` & `TENANT_ID`: For authentication via session token an tenant ID, you will need to specify the sessionToken and tenant ID of the Salesforce org.
2. Optional Parameters:
* `TOPIC`: Specify the topic for which you wish to publish/subscribe.
* `NUMBER_OF_EVENTS_TO_PUBLISH`: Specify the number of events to publish while using the PublishStream RPC.
* `NUMBER_OF_EVENTS_TO_SUBSCRIBE`: Specify the number of events to subscribe while using the Subscribe RPC.
* `REPLAY_PRESET`: Specify the ReplayPreset for subscribe examples.
* If a subscription has to be started using the CUSTOM replay preset, the `REPLAY_ID` parameter is mandatory.
* The `REPLAY_ID` has to be specified in the following format: `[<byte_values_separated_by_commas>]`. Please enter the values as is within the square brackets and without any quotes.
* Example: `[0, 1, 2, 3, 4, -5, 6, 7, -8]`

2. After setting up the configurations, any example can be executed using the `./run.sh` file available at the parent directory.
* Format for running the examples: `./run.sh <package_name>.<example_class_name>`
* Example: `./run.sh genericpubsub.PublishStream`

## Implementation
- This repo can be used as a reference point for clients looking to create a Java app to integrate with Pub/Sub API. Note that the project structure and the examples included in this repo are intended for demo purposes only and clients are free to implement their own Java apps in any way they see fit.
- The Subscribe RPC examples demonstrates a basic flow control strategy where a new `fetchRequest` is sent only after the requested number of events in the previous `fetchRequest(s)` are received. Custom flow control strategies can be implemented as needed. More info on flow control available [here](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/flow-control.html).

# Limitations
1. No guarantees that streams will remain open with `PublishStream` examples - Pub/Sub API has idle timeouts and will close idle streams. If a stream is closed while running these examples, you will most likely need to stop and restart.
2. No support for republishing on error - If an error occurs while publishing the relevant examples will surface the error but will not attempt to republish the event.
3. No security guarantees - Teams using these examples for reference will need to do their own security audits to ensure the dependencies introduced here can be safely used.
4. No performance testing - These examples have not been perf tested.
120 changes: 120 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>pubsub-java</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<java.version>11</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<avro.version>1.11.0</avro.version>
<grpc.version>1.35.1</grpc.version>
</properties>

<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>10.0.10</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.21</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
13 changes: 13 additions & 0 deletions java/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh
#
# A convenience script that runs examples based on locally built JARs. Usage:
# mvn clean package
# ./run.sh <package-name><class-name>
#

EXAMPLE=$1
if [ "x$EXAMPLE" = "x" ]; then
echo "Please specify one of the example class names from the package com.salesforce.eventbusclient.example"
exit -1
fi
java -cp target/pubsub-java-1.0-SNAPSHOT-jar-with-dependencies.jar $EXAMPLE $@
119 changes: 119 additions & 0 deletions java/src/main/java/accountupdateapp/AccountListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package accountupdateapp;

import static accountupdateapp.AccountUpdateAppUtil.*;
import static utility.CommonContext.*;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.eventbus.protobuf.ConsumerEvent;
import com.salesforce.eventbus.protobuf.FetchResponse;

import genericpubsub.Publish;
import genericpubsub.Subscribe;
import io.grpc.stub.StreamObserver;
import utility.CommonContext;
import utility.ExampleConfigurations;

/**
* AccountListener
* A subscribe client that listens to the Change Data Capture (CDC) events of the Account object
* and publishes events of the `/event/NewAccount__e` custom platform event.
*
* Example:
* ./run.sh accountupdateapp.AccountListener
*
* @author sidd0610
* @since v1.0
*/

public class AccountListener {

protected static final Logger logger = LoggerFactory.getLogger(AccountListener.class.getClass());

protected Subscribe subscriber;
protected Publish publisher;

private static final String SUBSCRIBER_TOPIC = "/data/AccountChangeEvent";
private static final String PUBLISHER_TOPIC = "/event/NewAccount__e";

public AccountListener(ExampleConfigurations requiredParams) {
logger.info("Setting up the Subscriber");
ExampleConfigurations subscriberParams = setupSubscriberParameters(requiredParams, SUBSCRIBER_TOPIC, 100);
this.subscriber = new Subscribe(subscriberParams, getAccountListenerResponseObserver());
logger.info("Setting up the Publisher");
ExampleConfigurations publisherParams = setupPublisherParameters(requiredParams, PUBLISHER_TOPIC);
this.publisher = new Publish(publisherParams);
}

/**
* Custom StreamObserver for the AccountListener.
*
* @return StreamObserver<FetchResponse>
*/
private StreamObserver<FetchResponse> getAccountListenerResponseObserver() {
return new StreamObserver<FetchResponse>() {
@Override
public void onNext(FetchResponse fetchResponse) {
for(ConsumerEvent ce: fetchResponse.getEventsList()) {
try {
Schema writerSchema = subscriber.getSchema(ce.getEvent().getSchemaId());
GenericRecord eventPayload = CommonContext.deserialize(writerSchema, ce.getEvent().getPayload());
subscriber.updateReceivedEvents(1);
for (String recordId : getRecordIdsOfAccountCDCEvent(eventPayload)) {
logger.info("New Account was Created");
publisher.publish(createNewAccountProducerEvent(publisher.getSchema(), publisher.getSchemaInfo(), recordId));
}
} catch (Exception e) {
logger.info(e.toString());
}
}
if (subscriber.getReceivedEvents().get() < subscriber.getTotalEventsRequested()) {
subscriber.fetchMore(subscriber.getBatchSize());
} else {
subscriber.receivedAllEvents.set(true);
}
}

@Override
public void onError(Throwable t) {
printStatusRuntimeException("Error during SubscribeStream", (Exception) t);
subscriber.isActive.set(false);
}

@Override
public void onCompleted() {
logger.info("Received requested number of events! Call completed by server.");
subscriber.isActive.set(false);
}
};
}

// Helper function to start the app.
public void startApp() throws InterruptedException {
subscriber.startSubscription();
subscriber.waitForEvents();
}

// Helper function to stop the app.
public void stopApp() {
subscriber.close();
publisher.close();
}

public static void main(String[] args) throws IOException {
// For this example specifying only the required configurations in the arguments.yaml is enough.
ExampleConfigurations requiredParameters = new ExampleConfigurations("arguments.yaml");
try {
AccountListener ac = new AccountListener(requiredParameters);
ac.startApp();
ac.stopApp();
} catch (Exception e) {
printStatusRuntimeException("Error during AccountListener", e);
}
}
}
Loading

0 comments on commit 0c44963

Please sign in to comment.