Skip to content

Commit

Permalink
PIP-120 : Enable client memory limit controller by default (#13344)
Browse files Browse the repository at this point in the history
* Enable client memory limit controller by default

* Deprecate maxPendingMessagesAcrossPartitions
  • Loading branch information
HQebupt authored Jan 22, 2022
1 parent 86fd8e7 commit 11bfc0e
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* the client application. Until, the producer gets a successful acknowledgment back from the broker,
* it will keep in memory (direct memory pool) all the messages in the pending queue.
*
* <p>Default is 1000.
* <p>Default is 0, disable the pending messages check.
*
* @param maxPendingMessages
* the max size of the pending messages queue for the producer
Expand All @@ -188,7 +188,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* The purpose of this setting is to have an upper-limit on the number
* of pending messages when publishing on a partitioned topic.
*
* <p>Default is 50000.
* <p>Default is 0, disable the pending messages across partitions check.
*
* <p>If publishing at high rate over a topic with many partitions (especially when publishing messages without a
* partitioning key), it might be beneficial to increase this parameter to allow for more pipelining within the
Expand All @@ -198,6 +198,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* max pending messages across all the partitions
* @return the producer builder instance
*/
@Deprecated
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
return this;
}

@Deprecated
@Override
public ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ public class ClientConfigurationData implements Serializable, Cloneable {

@ApiModelProperty(
name = "memoryLimitBytes",
value = "Limit of client memory usage (in byte)."
value = "Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput."
)
private long memoryLimitBytes = 0;
private long memoryLimitBytes = 64 * 1024 * 1024;

@ApiModelProperty(
name = "proxyServiceUrl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;

public static final int DEFAULT_BATCHING_MAX_MESSAGES = 1000;
public static final int DEFAULT_MAX_PENDING_MESSAGES = 1000;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
public static final int DEFAULT_MAX_PENDING_MESSAGES = 0;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 0;

private String topicName = null;
private String producerName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,12 @@ public void testProducerBuilderImplWhenSendTimeoutPropertyIsNegative() {

@Test(expectedExceptions = IllegalArgumentException.class)
public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalid() {
producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
producerBuilderImpl.maxPendingMessagesAcrossPartitions(-1);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages")
public void testProducerBuilderImplWhenMaxPendingMessagesAcrossPartitionsPropertyIsInvalidErrorMessages() {
producerBuilderImpl.maxPendingMessagesAcrossPartitions(999);
producerBuilderImpl.maxPendingMessagesAcrossPartitions(-1);
}

@Test
Expand Down

0 comments on commit 11bfc0e

Please sign in to comment.