Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Subscribe and Managed Subscribe examples #60

Merged
merged 10 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ In the `src/main` directory of the project, you will find several sub-directorie
* `NUMBER_OF_EVENTS_TO_PUBLISH`: Specify the number of events to publish while using the PublishStream RPC.
* `SINGLE_PUBLISH_REQUEST`: Specify if you want to publish the events in a single or multiple PublishRequests.
* `NUMBER_OF_EVENTS_IN_FETCHREQUEST`: Specify the number of events that the Subscribe RPC requests from the server in each FetchRequest. The example fetches at most 5 events in each Subscribe request. If you pass in more than 5, it sends multiple Subscribe requests with at most 5 events requested in FetchRequest each. For more information about requesting events, see [Pull Subscription and Flow Control](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/flow-control.html) in the Pub/Sub API documentation.
* `PROCESS_CHANGE_EVENT_HEADER_FIELDS`: Specify whether the Subscribe or ManagedSubscribe client should process the change data capture event bitmap fields in `ChangeEventHeader`. In this sample, only the `changedFields` field is expanded. To expand the `diffFields` and `nulledFields` header fields, modify the sample code. See [Event Deserialization Considerations](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/event-deserialization-considerations.html).
* `REPLAY_PRESET`: Specify the ReplayPreset for subscribe examples.
* If a subscription has to be started using the CUSTOM replay preset, the `REPLAY_ID` parameter is mandatory.
* The `REPLAY_ID` is a byte array and must be specified in this format: `[<byte_values_separated_by_commas>]`. Please enter the values as is within the square brackets and without any quotes.
Expand Down
7 changes: 7 additions & 0 deletions java/src/main/java/genericpubsub/ManagedSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ public class ManagedSubscribe extends CommonContext implements StreamObserver<Ma
private AtomicInteger receivedEvents = new AtomicInteger(0);
private String developerName;
private String managedSubscriptionId;
private final boolean processChangedFields;

public ManagedSubscribe(ExampleConfigurations exampleConfigurations) {
super(exampleConfigurations);
isActive.set(true);
this.managedSubscriptionId = exampleConfigurations.getManagedSubscriptionId();
this.developerName = exampleConfigurations.getDeveloperName();
BATCH_SIZE = Math.min(5, exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest());
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
}

