Skip to content

Commit 96ae4c1

Browse files
authored
Update Subscribe and Managed Subscribe examples (#60)
* Add processChangedFields argument * Update ManagedSubscribe, README * Remove ProcessChangeEventHeader example * Update arguments.yaml * Addressing Comments 1
1 parent 65c91a2 commit 96ae4c1

File tree

7 files changed

+72
-116
lines changed

7 files changed

+72
-116
lines changed

java/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ In the `src/main` directory of the project, you will find several sub-directorie
4747
* `NUMBER_OF_EVENTS_TO_PUBLISH`: Specify the number of events to publish while using the PublishStream RPC.
4848
* `SINGLE_PUBLISH_REQUEST`: Specify if you want to publish the events in a single or multiple PublishRequests.
4949
* `NUMBER_OF_EVENTS_IN_FETCHREQUEST`: Specify the number of events that the Subscribe RPC requests from the server in each FetchRequest. The example fetches at most 5 events in each Subscribe request. If you pass in more than 5, it sends multiple Subscribe requests with at most 5 events requested in FetchRequest each. For more information about requesting events, see [Pull Subscription and Flow Control](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/flow-control.html) in the Pub/Sub API documentation.
50+
* `PROCESS_CHANGE_EVENT_HEADER_FIELDS`: Specify whether the Subscribe or ManagedSubscribe client should process the change data capture event bitmap fields in `ChangeEventHeader`. In this sample, only the `changedFields` field is expanded. To expand the `diffFields` and `nulledFields` header fields, modify the sample code. See [Event Deserialization Considerations](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/event-deserialization-considerations.html).
5051
* `REPLAY_PRESET`: Specify the ReplayPreset for subscribe examples.
5152
* If a subscription has to be started using the CUSTOM replay preset, the `REPLAY_ID` parameter is mandatory.
5253
* The `REPLAY_ID` is a byte array and must be specified in this format: `[<byte_values_separated_by_commas>]`. Please enter the values as is within the square brackets and without any quotes.

java/src/main/java/genericpubsub/ManagedSubscribe.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@ public class ManagedSubscribe extends CommonContext implements StreamObserver<Ma
3838
private AtomicInteger receivedEvents = new AtomicInteger(0);
3939
private String developerName;
4040
private String managedSubscriptionId;
41+
private final boolean processChangedFields;
4142

4243
public ManagedSubscribe(ExampleConfigurations exampleConfigurations) {
4344
super(exampleConfigurations);
4445
isActive.set(true);
4546
this.managedSubscriptionId = exampleConfigurations.getManagedSubscriptionId();
4647
this.developerName = exampleConfigurations.getDeveloperName();
4748
BATCH_SIZE = Math.min(5, exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest());
49+
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
4850
}
4951

5052
/**
@@ -98,6 +100,11 @@ private void processEvent(ManagedFetchResponse response) throws IOException {
98100
Schema writerSchema = getSchema(schemaId);
99101
GenericRecord record = deserialize(writerSchema, event.getEvent().getPayload());
100102
logger.info("Received event: {}", record.toString());
103+
if (processChangedFields) {
104+
// This example expands the changedFields bitmap field in ChangeEventHeader.
105+
// To expand the other bitmap fields, i.e., diffFields and nulledFields, replicate or modify this code.
106+
processAndPrintBitmapFields(writerSchema, record, "changedFields");
107+
}
101108
}
102109
logger.info("Processed batch of {} event(s)", response.getEventsList().size());
103110
}

java/src/main/java/genericpubsub/Subscribe.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class Subscribe extends CommonContext {
4949
private final ScheduledExecutorService retryScheduler;
5050
// Replay should be stored in replay store as bytes since replays are opaque.
5151
private volatile ByteString storedReplay;
52+
private final boolean processChangedFields;
5253

5354
public Subscribe(ExampleConfigurations exampleConfigurations) {
5455
super(exampleConfigurations);
@@ -60,6 +61,7 @@ public Subscribe(ExampleConfigurations exampleConfigurations) {
6061
this.replayPreset = exampleConfigurations.getReplayPreset();
6162
this.customReplayId = exampleConfigurations.getReplayId();
6263
this.retryScheduler = Executors.newScheduledThreadPool(1);
64+
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
6365
}
6466

6567
public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<FetchResponse> responseStreamObserver) {
@@ -72,6 +74,7 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<Fet
7274
this.replayPreset = exampleConfigurations.getReplayPreset();
7375
this.customReplayId = exampleConfigurations.getReplayId();
7476
this.retryScheduler = Executors.newScheduledThreadPool(1);
77+
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
7578
}
7679

7780
/**
@@ -246,6 +249,11 @@ private void processEvent(ConsumerEvent ce) throws IOException {
246249
this.storedReplay = ce.getReplayId();
247250
GenericRecord record = deserialize(writerSchema, ce.getEvent().getPayload());
248251
logger.info("Received event with payload: " + record.toString() + " with schema name: " + writerSchema.getName());
252+
if (processChangedFields) {
253+
// This example expands the changedFields bitmap field in ChangeEventHeader.
254+
// To expand the other bitmap fields, i.e., diffFields and nulledFields, replicate or modify this code.
255+
processAndPrintBitmapFields(writerSchema, record, "changedFields");
256+
}
249257
}
250258

251259
/**

java/src/main/java/processchangeeventheader/ProcessChangeEventHeader.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

java/src/main/java/utility/CommonContext.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
import java.io.*;
44
import java.nio.ByteBuffer;
55
import java.util.ArrayList;
6-
import java.util.Arrays;
76
import java.util.List;
87
import java.util.Map;
98
import java.util.concurrent.TimeUnit;
109

1110
import org.apache.avro.Schema;
11+
import org.apache.avro.generic.GenericData;
1212
import org.apache.avro.generic.GenericDatumReader;
1313
import org.apache.avro.generic.GenericRecord;
1414
import org.apache.avro.generic.GenericRecordBuilder;
@@ -20,11 +20,14 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23+
import com.google.common.base.CaseFormat;
2324
import com.google.protobuf.ByteString;
2425
import com.salesforce.eventbus.protobuf.*;
2526

2627
import io.grpc.*;
2728

29+
import static utility.EventParser.getFieldListFromBitmap;
30+
2831
/**
2932
* The CommonContext class provides a list of member variables and functions that is used across
3033
* all examples for various purposes like setting up the HttpClient, CallCredentials, stubs for
@@ -265,6 +268,36 @@ public static GenericRecord deserialize(Schema schema, ByteString payload) throw
265268
return reader.read(null, decoder);
266269
}
267270

271+
/**
272+
* Helper function to process and print bitmap fields
273+
*
274+
* @param schema
275+
* @param record
276+
* @param bitmapField
277+
* @return
278+
*/
279+
public static void processAndPrintBitmapFields(Schema schema, GenericRecord record, String bitmapField) {
280+
String bitmapFieldPascal = CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, bitmapField);
281+
try {
282+
List<String> changedFields = getFieldListFromBitmap(schema,
283+
(GenericData.Record) record.get("ChangeEventHeader"), bitmapField);
284+
if (!changedFields.isEmpty()) {
285+
logger.info("============================");
286+
logger.info(" " + bitmapFieldPascal + " ");
287+
logger.info("============================");
288+
for (String field : changedFields) {
289+
logger.info(field);
290+
}
291+
logger.info("============================\n");
292+
} else {
293+
logger.info("No " + bitmapFieldPascal + " found\n");
294+
}
295+
} catch (Exception e) {
296+
logger.info("Trying to process " + bitmapFieldPascal + " on unsupported events or no " +
297+
bitmapFieldPascal + " found. Error: " + e.getMessage() + "\n");
298+
}
299+
}
300+
268301
/**
269302
* Helper function to setup Subscribe configurations in some examples.
270303
*

java/src/main/java/utility/ExampleConfigurations.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ExampleConfigurations {
2727
private Integer numberOfEventsToPublish;
2828
private Boolean singlePublishRequest;
2929
private Integer numberOfEventsToSubscribeInEachFetchRequest;
30+
private Boolean processChangedFields;
3031
private Boolean plaintextChannel;
3132
private Boolean providedLoginUrl;
3233
private ReplayPreset replayPreset;
@@ -36,7 +37,7 @@ public class ExampleConfigurations {
3637

3738
public ExampleConfigurations() {
3839
this(null, null, null, null, null,
39-
null, null, null, 5, false, 5,
40+
null, null, null, 5, false, 5, false,
4041
false, false, ReplayPreset.LATEST, null, null, null);
4142
}
4243
public ExampleConfigurations(String filename) throws IOException {
@@ -62,6 +63,8 @@ public ExampleConfigurations(String filename) throws IOException {
6263
false : Boolean.parseBoolean(obj.get("SINGLE_PUBLISH_REQUEST").toString());
6364
this.numberOfEventsToSubscribeInEachFetchRequest = obj.get("NUMBER_OF_EVENTS_IN_FETCHREQUEST") == null ?
6465
5 : Integer.parseInt(obj.get("NUMBER_OF_EVENTS_IN_FETCHREQUEST").toString());
66+
this.processChangedFields = obj.get("PROCESS_CHANGE_EVENT_HEADER_FIELDS") == null ?
67+
false : Boolean.parseBoolean(obj.get("PROCESS_CHANGE_EVENT_HEADER_FIELDS").toString());
6568
this.plaintextChannel = obj.get("USE_PLAINTEXT_CHANNEL") != null && Boolean.parseBoolean(obj.get("USE_PLAINTEXT_CHANNEL").toString());
6669
this.providedLoginUrl = obj.get("USE_PROVIDED_LOGIN_URL") != null && Boolean.parseBoolean(obj.get("USE_PROVIDED_LOGIN_URL").toString());
6770

@@ -85,12 +88,13 @@ public ExampleConfigurations(String filename) throws IOException {
8588
public ExampleConfigurations(String username, String password, String loginUrl,
8689
String pubsubHost, int pubsubPort, String topic) {
8790
this(username, password, loginUrl, null, null, pubsubHost, pubsubPort, topic,
88-
5, false, Integer.MAX_VALUE, false, false, ReplayPreset.LATEST, null, null, null);
91+
5, false, Integer.MAX_VALUE, false, false, false, ReplayPreset.LATEST, null, null, null);
8992
}
9093

9194
public ExampleConfigurations(String username, String password, String loginUrl, String tenantId, String accessToken,
92-
String pubsubHost, Integer pubsubPort, String topic, Integer numberOfEventsToPublish,Boolean singlePublishRequest,
93-
Integer numberOfEventsToSubscribeInEachFetchRequest, Boolean plaintextChannel, Boolean providedLoginUrl,
95+
String pubsubHost, Integer pubsubPort, String topic, Integer numberOfEventsToPublish,
96+
Boolean singlePublishRequest, Integer numberOfEventsToSubscribeInEachFetchRequest,
97+
Boolean processChangedFields, Boolean plaintextChannel, Boolean providedLoginUrl,
9498
ReplayPreset replayPreset, ByteString replayId, String devName, String managedSubId) {
9599
this.username = username;
96100
this.password = password;
@@ -100,8 +104,10 @@ public ExampleConfigurations(String username, String password, String loginUrl,
100104
this.pubsubHost = pubsubHost;
101105
this.pubsubPort = pubsubPort;
102106
this.topic = topic;
107+
this.singlePublishRequest = singlePublishRequest;
103108
this.numberOfEventsToPublish = numberOfEventsToPublish;
104109
this.numberOfEventsToSubscribeInEachFetchRequest = numberOfEventsToSubscribeInEachFetchRequest;
110+
this.processChangedFields = processChangedFields;
105111
this.plaintextChannel = plaintextChannel;
106112
this.providedLoginUrl = providedLoginUrl;
107113
this.replayPreset = replayPreset;
@@ -190,6 +196,14 @@ public void setNumberOfEventsToSubscribeInEachFetchRequest(int numberOfEventsToS
190196
this.numberOfEventsToSubscribeInEachFetchRequest = numberOfEventsToSubscribeInEachFetchRequest;
191197
}
192198

199+
public Boolean getProcessChangedFields() {
200+
return processChangedFields;
201+
}
202+
203+
public void setProcessChangedFields(Boolean processChangedFields) {
204+
this.processChangedFields = processChangedFields;
205+
}
206+
193207
public boolean usePlaintextChannel() {
194208
return plaintextChannel;
195209
}

java/src/main/resources/arguments.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ TENANT_ID: null
2626
ACCESS_TOKEN: null
2727

2828
# =========================
29-
# Optional configurations:
29+
# Optional Configurations:
3030
# =========================
3131
# Topic to publish/subscribe to (default: /event/Order_Event__e)
3232
TOPIC: null
@@ -43,6 +43,9 @@ NUMBER_OF_EVENTS_IN_FETCHREQUEST: null
4343
REPLAY_PRESET: null
4444
# Replay ID in ByteString
4545
REPLAY_ID: null
46+
# Flag to enable/disable processing of bitmap fields in ChangeEventHeader in Subscribe and
47+
# ManagedSubscribe examples for change data capture events (default: false)
48+
PROCESS_CHANGE_EVENT_HEADER_FIELDS: null
4649

4750
# ManagedSubscribe RPC parameters
4851
# For ManagedSubscribe.java, either supply the developer name or the ID of ManagedEventSubscription

0 commit comments

Comments
 (0)