fix: shrinking triggered by memlimit
Gleiphir2769 committed Mar 22, 2023
1 parent fa055d7 commit bec8883
Showing 4 changed files with 154 additions and 30 deletions.
38 changes: 11 additions & 27 deletions pulsar/consumer_partition.go
@@ -155,7 +155,6 @@ type partitionConsumer struct {

currentQueueSize uAtomic.Int32
scaleReceiverQueueHint uAtomic.Bool
-	receiverQueueShrinking uAtomic.Bool
incomingMessages uAtomic.Int32

eventsCh chan interface{}
@@ -336,6 +335,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
+		pc.client.memLimit.RegisterTrigger(receiverQueueShrinkMemThreshold, pc.shrinkReceiverQueueSize)
} else {
pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
}
@@ -1124,7 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}

if pc.options.autoReceiverQueueSize {
-		pc.reserveMemory(int64(bytesReceived))
+		pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
pc.incomingMessages.Add(int32(len(messages)))
pc.markScaleIfNeed()
}
@@ -1347,7 +1347,7 @@ func (pc *partitionConsumer) dispatcher() {

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
-		pc.releaseMemory(int64(nextMessageSize))
+		pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
pc.expectMoreIncomingMessages()
}

@@ -1770,34 +1770,18 @@ func (pc *partitionConsumer) markScaleIfNeed() {
}
}

-func (pc *partitionConsumer) reserveMemory(size int64) {
-	pc.client.memLimit.ForceReserveMemory(size)
-	if pc.client.memLimit.CurrentUsagePercent() >= receiverQueueShrinkMemThreshold {
-		pc.shrinkReceiverQueueSize()
-	} else {
-		// Reset shrinking so that we can shrink receiver queue again
-		// when memLimit exceed receiverQueueShrinkMemThreshold
-		pc.receiverQueueShrinking.Store(false)
-	}
-}
-
-func (pc *partitionConsumer) releaseMemory(size int64) {
-	pc.client.memLimit.ReleaseMemory(size)
-}

func (pc *partitionConsumer) shrinkReceiverQueueSize() {
if !pc.options.autoReceiverQueueSize {
return
}
-	if pc.receiverQueueShrinking.CAS(false, true) {
-		oldSize := pc.currentQueueSize.Load()
-		minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize)))
-		newSize := int32(math.Max(float64(minSize), float64(oldSize/2)))
-		if newSize < oldSize {
-			pc.currentQueueSize.CAS(oldSize, newSize)
-			pc.availablePermits.add(newSize - oldSize)
-			pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
-		}
+
+	oldSize := pc.currentQueueSize.Load()
+	minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize)))
+	newSize := int32(math.Max(float64(minSize), float64(oldSize/2)))
+	if newSize < oldSize {
+		pc.currentQueueSize.CAS(oldSize, newSize)
+		pc.availablePermits.add(newSize - oldSize)
+		pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
}

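In net, this file drops the per-reserve threshold check (and the receiverQueueShrinking flag) and instead registers shrinkReceiverQueueSize as a trigger on the client's memory limit controller. Each firing halves the queue, clamped at min(initialReceiverQueueSize, receiverQueueSize). A standalone sketch of that sizing rule (the helper name and sample values are illustrative, not part of the commit):

```go
package main

import (
	"fmt"
	"math"
)

// shrunkQueueSize mirrors the sizing rule in shrinkReceiverQueueSize above:
// halve the current queue size, but never drop below
// min(initialReceiverQueueSize, configured receiverQueueSize).
func shrunkQueueSize(current, initial, configured int32) int32 {
	minSize := int32(math.Min(float64(initial), float64(configured)))
	return int32(math.Max(float64(minSize), float64(current/2)))
}

func main() {
	// Illustrative values: initial size 2, configured size 1000.
	fmt.Println(shrunkQueueSize(8, 2, 1000)) // 4
	fmt.Println(shrunkQueueSize(4, 2, 1000)) // 2
	fmt.Println(shrunkQueueSize(2, 2, 1000)) // 2: floor reached, no further shrink
}
```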
35 changes: 35 additions & 0 deletions pulsar/consumer_test.go
@@ -4224,4 +4224,39 @@ func TestConsumerMemoryLimit(t *testing.T) {
})
// Producer can't send message
assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))

+	// clear messages in messageCh
+	timer := time.NewTimer(time.Second)
+	var stop bool
+	for !stop {
+		select {
+		case <-c1.Chan():
+		case <-c2.Chan():
+		case <-timer.C:
+			stop = true
+		}
+	}
+
+	pc1.currentQueueSize.Store(8)
+	pc2.currentQueueSize.Store(8)
+
+	// Get the current receiver queue sizes of c1 and c2
+	pc1PrevQueueSize := pc1.currentQueueSize.Load()
+	pc2PrevQueueSize := pc2.currentQueueSize.Load()
+
+	// Make client 1 exceed the memory limit
+	_, err = p1.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(10*1024 + 1),
+	})
+	assert.NoError(t, err)
+
+	// c1 should shrink its receiver queue size
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, pc1PrevQueueSize/2, pc1.currentQueueSize.Load())
+	})
+
+	// c2 should shrink its receiver queue size too
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
+	})
}
61 changes: 58 additions & 3 deletions pulsar/internal/memory_limit_controller.go
@@ -31,18 +31,40 @@ type MemoryLimitController interface {
CurrentUsage() int64
CurrentUsagePercent() float64
IsMemoryLimited() bool
+	RegisterTrigger(threshold float64, trigger func())
}

type memoryLimitController struct {
limit int64
chCond *chCond
currentUsage int64

+	triggers     []*thresholdTrigger
+	minThreshold float64
}

+type thresholdTrigger struct {
+	threshold      float64
+	triggerFunc    func()
+	triggerRunning int32
+}
+
+func (t *thresholdTrigger) canTryRunning() bool {
+	return atomic.CompareAndSwapInt32(&t.triggerRunning, 0, 1)
+}
+
+func (t *thresholdTrigger) setRunning(isRunning bool) {
+	if isRunning {
+		atomic.StoreInt32(&t.triggerRunning, 1)
+		return
+	}
+	atomic.StoreInt32(&t.triggerRunning, 0)
+}

func NewMemoryLimitController(limit int64) MemoryLimitController {
mlc := &memoryLimitController{
-		limit:  limit,
-		chCond: newCond(&sync.Mutex{}),
+		limit:        limit,
+		chCond:       newCond(&sync.Mutex{}),
+		minThreshold: 1.0,
}
return mlc
}
@@ -72,13 +94,16 @@ func (m *memoryLimitController) TryReserveMemory(size int64) bool {
}

if atomic.CompareAndSwapInt64(&m.currentUsage, current, newUsage) {
+			m.checkTrigger(current, newUsage)
return true
}
}
}

func (m *memoryLimitController) ForceReserveMemory(size int64) {
-	atomic.AddInt64(&m.currentUsage, size)
+	nextUsage := atomic.AddInt64(&m.currentUsage, size)
+	prevUsage := nextUsage - size
+	m.checkTrigger(prevUsage, nextUsage)
}

func (m *memoryLimitController) ReleaseMemory(size int64) {
@@ -99,3 +124,33 @@ func (m *memoryLimitController) CurrentUsagePercent() float64 {
func (m *memoryLimitController) IsMemoryLimited() bool {
return m.limit > 0
}

+func (m *memoryLimitController) RegisterTrigger(threshold float64, trigger func()) {
+	m.chCond.L.Lock()
+	defer m.chCond.L.Unlock()
+	if threshold < m.minThreshold {
+		m.minThreshold = threshold
+	}
+	m.triggers = append(m.triggers, &thresholdTrigger{
+		threshold:   threshold,
+		triggerFunc: trigger,
+	})
+}
+
+func (m *memoryLimitController) checkTrigger(prevUsage int64, nextUsage int64) {
+	nextUsagePercent := float64(nextUsage) / float64(m.limit)
+	if nextUsagePercent < m.minThreshold {
+		return
+	}
+	prevUsagePercent := float64(prevUsage) / float64(m.limit)
+	for _, trigger := range m.triggers {
+		if prevUsagePercent < trigger.threshold && nextUsagePercent >= trigger.threshold {
+			if trigger.canTryRunning() {
+				go func(trigger *thresholdTrigger) {
+					trigger.triggerFunc()
+					trigger.setRunning(false)
+				}(trigger)
+			}
+		}
+	}
+}
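The trigger API added above is edge-triggered: checkTrigger fires a callback only when usage crosses that callback's threshold from below, the callback runs on its own goroutine, and a crossing is skipped while a previous run is still in flight (the triggerRunning CAS). A minimal usage sketch, written as if inside the internal package (Go internal packages are not importable from outside the module); the function name and the numbers are illustrative:

```go
package internal

import (
	"fmt"
	"time"
)

// sketchTriggerUsage walks the edge-triggered semantics of RegisterTrigger
// on a controller with a 100-byte limit and a single 75% trigger.
func sketchTriggerUsage() {
	mlc := NewMemoryLimitController(100)

	mlc.RegisterTrigger(0.75, func() {
		fmt.Println("usage crossed 75%: shrink receiver queues")
	})

	mlc.TryReserveMemory(70)   // usage 70%: below the threshold, nothing fires
	mlc.ForceReserveMemory(10) // usage 80%: crosses 75%, the trigger fires once

	mlc.ReleaseMemory(10)      // back to 70%; ReleaseMemory never fires triggers
	mlc.ForceReserveMemory(10) // crosses 75% again: the trigger may fire a second time

	time.Sleep(50 * time.Millisecond) // triggers run asynchronously; give them time
}
```

Note that nothing re-arms a trigger explicitly: because checkTrigger compares prevUsage against nextUsage, dropping below a threshold and crossing it again is what allows a trigger to fire repeatedly.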
50 changes: 50 additions & 0 deletions pulsar/internal/memory_limit_controller_test.go
@@ -19,6 +19,7 @@ package internal

import (
	"context"
+	"fmt"
"sync"
"testing"
"time"
@@ -169,6 +170,55 @@ func TestStepRelease(t *testing.T) {
assert.Equal(t, int64(101), mlc.CurrentUsage())
}

+func TestRegisterTrigger(t *testing.T) {
+	mlc := NewMemoryLimitController(100)
+	triggeredLowThreshold := false
+	triggeredHighThreshold := false
+	finishCh := make(chan struct{}, 2)
+
+	mlc.RegisterTrigger(0.5, func() {
+		triggeredLowThreshold = true
+		finishCh <- struct{}{}
+	})
+
+	mlc.RegisterTrigger(0.95, func() {
+		triggeredHighThreshold = true
+		finishCh <- struct{}{}
+	})
+
+	mlc.TryReserveMemory(50)
+	timer := time.NewTimer(time.Millisecond * 100)
+	select {
+	case <-finishCh:
+		assert.True(t, triggeredLowThreshold)
+		assert.False(t, triggeredHighThreshold)
+	case <-timer.C:
+		assert.NoError(t, fmt.Errorf("trigger timeout"))
+	}
+
+	mlc.TryReserveMemory(45)
+	timer.Reset(time.Millisecond * 100)
+	select {
+	case <-finishCh:
+		assert.True(t, triggeredLowThreshold)
+		assert.True(t, triggeredHighThreshold)
+	case <-timer.C:
+		assert.NoError(t, fmt.Errorf("trigger timeout"))
+	}
+
+	triggeredHighThreshold = false
+	mlc.ReleaseMemory(1)
+	assert.False(t, triggeredHighThreshold)
+	mlc.ForceReserveMemory(1)
+	timer.Reset(time.Millisecond * 100)
+	select {
+	case <-finishCh:
+		assert.True(t, triggeredHighThreshold)
+	case <-timer.C:
+		assert.NoError(t, fmt.Errorf("trigger timeout"))
+	}
+}

func reserveMemory(mlc MemoryLimitController, ch chan int) {
mlc.ReserveMemory(context.Background(), 1)
ch <- 1
