Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java subscribe example should showcase consuming events in order #47

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions java/src/main/java/genericpubsub/PublishStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -55,11 +56,10 @@ public PublishStream(ExampleConfigurations exampleConfigurations) {
public void publishStream(int numEventsToPublish, Boolean singlePublishRequest) throws Exception {
CountDownLatch finishLatch = new CountDownLatch(1);
AtomicReference<CountDownLatch> finishLatchRef = new AtomicReference<>(finishLatch);
final List<Status> errorStatuses = Lists.newArrayList();
final int numExpectedPublishResponses = singlePublishRequest ? 1 : numEventsToPublish;
final List<PublishResponse> publishResponses = Lists.newArrayListWithExpectedSize(numExpectedPublishResponses);
AtomicInteger failed = new AtomicInteger(0);
StreamObserver<PublishResponse> pubObserver = getDefaultPublishStreamObserver(errorStatuses, finishLatchRef,
StreamObserver<PublishResponse> pubObserver = getDefaultPublishStreamObserver(finishLatchRef,
numExpectedPublishResponses, publishResponses, failed);

// construct the stream
Expand All @@ -75,7 +75,7 @@ public void publishStream(int numEventsToPublish, Boolean singlePublishRequest)
requestObserver.onNext(generatePublishRequest(numEventsToPublish, singlePublishRequest));
}

validatePublishResponse(errorStatuses, finishLatch, numExpectedPublishResponses, publishResponses, failed);
validatePublishResponse(finishLatch, numExpectedPublishResponses, publishResponses, failed, numEventsToPublish);
requestObserver.onCompleted();
}

Expand All @@ -89,31 +89,32 @@ public void publishStream(int numEventsToPublish, Boolean singlePublishRequest)
* @return
* @throws Exception
*/
private void validatePublishResponse(List<Status> errorStatus, CountDownLatch finishLatch,
int expectedResponseCount, List<PublishResponse> publishResponses, AtomicInteger failed) throws Exception {
private void validatePublishResponse(CountDownLatch finishLatch,
int expectedResponseCount, List<PublishResponse> publishResponses, AtomicInteger failed, int expectedNumEventsPublished) throws Exception {
String exceptionMsg;
boolean failedPublish = false;
if (!finishLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
failedPublish = true;
exceptionMsg = "[ERROR] publishStream timed out after: " + TIMEOUT_SECONDS + "sec";
logger.error(exceptionMsg);
}

boolean receivedAllResponses = true;
if (expectedResponseCount != publishResponses.size()) {
receivedAllResponses = false;
exceptionMsg = "[ERROR] PublishStream received: " + publishResponses.size() + " events instead of expected "
failedPublish = true;
exceptionMsg = "[ERROR] PublishStream received: " + publishResponses.size() + " PublishResponses instead of expected "
+ expectedResponseCount;
logger.error(exceptionMsg);

errorStatus.stream().forEach(status -> {
logger.error("[ERROR] Unexpected error status: " + status);
});
}

if (failed.get() != 0 || !receivedAllResponses) {
if (failed.get() != 0) {
failedPublish = true;
exceptionMsg = "[ERROR] Failed to publish all events. " + failed + " failed out of "
+ expectedResponseCount;
+ expectedNumEventsPublished;
logger.error(exceptionMsg);
throw new Exception(exceptionMsg);
}

if (failedPublish) {
throw new RuntimeException("Failed to publish events.");
}
}

Expand Down Expand Up @@ -157,8 +158,9 @@ private ProducerEvent[] generateProducerEvents(int count) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(buffer, null);
writer.write(events.get(i), encoder);
// Setting event id which can be used to correlate PublishResponses returned from Pub/Sub API server
prodEvents[i] = ProducerEvent.newBuilder().setSchemaId(schemaInfo.getSchemaId())
.setPayload(ByteString.copyFrom(buffer.toByteArray())).build();
.setPayload(ByteString.copyFrom(buffer.toByteArray())).setId(UUID.randomUUID().toString()).build();
}

return prodEvents;
Expand Down Expand Up @@ -196,8 +198,7 @@ private PublishRequest generatePublishRequest(int count, Boolean singlePublishRe
* @param publishResponses
* @return
*/
private StreamObserver<PublishResponse> getDefaultPublishStreamObserver(List<Status> errorStatus,
AtomicReference<CountDownLatch> finishLatchRef, int expectedResponseCount,
private StreamObserver<PublishResponse> getDefaultPublishStreamObserver(AtomicReference<CountDownLatch> finishLatchRef, int expectedResponseCount,
List<PublishResponse> publishResponses, AtomicInteger failed) {
return new StreamObserver<>() {
@Override
Expand All @@ -209,10 +210,10 @@ public void onNext(PublishResponse publishResponse) {
for (PublishResult publishResult : publishResponse.getResultsList()) {
if (publishResult.hasError()) {
failed.incrementAndGet();
logger.error("[ERROR] Publishing event having correlationKey: " + publishResult.getCorrelationKey() +
logger.error("[ERROR] Publishing event with correlationKey: " + publishResult.getCorrelationKey() +
" failed with error: " + publishResult.getError().getMsg());
} else {
logger.info("Event publish successful with correlationKey: " + publishResult.getCorrelationKey());
logger.info("Event published successful with correlationKey: " + publishResult.getCorrelationKey());
lastPublishedReplayId = publishResult.getReplayId();
}
}
Expand All @@ -223,14 +224,14 @@ public void onNext(PublishResponse publishResponse) {

@Override
public void onError(Throwable t) {
logger.error("[ERROR] Unexpected error status: " + Status.fromThrowable(t));
printStatusRuntimeException("Error during PublishStream", (Exception) t);
errorStatus.add(Status.fromThrowable(t));
finishLatchRef.get().countDown();
}

@Override
public void onCompleted() {
logger.info("Successfully published " + expectedResponseCount + " events at " + busTopicName + " for tenant " + tenantGuid);
logger.info("Successfully published events for topic " + busTopicName + " for tenant " + tenantGuid);
finishLatchRef.get().countDown();
}
};
Expand Down
33 changes: 3 additions & 30 deletions java/src/main/java/genericpubsub/Subscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class Subscribe extends CommonContext {
private final StreamObserver<FetchResponse> responseStreamObserver;
private final ReplayPreset replayPreset;
private final ByteString customReplayId;
private final ExecutorService eventProcessingExecutors;
private final ScheduledExecutorService retryScheduler;
// Replay should be stored in replay store as bytes since replays are opaque.
private volatile ByteString storedReplay;

public Subscribe(ExampleConfigurations exampleConfigurations) {
Expand All @@ -60,7 +60,6 @@ public Subscribe(ExampleConfigurations exampleConfigurations) {
this.replayPreset = exampleConfigurations.getReplayPreset();
this.customReplayId = exampleConfigurations.getReplayId();
this.retryScheduler = Executors.newScheduledThreadPool(1);
this.eventProcessingExecutors = Executors.newFixedThreadPool(3);
}

public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<FetchResponse> responseStreamObserver) {
Expand All @@ -73,7 +72,6 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<Fet
this.replayPreset = exampleConfigurations.getReplayPreset();
this.customReplayId = exampleConfigurations.getReplayId();
this.retryScheduler = Executors.newScheduledThreadPool(1);
this.eventProcessingExecutors = Executors.newFixedThreadPool(3);
}

/**
Expand Down Expand Up @@ -143,10 +141,7 @@ public void onNext(FetchResponse fetchResponse) {
logger.info("RPC ID: " + fetchResponse.getRpcId());
for(ConsumerEvent ce : fetchResponse.getEventsList()) {
try {
// Unless the schema of the event is available in the local schema cache, there is a blocking
// GetSchema call being made to obtain the schema which may block the thread. Therefore, the
// processing of events is done asynchronously.
CompletableFuture.runAsync(new EventProcessor(ce), eventProcessingExecutors);
processEvent(ce);
} catch (Exception e) {
logger.info(e.toString());
}
Expand Down Expand Up @@ -243,33 +238,14 @@ public void run() {
}
}

/**
* A Runnable class that is used to process the events asynchronously using CompletableFuture.
*/
private class EventProcessor implements Runnable {
private ConsumerEvent ce;
public EventProcessor(ConsumerEvent consumerEvent) {
this.ce = consumerEvent;
}

@Override
public void run() {
try {
processEvent(ce);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

/**
* Helper function to process the events received.
*/
private void processEvent(ConsumerEvent ce) throws IOException {
Schema writerSchema = getSchema(ce.getEvent().getSchemaId());
this.storedReplay = ce.getReplayId();
GenericRecord record = deserialize(writerSchema, ce.getEvent().getPayload());
logger.info("Received event with payload: " + record.toString() + " with schema name: " + writerSchema.getName());
logger.info("Received event with payload: " + record.toString() + " with schema name: " + writerSchema.getName() + " with id: " + ce.getEvent().getId());
}

/**
Expand Down Expand Up @@ -328,9 +304,6 @@ public synchronized void close() {
if (retryScheduler != null) {
retryScheduler.shutdown();
}
if (eventProcessingExecutors != null) {
eventProcessingExecutors.shutdown();
}
} catch (Exception e) {
logger.info(e.toString());
}
Expand Down