Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nack backoff policy for SDK #660

Merged
merged 12 commits into from
Nov 8, 2021
Prev Previous commit
Next Next commit
fix data race
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
  • Loading branch information
wolfstudy committed Nov 4, 2021
commit c970183f24a28e560c6ae5f2f1f8afbc1c46ecc1
60 changes: 43 additions & 17 deletions pulsar/negative_acks_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ type redeliveryConsumer interface {
type negativeAcksTracker struct {
sync.Mutex

doneCh chan interface{}
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
tick *time.Ticker
nackBackoff NackBackoffPolicy
trackFlag bool
delay time.Duration
log log.Logger
doneCh chan interface{}
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
tick *time.Ticker
tickForNackBackoff *time.Ticker
nackBackoff NackBackoffPolicy
trackFlag bool
delay time.Duration
log log.Logger
}

func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
Expand All @@ -61,13 +62,14 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
}
} else {
t = &negativeAcksTracker{
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
rc: rc,
tick: time.NewTicker(delay / 3),
nackBackoff: nil,
delay: delay,
log: logger,
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
rc: rc,
tick: time.NewTicker(delay / 3),
tickForNackBackoff: nil,
nackBackoff: nil,
delay: delay,
log: logger,
}

go t.track()
Expand Down Expand Up @@ -100,7 +102,8 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
func (t *negativeAcksTracker) AddMessage(msg Message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a new method here?

Also, it looks like state is changing here without a lock. If multiple go routines call this at once multiple tracking routines could be started right?

Can the tracking go routine just be started at creation time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need to get redeliveryCount through the Message interface

nackBackoffDelay := t.nackBackoff.Next(msg.RedeliveryCount())
t.delay = time.Duration(nackBackoffDelay)
t.tick = time.NewTicker(t.delay / 3)
t.tick = nil
t.tickForNackBackoff = time.NewTicker(t.delay / 3)

// Use trackFlag to avoid opening a new gorutine to execute `t.track()` every AddMessage.
// In fact, we only need to execute it once.
Expand Down Expand Up @@ -162,6 +165,29 @@ func (t *negativeAcksTracker) track() {
}
}

case <-t.tickForNackBackoff.C:
{
now := time.Now()
msgIds := make([]messageID, 0)

t.Lock()

for msgID, targetTime := range t.negativeAcks {
t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now)
if targetTime.Before(now) {
t.log.Debugf("Adding MsgId: %v", msgID)
msgIds = append(msgIds, msgID)
delete(t.negativeAcks, msgID)
}
}

t.Unlock()

if len(msgIds) > 0 {
t.rc.Redeliver(msgIds)
}
}

}
}
}
Expand Down