Skip to content

Commit

Permalink
Adding Long Lived Subscribe and Updating Publish Examples (#33)
Browse files Browse the repository at this point in the history
* Adding Long Lived Subscribe and Updating Publish Examples

* Adding retry logic

* Updates to Subscribe and refactoring

* Updating Account Update App README and extra logging
  • Loading branch information
sidd0610 authored Apr 19, 2023
1 parent 47ed9ec commit 5eada80
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 106 deletions.
5 changes: 1 addition & 4 deletions java/src/main/java/accountupdateapp/AccountListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ public void onNext(FetchResponse fetchResponse) {
logger.info(e.toString());
}
}
if (subscriber.getReceivedEvents().get() < subscriber.getTotalEventsRequested()) {
if (fetchResponse.getPendingNumRequested() == 0) {
subscriber.fetchMore(subscriber.getBatchSize());
} else {
subscriber.receivedAllEvents.set(true);
}
}

Expand All @@ -96,7 +94,6 @@ public void onCompleted() {
// Helper function to start the app.
public void startApp() throws InterruptedException {
subscriber.startSubscription();
subscriber.waitForEvents();
}

// Helper function to stop the app.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static void updateAccountRecord(ExampleConfigurations subParams, String a

if (res > 299) {
logger.info("Unable to update Account Record.");
logger.info(response.getContentAsString());
} else {
logger.info("Successfully updated Account Record. Updated AccountNumber: " + accountNumber);
}
Expand Down
5 changes: 1 addition & 4 deletions java/src/main/java/accountupdateapp/AccountUpdater.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ public void onNext(FetchResponse fetchResponse) {
logger.info(e.toString());
}
}
if (subscriber.getReceivedEvents().get() < subscriber.getTotalEventsRequested()) {
if (fetchResponse.getPendingNumRequested() == 0) {
subscriber.fetchMore(subscriber.getBatchSize());
} else {
subscriber.receivedAllEvents.set(true);
}
}

Expand All @@ -89,7 +87,6 @@ public void onCompleted() {
// Helper function to start the app.
public void startApp() throws InterruptedException {
subscriber.startSubscription();
subscriber.waitForEvents();
}

public void stopApp() {
Expand Down
10 changes: 8 additions & 2 deletions java/src/main/java/accountupdateapp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This example subscribes to change events corresponding to the creation of [Accou
* Search for the `Account` object and click on the right arrow in the middle of the screen to select the entity.
* Click on the `Save` button to update the changes.
2. The `NewAccount` custom platform needs to be created with the following fields:
- Standard Fields
- Platform Event Name
- Label: `NewAccount`
- Plural Label: `NewAccounts`
- Custom Fields
Expand All @@ -35,5 +35,11 @@ This example subscribes to change events corresponding to the creation of [Accou
```

## Notes:
* Subscribers in both the `AccountUpdater` and `AccountListener` subscribe with the ReplayPreset set to LATEST. Therefore, only events generated once the examples have started running will be processed
* Please use the `my domain` URL for your org for running these examples. You can find the my domain URL through Developer Console.
* Open Developer Console
* Click on the Debug menu and select Open Execute Anonymous Window.
* Key in the following in the window: `System.debug(System.url.getOrgDomainUrl());` and execute the same.
* Once done, in the Logs tab below open the logs recently executed code.
* In the logs, get the `my domain` URL from the USER_DEBUG event.
* Subscribers in both the `AccountUpdater` and `AccountListener` subscribe with the ReplayPreset set to LATEST. Therefore, only events generated once the examples have started running will be processed.
* The `AccountUpdater` logs the `AccountNumber` that has been added to the `Account` record which can be used to verify if the update is correct.
3 changes: 1 addition & 2 deletions java/src/main/java/genericpubsub/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ public PublishResponse publish(ProducerEvent event) throws Exception {
* @return
*/
private ByteString validatePublishResponse(PublishResponse response) {
final long LATEST = -1;
ByteString lastPublishedReplayId = getReplayIdFromLong(LATEST);
ByteString lastPublishedReplayId = null;
List<PublishResult> resultList = response.getResultsList();
if (resultList.size() != 1) {
String errorMsg = "[ERROR] Error during Publish, received: " + resultList.size() + " events instead of expected 1";
Expand Down
14 changes: 6 additions & 8 deletions java/src/main/java/genericpubsub/PublishStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class PublishStream extends CommonContext {

ClientCallStreamObserver<PublishRequest> requestObserver = null;

private ByteString lastPublishedReplayId;

public PublishStream(ExampleConfigurations exampleConfigurations) {
super(exampleConfigurations);
setupTopicDetails(exampleConfigurations.getTopic(), true, true);
Expand All @@ -50,7 +52,7 @@ public PublishStream(ExampleConfigurations exampleConfigurations) {
* @return ByteString
* @throws Exception
*/
public ByteString publishStream(int numEventsToPublish) throws Exception {
public void publishStream(int numEventsToPublish) throws Exception {
CountDownLatch finishLatch = new CountDownLatch(1);
AtomicReference<CountDownLatch> finishLatchRef = new AtomicReference<>(finishLatch);
final List<Status> errorStatuses = Lists.newArrayList();
Expand All @@ -66,10 +68,9 @@ public ByteString publishStream(int numEventsToPublish) throws Exception {
requestObserver.onNext(generatePublishRequest(i));
}

ByteString lastPublishedReplayId = validatePublishResponse(errorStatuses, finishLatch, numEventsToPublish,
validatePublishResponse(errorStatuses, finishLatch, numEventsToPublish,
publishResponses, failed);
requestObserver.onCompleted();
return lastPublishedReplayId;
}

/**
Expand All @@ -82,11 +83,9 @@ public ByteString publishStream(int numEventsToPublish) throws Exception {
* @return
* @throws Exception
*/
private ByteString validatePublishResponse(List<Status> errorStatus, CountDownLatch finishLatch,
private void validatePublishResponse(List<Status> errorStatus, CountDownLatch finishLatch,
int expectedResponseCount, List<PublishResponse> publishResponses, AtomicInteger failed) throws Exception {
String exceptionMsg;
final long LATEST = -1;
ByteString lastPublishedReplayId = getReplayIdFromLong(LATEST);
if (!finishLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
exceptionMsg = "[ERROR] publishStream timed out after: " + TIMEOUT_SECONDS + "sec";
logger.error(exceptionMsg);
Expand All @@ -110,8 +109,6 @@ private ByteString validatePublishResponse(List<Status> errorStatus, CountDownLa
logger.error(exceptionMsg);
throw new Exception(exceptionMsg);
}

return lastPublishedReplayId;
}

/**
Expand Down Expand Up @@ -173,6 +170,7 @@ public void onNext(PublishResponse publishResponse) {
" failed with error: " + publishResult.getError().getMsg());
} else {
logger.info("Event publish successful with correlationKey: " + publishResult.getCorrelationKey());
lastPublishedReplayId = publishResult.getReplayId();
}
}

Expand Down
Loading

0 comments on commit 5eada80

Please sign in to comment.