diff --git a/salesforce-lib/src/main/java/com/streamsets/pipeline/lib/salesforce/ForceSourceConfigBean.java b/salesforce-lib/src/main/java/com/streamsets/pipeline/lib/salesforce/ForceSourceConfigBean.java index a9af96c13e5..d7ef54d8220 100644 --- a/salesforce-lib/src/main/java/com/streamsets/pipeline/lib/salesforce/ForceSourceConfigBean.java +++ b/salesforce-lib/src/main/java/com/streamsets/pipeline/lib/salesforce/ForceSourceConfigBean.java @@ -267,6 +267,20 @@ public class ForceSourceConfigBean extends ForceInputConfigBean { @ValueChooserModel(ReplayOptionChooserValues.class) public ReplayOption replayOption = ReplayOption.NEW_EVENTS; + @ConfigDef( + required = true, + type = ConfigDef.Type.NUMBER, + label = "Streaming Buffer Size", + description = "Streaming buffer size, in bytes. Increase this if you see 'buffering capacity exceeded' errors.", + defaultValue = "1048576", + displayPosition = 128, + dependencies = { + @Dependency(configName = "subscribeToStreaming", triggeredByValues = "true"), + }, + group = "SUBSCRIBE" + ) + public long streamingBufferSize; + @ConfigDef( required = false, type = ConfigDef.Type.BOOLEAN, diff --git a/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceSource.java b/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceSource.java index f39587b320d..d14b6369692 100644 --- a/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceSource.java +++ b/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceSource.java @@ -837,6 +837,9 @@ private void processMetaMessage(Message message, String nextSourceOffset) throws } } else if (!message.isSuccessful()) { String error = (String) message.get("error"); + if (error == null) { + error = message.get("failure").toString(); + } LOG.info("Bayeux error message: {}", error); if (AUTHENTICATION_INVALID.equals(error)) { diff --git a/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceStreamConsumer.java b/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceStreamConsumer.java index ee13b1487f7..27f70de91ad 100644 --- a/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceStreamConsumer.java +++ b/salesforce-lib/src/main/java/com/streamsets/pipeline/stage/origin/salesforce/ForceStreamConsumer.java @@ -243,6 +243,7 @@ private BayeuxClient makeClient() throws Exception { Map options = new HashMap<>(); options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, READ_TIMEOUT); + options.put(LongPollingTransport.MAX_BUFFER_SIZE_OPTION, conf.streamingBufferSize); LongPollingTransport transport = new LongPollingTransport(options, httpClient) { @Override