Skip to content

Commit

Permalink
SDC-12771. Salesforce Origin - Buffer Limit Exceeded
Browse files Browse the repository at this point in the history
Add new 'Streaming Buffer Size' configuration to set cometd buffer size.

Change-Id: I9dd92fcedfcdef499673332cb171f4f1daf71121
Reviewed-on: https://review.streamsets.net/c/datacollector/+/27215
Reviewed-by: Xavier Baqués <xavi@streamsets.com>
  • Loading branch information
Pat Patterson committed Nov 4, 2019
1 parent 1df427b commit e71adbf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,20 @@ public class ForceSourceConfigBean extends ForceInputConfigBean {
@ValueChooserModel(ReplayOptionChooserValues.class)
public ReplayOption replayOption = ReplayOption.NEW_EVENTS;

@ConfigDef(
required = true,
type = ConfigDef.Type.NUMBER,
label = "Streaming Buffer Size",
description = "Streaming buffer size, in bytes. Increase this if you see 'buffering capacity exceeded' errors.",
defaultValue = "1048576",
displayPosition = 128,
dependencies = {
@Dependency(configName = "subscribeToStreaming", triggeredByValues = "true"),
},
group = "SUBSCRIBE"
)
public long streamingBufferSize;

@ConfigDef(
required = false,
type = ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ private void processMetaMessage(Message message, String nextSourceOffset) throws
}
} else if (!message.isSuccessful()) {
String error = (String) message.get("error");
if (error == null) {
error = message.get("failure").toString();
}
LOG.info("Bayeux error message: {}", error);

if (AUTHENTICATION_INVALID.equals(error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ private BayeuxClient makeClient() throws Exception {

Map<String, Object> options = new HashMap<>();
options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, READ_TIMEOUT);
options.put(LongPollingTransport.MAX_BUFFER_SIZE_OPTION, conf.streamingBufferSize);
LongPollingTransport transport = new LongPollingTransport(options, httpClient) {

@Override
Expand Down

0 comments on commit e71adbf

Please sign in to comment.