[feat] Support consumer client memory limit #991

Merged · 9 commits · Mar 23, 2023
15 changes: 8 additions & 7 deletions pulsar/client_impl.go
@@ -30,12 +30,13 @@ import (
)

const (
-defaultConnectionTimeout = 10 * time.Second
-defaultOperationTimeout = 30 * time.Second
-defaultKeepAliveInterval = 30 * time.Second
-defaultMemoryLimitBytes = 64 * 1024 * 1024
-defaultConnMaxIdleTime = 180 * time.Second
-minConnMaxIdleTime = 60 * time.Second
+defaultConnectionTimeout = 10 * time.Second
+defaultOperationTimeout = 30 * time.Second
+defaultKeepAliveInterval = 30 * time.Second
+defaultMemoryLimitBytes = 64 * 1024 * 1024
+defaultMemoryLimitTriggerThreshold = 0.95
+defaultConnMaxIdleTime = 180 * time.Second
+minConnMaxIdleTime = 60 * time.Second
)

type client struct {
@@ -158,7 +159,7 @@ func newClient(options ClientOptions) (Client, error) {
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
log: logger,
metrics: metrics,
-memLimit: internal.NewMemoryLimitController(memLimitBytes),
+memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
}
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

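The constructor now takes a trigger threshold alongside the byte limit. The memory limit controller itself lives in `pulsar/internal` and is not shown in this diff, so the following is only a minimal sketch of the API the new consumer code relies on; the internal fields and trigger bookkeeping here are assumptions, not the real implementation.

```go
package internal

import "sync/atomic"

// memoryLimitController is an illustrative stand-in for the real
// controller in pulsar/internal; only the methods this PR uses are
// sketched, and the internals are assumptions.
type memoryLimitController struct {
	limit     int64
	threshold int64 // limit scaled by the trigger threshold, e.g. 0.95
	current   int64
	triggers  []func()
}

func NewMemoryLimitController(limit int64, triggerThreshold float64) *memoryLimitController {
	return &memoryLimitController{
		limit:     limit,
		threshold: int64(float64(limit) * triggerThreshold),
	}
}

// RegisterTrigger records a callback to run when usage crosses the
// threshold (the consumer registers shrinkReceiverQueueSize here).
func (m *memoryLimitController) RegisterTrigger(trigger func()) {
	m.triggers = append(m.triggers, trigger)
}

// ForceReserveMemory always succeeds, even past the limit, so messages
// that have already arrived from the broker are never dropped.
func (m *memoryLimitController) ForceReserveMemory(size int64) {
	if atomic.AddInt64(&m.current, size) >= m.threshold {
		for _, t := range m.triggers {
			t()
		}
	}
}

// ReleaseMemory returns bytes once a message is handed to the application.
func (m *memoryLimitController) ReleaseMemory(size int64) {
	atomic.AddInt64(&m.current, -size)
}

// CurrentUsagePercent reports usage as a fraction of the limit.
func (m *memoryLimitController) CurrentUsagePercent() float64 {
	return float64(atomic.LoadInt64(&m.current)) / float64(m.limit)
}
```

With the 64 MiB default limit and the 0.95 threshold, triggers would start firing once roughly 60.8 MiB are reserved.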
28 changes: 26 additions & 2 deletions pulsar/consumer_partition.go
@@ -77,7 +77,8 @@ const (
)

const (
-initialReceiverQueueSize = 1
+initialReceiverQueueSize = 1
+receiverQueueExpansionMemThreshold = 0.75
)

const (
@@ -333,6 +334,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
pc.client.memLimit.RegisterTrigger(pc.shrinkReceiverQueueSize)
} else {
pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
}
@@ -1002,6 +1004,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.metrics.MessagesReceived.Add(float64(numMsgs))
pc.metrics.PrefetchedMessages.Add(float64(numMsgs))

var bytesReceived int
for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
if err != nil || payload == nil {
@@ -1116,9 +1119,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
})

messages = append(messages, msg)
bytesReceived += msg.size()
}

if pc.options.autoReceiverQueueSize {
pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
pc.incomingMessages.Add(int32(len(messages)))
pc.markScaleIfNeed()
}
@@ -1270,13 +1275,15 @@ func (pc *partitionConsumer) dispatcher() {
var queueCh chan []*message
var messageCh chan ConsumerMessage
var nextMessage ConsumerMessage
var nextMessageSize int

// are there more messages to send?
if len(messages) > 0 {
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
Message: messages[0],
}
nextMessageSize = messages[0].size()

if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
@@ -1339,6 +1346,7 @@

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
pc.expectMoreIncomingMessages()
}
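The two paths above are symmetric: every byte force-reserved when a message is decoded in `MessageReceived` is released when `dispatcher` hands that message to the application. A toy ledger (plain arithmetic, not the real controller) makes the invariant visible:

```go
package main

import "fmt"

func main() {
	var reserved int64
	limit := int64(10 * 1024) // matches the 10 KiB limit used in the tests below

	// Receive path: three 4 KiB messages arrive; force-reservation
	// accepts the third even though it pushes usage past the limit.
	for i := 1; i <= 3; i++ {
		reserved += 4 * 1024
		fmt.Printf("after receive %d: %d/%d bytes\n", i, reserved, limit)
	}

	// Dispatch path: each delivered message gives its bytes back.
	for i := 1; i <= 3; i++ {
		reserved -= 4 * 1024
		fmt.Printf("after dispatch %d: %d/%d bytes\n", i, reserved, limit)
	}
}
```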

@@ -1742,7 +1750,8 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
oldSize := pc.currentQueueSize.Load()
maxSize := int32(pc.options.receiverQueueSize)
newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
-if newSize > oldSize {
+usagePercent := pc.client.memLimit.CurrentUsagePercent()
+if usagePercent < receiverQueueExpansionMemThreshold && newSize > oldSize {
pc.currentQueueSize.CAS(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
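As a standalone illustration of the new gate (a hypothetical helper, not code from this PR), the queue keeps doubling only while memory usage stays below the 0.75 expansion threshold:

```go
package main

import (
	"fmt"
	"math"
)

// nextQueueSize mirrors the expansion rule in expectMoreIncomingMessages.
func nextQueueSize(oldSize, maxSize int32, usagePercent float64) int32 {
	newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
	if usagePercent < 0.75 && newSize > oldSize {
		return newSize
	}
	return oldSize // blocked by memory pressure, or already at the cap
}

func main() {
	fmt.Println(nextQueueSize(4, 1000, 0.50))   // 8: doubles normally
	fmt.Println(nextQueueSize(4, 1000, 0.80))   // 4: blocked at 80% usage
	fmt.Println(nextQueueSize(600, 1000, 0.10)) // 1000: capped at receiverQueueSize
}
```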
@@ -1760,6 +1769,21 @@ func (pc *partitionConsumer) markScaleIfNeed() {
}
}

func (pc *partitionConsumer) shrinkReceiverQueueSize() {
if !pc.options.autoReceiverQueueSize {
return
}

oldSize := pc.currentQueueSize.Load()
minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize)))
newSize := int32(math.Max(float64(minSize), float64(oldSize/2)))
if newSize < oldSize {
pc.currentQueueSize.CAS(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
}
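Note the permit arithmetic: on shrink, `newSize - oldSize` is negative, so `availablePermits.add` presumably subtracts permits and fewer flow permits reach the broker until the queue drains. Repeated triggers halve the size down to a floor, as this small example (a hypothetical helper, not code from this PR) shows:

```go
package main

import (
	"fmt"
	"math"
)

// shrunkQueueSize mirrors the shrink rule in shrinkReceiverQueueSize.
func shrunkQueueSize(oldSize, minSize int32) int32 {
	return int32(math.Max(float64(minSize), float64(oldSize/2)))
}

func main() {
	size := int32(16)
	for i := 0; i < 5; i++ {
		size = shrunkQueueSize(size, 1)
		fmt.Print(size, " ") // 8 4 2 1 1: halves, then holds at the floor
	}
	fmt.Println()
}
```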

func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression())
if !ok {
201 changes: 201 additions & 0 deletions pulsar/consumer_test.go
@@ -4113,3 +4113,204 @@ func TestConsumerBatchIndexAckDisabled(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, []byte("done"), message.Payload())
}

func TestConsumerMemoryLimit(t *testing.T) {
// Create client 1 without memory limit
cli1, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer cli1.Close()

// Create client 2 with memory limit
cli2, err := NewClient(ClientOptions{
URL: lookupURL,
MemoryLimitBytes: 10 * 1024,
})

assert.Nil(t, err)
defer cli2.Close()

topic := newTopicName()

// Use client 1 to create producer p1
p1, err := cli1.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer p1.Close()

// Use mem-limited client 2 to create consumer c1
c1, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-1",
Type: Exclusive,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c1.Close()
pc1 := c1.(*consumer).consumers[0]

// Fill up the messageCh of c1
for i := 0; i < 10; i++ {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}

retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc1.messageCh))
})

// Get current receiver queue size of c1
prevQueueSize := pc1.currentQueueSize.Load()

// Make client 2 exceed the memory limit
_, err = p1.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(10*1024 + 1),
})
assert.NoError(t, err)

// c1 should shrink its receiver queue size
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, prevQueueSize/2, pc1.currentQueueSize.Load())
})

// Use mem-limited client 2 to create consumer c2
c2, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-2",
Type: Exclusive,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c2.Close()
pc2 := c2.(*consumer).consumers[0]

// Try to induce c2 receiver queue size expansion
for i := 0; i < 10; i++ {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}

retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc1.messageCh))
})

// c2's receiver queue size should not expand because client 2 has exceeded its memory limit
assert.Equal(t, 1, int(pc2.currentQueueSize.Load()))

// Use mem-limited client 2 to create producer p2
p2, err := cli2.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
DisableBlockIfQueueFull: true,
})
assert.Nil(t, err)
defer p2.Close()

_, err = p2.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(1),
})
// Producer can't send the message: the memory buffer is full
assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
}

func TestMultiConsumerMemoryLimit(t *testing.T) {
// Create client 1 without memory limit
cli1, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer cli1.Close()

// Create client 2 with memory limit
cli2, err := NewClient(ClientOptions{
URL: lookupURL,
MemoryLimitBytes: 10 * 1024,
})

assert.Nil(t, err)
defer cli2.Close()

topic := newTopicName()

// Use client 1 to create producer p1
p1, err := cli1.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer p1.Close()

// Use mem-limited client 2 to create consumer c1
c1, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-1",
Type: Exclusive,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c1.Close()
pc1 := c1.(*consumer).consumers[0]

// Use mem-limited client 2 to create consumer c2
c2, err := cli2.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub-2",
Type: Exclusive,
EnableAutoScaledReceiverQueueSize: true,
})
assert.Nil(t, err)
defer c2.Close()
pc2 := c2.(*consumer).consumers[0]

// Fill up the messageCh of c1 and c2
for i := 0; i < 10; i++ {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
func(id MessageID, producerMessage *ProducerMessage, err error) {
},
)
}

retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc1.messageCh))
})

retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 10, len(pc2.messageCh))
})

// Get current receiver queue size of c1 and c2
pc1PrevQueueSize := pc1.currentQueueSize.Load()
pc2PrevQueueSize := pc2.currentQueueSize.Load()

// Make client 2 exceed the memory limit
_, err = p1.Send(context.Background(), &ProducerMessage{
Payload: createTestMessagePayload(10*1024 + 1),
})
assert.NoError(t, err)

// c1 should shrink its receiver queue size
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, pc1PrevQueueSize/2, pc1.currentQueueSize.Load())
})

// c2 should shrink its receiver queue size too
retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
})
}
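For reference, this is roughly how an application would opt in to the behavior these tests exercise. Both options appear in the tests above; the broker URL and topic here are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:              "pulsar://localhost:6650", // placeholder broker address
		MemoryLimitBytes: 64 * 1024 * 1024,          // client-wide cap; 64 MiB is also the default
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                             "my-topic", // placeholder topic
		SubscriptionName:                  "my-sub",
		EnableAutoScaledReceiverQueueSize: true, // queue grows and shrinks with memory pressure
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("got message: %s", string(msg.Payload()))
	consumer.Ack(msg)
}
```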
4 changes: 4 additions & 0 deletions pulsar/impl_message.go
@@ -382,6 +382,10 @@ func (msg *message) BrokerPublishTime() *time.Time {
return msg.brokerPublishTime
}

func (msg *message) size() int {
return len(msg.payLoad)
}

func newAckTracker(size uint) *ackTracker {
batchIDs := bitset.New(size)
for i := uint(0); i < size; i++ {