/**
Expand Down Expand Up @@ -98,6 +100,11 @@ private void processEvent(ManagedFetchResponse response) throws IOException {
Schema writerSchema = getSchema(schemaId);
GenericRecord record = deserialize(writerSchema, event.getEvent().getPayload());
logger.info("Received event: {}", record.toString());
if (processChangedFields) {
// This example expands the changedFields bitmap field in ChangeEventHeader.
// To expand the other bitmap fields, i.e., diffFields and nulledFields, replicate or modify this code.
processAndPrintBitmapFields(writerSchema, record, "changedFields");
}
}
logger.info("Processed batch of {} event(s)", response.getEventsList().size());
}
Expand Down
8 changes: 8 additions & 0 deletions java/src/main/java/genericpubsub/Subscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class Subscribe extends CommonContext {
private final ScheduledExecutorService retryScheduler;
// Replay should be stored in replay store as bytes since replays are opaque.
private volatile ByteString storedReplay;
private final boolean processChangedFields;

public Subscribe(ExampleConfigurations exampleConfigurations) {
super(exampleConfigurations);
Expand All @@ -60,6 +61,7 @@ public Subscribe(ExampleConfigurations exampleConfigurations) {
this.replayPreset = exampleConfigurations.getReplayPreset();
this.customReplayId = exampleConfigurations.getReplayId();
this.retryScheduler = Executors.newScheduledThreadPool(1);
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
}

public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<FetchResponse> responseStreamObserver) {
Expand All @@ -72,6 +74,7 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<Fet
this.replayPreset = exampleConfigurations.getReplayPreset();
this.customReplayId = exampleConfigurations.getReplayId();
this.retryScheduler = Executors.newScheduledThreadPool(1);
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
}

/**
Expand Down Expand Up @@ -246,6 +249,11 @@ private void processEvent(ConsumerEvent ce) throws IOException {
this.storedReplay = ce.getReplayId();
GenericRecord record = deserialize(writerSchema, ce.getEvent().getPayload());
logger.info("Received event with payload: " + record.toString() + " with schema name: " + writerSchema.getName());
if (processChangedFields) {
// This example expands the changedFields bitmap field in ChangeEventHeader.
// To expand the other bitmap fields, i.e., diffFields and nulledFields, replicate or modify this code.
processAndPrintBitmapFields(writerSchema, record, "changedFields");
}
}

/**
Expand Down

This file was deleted.

35 changes: 34 additions & 1 deletion java/src/main/java/utility/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
Expand All @@ -20,11 +20,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.CaseFormat;
import com.google.protobuf.ByteString;
import com.salesforce.eventbus.protobuf.*;

import io.grpc.*;

import static utility.EventParser.getFieldListFromBitmap;

/**
* The CommonContext class provides a list of member variables and functions that is used across
* all examples for various purposes like setting up the HttpClient, CallCredentials, stubs for
Expand Down Expand Up @@ -265,6 +268,36 @@ public static GenericRecord deserialize(Schema schema, ByteString payload) throw
return reader.read(null, decoder);
}

/**
* Helper function to process and print bitmap fields
*
* @param schema
* @param record
* @param bitmapField
* @return
*/
public static void processAndPrintBitmapFields(Schema schema, GenericRecord record, String bitmapField) {
String bitmapFieldPascal = CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, bitmapField);
try {
List<String> changedFields = getFieldListFromBitmap(schema,
(GenericData.Record) record.get("ChangeEventHeader"), bitmapField);
if (!changedFields.isEmpty()) {
logger.info("============================");
logger.info(" " + bitmapFieldPascal + " ");
logger.info("============================");
for (String field : changedFields) {
logger.info(field);
}
logger.info("============================\n");
} else {
logger.info("No " + bitmapFieldPascal + " found\n");
}
} catch (Exception e) {
logger.info("Trying to process " + bitmapFieldPascal + " on unsupported events or no " +
bitmapFieldPascal + " found. Error: " + e.getMessage() + "\n");
}
}

