Skip to content

Commit

Permalink
Update batch size usage in Subscribe examples (#61)
Browse files Browse the repository at this point in the history
* Update batch size usage in Subscribe examples
  • Loading branch information
sidd0610 authored Aug 2, 2024
1 parent 96ae4c1 commit 45a47a3
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions java/src/main/java/genericpubsub/ManagedSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @author jalaya
*/
public class ManagedSubscribe extends CommonContext implements StreamObserver<ManagedFetchResponse> {
private static int BATCH_SIZE = 5;
private static int BATCH_SIZE;
private StreamObserver<ManagedFetchRequest> serverStream;
private Map<String, Schema> schemaCache = new ConcurrentHashMap<>();
private final CountDownLatch serverOnCompletedLatch = new CountDownLatch(1);
Expand All @@ -45,7 +45,7 @@ public ManagedSubscribe(ExampleConfigurations exampleConfigurations) {
isActive.set(true);
this.managedSubscriptionId = exampleConfigurations.getManagedSubscriptionId();
this.developerName = exampleConfigurations.getDeveloperName();
BATCH_SIZE = Math.min(5, exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest());
this.BATCH_SIZE = exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest();
this.processChangedFields = exampleConfigurations.getProcessChangedFields();
}

Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/genericpubsub/Subscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Subscribe(ExampleConfigurations exampleConfigurations) {
super(exampleConfigurations);
isActive.set(true);
this.exampleConfigurations = exampleConfigurations;
this.BATCH_SIZE = Math.min(5, exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest());
this.BATCH_SIZE = exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest();
this.responseStreamObserver = getDefaultResponseStreamObserver();
this.setupTopicDetails(exampleConfigurations.getTopic(), false, false);
this.replayPreset = exampleConfigurations.getReplayPreset();
Expand All @@ -68,7 +68,7 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<Fet
super(exampleConfigurations);
isActive.set(true);
this.exampleConfigurations = exampleConfigurations;
this.BATCH_SIZE = Math.min(5, exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest());
this.BATCH_SIZE = exampleConfigurations.getNumberOfEventsToSubscribeInEachFetchRequest();
this.responseStreamObserver = responseStreamObserver;
this.setupTopicDetails(exampleConfigurations.getTopic(), false, false);
this.replayPreset = exampleConfigurations.getReplayPreset();
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/resources/arguments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ NUMBER_OF_EVENTS_TO_PUBLISH: null
# in different PublishRequests (default: false)
# Used only by PublishStream.java
SINGLE_PUBLISH_REQUEST: null
# Number of Events to subscribe (default: 5)
# Number of events to subscribe to in each FetchRequest/ManagedFetchRequest (default: 5)
NUMBER_OF_EVENTS_IN_FETCHREQUEST: null
# ReplayPreset (Accepted Values: {EARLIEST, LATEST (default), CUSTOM})
REPLAY_PRESET: null
Expand Down

0 comments on commit 45a47a3

Please sign in to comment.