/**
* Helper function to setup Subscribe configurations in some examples.
*
Expand Down
22 changes: 18 additions & 4 deletions java/src/main/java/utility/ExampleConfigurations.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ExampleConfigurations {
private Integer numberOfEventsToPublish;
private Boolean singlePublishRequest;
private Integer numberOfEventsToSubscribeInEachFetchRequest;
private Boolean processChangedFields;
private Boolean plaintextChannel;
private Boolean providedLoginUrl;
private ReplayPreset replayPreset;
Expand All @@ -36,7 +37,7 @@ public class ExampleConfigurations {

public ExampleConfigurations() {
this(null, null, null, null, null,
null, null, null, 5, false, 5,
null, null, null, 5, false, 5, false,
false, false, ReplayPreset.LATEST, null, null, null);
}
public ExampleConfigurations(String filename) throws IOException {
Expand All @@ -62,6 +63,8 @@ public ExampleConfigurations(String filename) throws IOException {
false : Boolean.parseBoolean(obj.get("SINGLE_PUBLISH_REQUEST").toString());
this.numberOfEventsToSubscribeInEachFetchRequest = obj.get("NUMBER_OF_EVENTS_IN_FETCHREQUEST") == null ?
5 : Integer.parseInt(obj.get("NUMBER_OF_EVENTS_IN_FETCHREQUEST").toString());
this.processChangedFields = obj.get("PROCESS_CHANGE_EVENT_HEADER_FIELDS") == null ?
false : Boolean.parseBoolean(obj.get("PROCESS_CHANGE_EVENT_HEADER_FIELDS").toString());
this.plaintextChannel = obj.get("USE_PLAINTEXT_CHANNEL") != null && Boolean.parseBoolean(obj.get("USE_PLAINTEXT_CHANNEL").toString());
this.providedLoginUrl = obj.get("USE_PROVIDED_LOGIN_URL") != null && Boolean.parseBoolean(obj.get("USE_PROVIDED_LOGIN_URL").toString());

Expand All @@ -85,12 +88,13 @@ public ExampleConfigurations(String filename) throws IOException {
public ExampleConfigurations(String username, String password, String loginUrl,
String pubsubHost, int pubsubPort, String topic) {
this(username, password, loginUrl, null, null, pubsubHost, pubsubPort, topic,
5, false, Integer.MAX_VALUE, false, false, ReplayPreset.LATEST, null, null, null);
5, false, Integer.MAX_VALUE, false, false, false, ReplayPreset.LATEST, null, null, null);
}

public ExampleConfigurations(String username, String password, String loginUrl, String tenantId, String accessToken,
String pubsubHost, Integer pubsubPort, String topic, Integer numberOfEventsToPublish,Boolean singlePublishRequest,
Integer numberOfEventsToSubscribeInEachFetchRequest, Boolean plaintextChannel, Boolean providedLoginUrl,
String pubsubHost, Integer pubsubPort, String topic, Integer numberOfEventsToPublish,
Boolean singlePublishRequest, Integer numberOfEventsToSubscribeInEachFetchRequest,
Boolean processChangedFields, Boolean plaintextChannel, Boolean providedLoginUrl,
ReplayPreset replayPreset, ByteString replayId, String devName, String managedSubId) {
this.username = username;
this.password = password;
Expand All @@ -100,8 +104,10 @@ public ExampleConfigurations(String username, String password, String loginUrl,
this.pubsubHost = pubsubHost;
this.pubsubPort = pubsubPort;
this.topic = topic;
this.singlePublishRequest = singlePublishRequest;
this.numberOfEventsToPublish = numberOfEventsToPublish;
this.numberOfEventsToSubscribeInEachFetchRequest = numberOfEventsToSubscribeInEachFetchRequest;
this.processChangedFields = processChangedFields;
this.plaintextChannel = plaintextChannel;
this.providedLoginUrl = providedLoginUrl;
this.replayPreset = replayPreset;
Expand Down Expand Up @@ -190,6 +196,14 @@ public void setNumberOfEventsToSubscribeInEachFetchRequest(int numberOfEventsToS
this.numberOfEventsToSubscribeInEachFetchRequest = numberOfEventsToSubscribeInEachFetchRequest;
}

public Boolean getProcessChangedFields() {
return processChangedFields;
}

public void setProcessChangedFields(Boolean processChangedFields) {
this.processChangedFields = processChangedFields;
}

public boolean usePlaintextChannel() {
return plaintextChannel;
}
Expand Down
4 changes: 3 additions & 1 deletion java/src/main/resources/arguments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TENANT_ID: null
ACCESS_TOKEN: null

# =========================
# Optional configurations:
# Optional Configurations:
# =========================
# Topic to publish/subscribe to (default: /event/Order_Event__e)
TOPIC: null
Expand All @@ -43,6 +43,8 @@ NUMBER_OF_EVENTS_IN_FETCHREQUEST: null
REPLAY_PRESET: null
# Replay ID in ByteString
REPLAY_ID: null
# Flag to enable/disable bitmap field processing in Subscribe and ManagedSubscribe examples for CDC events (default: false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit:
# Flag to enable/disable bitmap ChangeEventHeader field processing in Subscribe and ManagedSubscribe examples for change data capture events (default: false)

PROCESS_CHANGE_EVENT_HEADER_FIELDS: null

# ManagedSubscribe RPC parameters
# For ManagedSubscribe.java, either supply the developer name or the ID of ManagedEventSubscription
Expand Down