Fix spurious 408s under load and move processing of acks to their own Go routine.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed May 25, 2022
1 parent 7bdd679 commit d69394e
Showing 2 changed files with 128 additions and 21 deletions.
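The commit title describes two changes: stop issuing spurious 408 Request Timeout responses under load, and process inbound acknowledgements on their own Go routine instead of inside the delivery loop's select. As a rough, self-contained sketch of that second change — the ackMsg type and buffered channel below are illustrative stand-ins, not the server's actual ipQueue-backed o.ackMsgs — a dedicated goroutine drains queued acks independently of message delivery:

package main

import (
	"fmt"
	"sync"
)

// ackMsg is an illustrative stand-in for the server's internal jsAckMsg.
type ackMsg struct {
	subject, reply string
}

type consumer struct {
	acks   chan ackMsg   // inbound acks; buffered so delivery is not blocked
	quitCh chan struct{} // closed when the consumer shuts down
	wg     sync.WaitGroup
}

func newConsumer() *consumer {
	c := &consumer{
		acks:   make(chan ackMsg, 1024),
		quitCh: make(chan struct{}),
	}
	// Acks get their own goroutine, separate from the delivery loop.
	c.wg.Add(1)
	go c.processInboundAcks()
	return c
}

func (c *consumer) processInboundAcks() {
	defer c.wg.Done()
	for {
		select {
		case a := <-c.acks:
			// Handle the ack without holding up message delivery.
			fmt.Printf("processed ack for %s (reply %s)\n", a.subject, a.reply)
		case <-c.quitCh:
			// Drain anything still queued, then exit.
			for {
				select {
				case a := <-c.acks:
					fmt.Printf("processed ack for %s (reply %s)\n", a.subject, a.reply)
				default:
					return
				}
			}
		}
	}
}

func main() {
	c := newConsumer()
	for i := 0; i < 3; i++ {
		c.acks <- ackMsg{subject: fmt.Sprintf("ack.%d", i), reply: "_INBOX.demo"}
	}
	close(c.quitCh)
	c.wg.Wait()
}

In the actual diff below, processInboundAcks selects on o.ackMsgs.ch, the consumer's quit channel, and the server's quitCh.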
63 changes: 43 additions & 20 deletions server/consumer.go
@@ -981,6 +981,9 @@ func (o *consumer) setLeader(isLeader bool) {
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)

// Now start up Go routine to process acks.
go o.processInboundAcks(qch)

// If we are R>1 spin up our proposal loop.
if node != nil {
// Determine if we can send pending requests info to the group.
@@ -2104,7 +2107,7 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {

// If we are a pull mode consumer, report on number of waiting requests.
if o.isPullMode() {
o.processWaiting()
o.processWaiting(false)
info.NumWaiting = o.waiting.len()
}
// If we were asked to snapshot do so here.
@@ -2646,7 +2649,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
// If we have the max number of requests already pending try to expire.
if o.waiting.isFull() {
// Try to expire some of the requests.
o.processWaiting()
o.processWaiting(false)
}

// If the request is for noWait and we have pending requests already, check if we have room.
@@ -2659,7 +2662,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
return
}
if msgsPending > 0 {
_, _, batchPending, _ := o.processWaiting()
_, _, batchPending, _ := o.processWaiting(false)
if msgsPending < uint64(batchPending) {
sendErr(408, "Requests Pending")
return
@@ -2834,7 +2837,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {

// Will check for expiration and lack of interest on waiting requests.
// Will also do any heartbeats and return the next expiration or HB interval.
func (o *consumer) processWaiting() (int, int, int, time.Time) {
func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
var fexp time.Time
if o.srv == nil || o.waiting.isEmpty() {
return 0, 0, 0, fexp
@@ -2863,7 +2866,7 @@ func (o *consumer) processWaiting() (int, int, int, time.Time) {
for i, rp, n := 0, wq.rp, wq.n; i < n; rp = (rp + 1) % cap(wq.reqs) {
wr := wq.reqs[rp]
// Check expiration.
if (wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n")
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
remove(wr, rp)
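The eos flag added to processWaiting gates when a fulfilled noWait (one-shot) request is reaped with a 408; requests past a hard expiration deadline are always reaped. Below is a simplified, hypothetical model of that predicate — the waitingRequest fields here, such as delivered standing in for wr.d, are illustrative and not the server's actual types — showing how passing eos == false, as the delivery loop now does when it re-enters because of the max-ack-pending limit, avoids expiring one-shots prematurely:

package main

import (
	"fmt"
	"time"
)

type waitingRequest struct {
	reply     string
	noWait    bool
	delivered int       // messages already sent for this request
	expires   time.Time // zero means no deadline
}

// expireWaiting returns the requests that should get a 408 and the ones kept.
func expireWaiting(reqs []*waitingRequest, eos bool, now time.Time) (expired, kept []*waitingRequest) {
	for _, wr := range reqs {
		switch {
		case eos && wr.noWait && wr.delivered > 0:
			// Fulfilled one-shot request: only reap it when eos is set.
			expired = append(expired, wr)
		case !wr.expires.IsZero() && now.After(wr.expires):
			// Hard deadline passed: always reap.
			expired = append(expired, wr)
		default:
			kept = append(kept, wr)
		}
	}
	return expired, kept
}

func main() {
	now := time.Now()
	reqs := []*waitingRequest{
		{reply: "_INBOX.a", noWait: true, delivered: 1},     // fulfilled one-shot
		{reply: "_INBOX.b", expires: now.Add(-time.Second)}, // past deadline
		{reply: "_INBOX.c", expires: now.Add(time.Minute)},  // still valid
	}

	expired, _ := expireWaiting(reqs, false, now) // e.g. re-entry due to max ack pending
	fmt.Println("eos=false expired:", len(expired)) // 1: only the hard deadline

	expired, _ = expireWaiting(reqs, true, now) // e.g. explicit interest check
	fmt.Println("eos=true expired:", len(expired)) // 2: deadline plus fulfilled one-shot
}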
@@ -2917,7 +2920,7 @@ func (o *consumer) processWaiting() (int, int, int, time.Time) {

// Will check to make sure those waiting still have registered interest.
func (o *consumer) checkWaitingForInterest() bool {
o.processWaiting()
o.processWaiting(true)
return o.waiting.len() > 0
}

@@ -2929,6 +2932,30 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
return o.cfg.Heartbeat, time.NewTimer(o.cfg.Heartbeat)
}

func (o *consumer) processInboundAcks(qch chan struct{}) {
// Grab the server reference under the consumer lock so we can watch for server shutdown.
o.mu.RLock()
s := o.srv
o.mu.RUnlock()

for {
select {
case <-o.ackMsgs.ch:
acks := o.ackMsgs.pop()
for _, acki := range acks {
ack := acki.(*jsAckMsg)
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
o.ackMsgs.recycle(&acks)
case <-qch:
return
case <-s.quitCh:
return
}
}
}

func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a replay situation where replay policy is not instant.
var (
@@ -2970,23 +2997,26 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
rp := mset.cfg.Retention
mset.mu.RUnlock()

var err error

// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
for {
var (
pmsg *jsPubMsg
dc uint64
dsubj string
err error
delay time.Duration
)

o.mu.Lock()
// consumer is closed when mset is set to nil.
if o.mset == nil {
o.mu.Unlock()
return
}

// Clear last error.
err = nil

// If we are in push mode and not active or under flowcontrol let's stop sending.
if o.isPushMode() {
if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
@@ -3080,7 +3110,8 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Make sure to process any expired requests that are pending.
var wrExp <-chan time.Time
if o.isPullMode() {
_, _, _, fexp := o.processWaiting()
// Don't expire one-shots if we are here because of the max ack pending limit.
_, _, _, fexp := o.processWaiting(err != errMaxAckPending)
if !fexp.IsZero() {
expires := time.Until(fexp)
if expires <= 0 {
@@ -3095,25 +3126,17 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
o.mu.Unlock()

select {
case <-o.ackMsgs.ch:
acks := o.ackMsgs.pop()
for _, acki := range acks {
ack := acki.(*jsAckMsg)
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
o.ackMsgs.recycle(&acks)
case <-mch:
// Messages are waiting.
case interest := <-inch:
// inch can be nil on pull-based, but then this will
// just block and not fire.
o.updateDeliveryInterest(interest)
case <-qch:
return
case <-mch:
// Messages are waiting.
case <-wrExp:
o.mu.Lock()
o.processWaiting()
o.processWaiting(true)
o.mu.Unlock()
case <-hbc:
if o.isActive() {
86 changes: 85 additions & 1 deletion server/jetstream_test.go
@@ -2917,8 +2917,9 @@ func TestJetStreamWorkQueueNakRedelivery(t *testing.T) {

// Grab #6
m := getMsg(6, 6)
// NAK this one.
// NAK this one and make sure it's processed.
m.Respond(AckNak)
nc.Flush()

// When we request again should be store sequence 6 again.
getMsg(6, 7)
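The test now flushes after responding so the NAK has actually reached the server before the next request. A minimal client-side illustration of the same ordering, assuming a locally running server with an existing stream and consumer — the subject and durable names are placeholders, and it uses the nats.go Nak() helper rather than the raw AckNak respond used in the test:

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	sub, err := js.PullSubscribe("demo.subject", "demo-durable")
	if err != nil {
		log.Fatal(err)
	}

	msgs, err := sub.Fetch(1)
	if err != nil {
		log.Fatal(err)
	}

	// NAK the message and flush so the server sees it before we fetch again.
	if err := msgs[0].Nak(); err != nil {
		log.Fatal(err)
	}
	if err := nc.Flush(); err != nil {
		log.Fatal(err)
	}

	// The redelivery should now show up on the next fetch.
	if redelivered, err := sub.Fetch(1); err == nil && len(redelivered) > 0 {
		log.Printf("redelivered: %q", string(redelivered[0].Data))
	}
}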
@@ -5547,6 +5548,7 @@ func TestJetStreamRedeliverCount(t *testing.T) {

// Make sure it keeps getting sent back.
m.Respond(AckNak)
nc.Flush()
}
})
}
@@ -17918,3 +17920,85 @@ func TestJetStreamKVMemoryStorePerf(t *testing.T) {
}
fmt.Printf("Took %v for second run\n", time.Since(start))
}

func TestJetStreamMultiplePullPerf(t *testing.T) {
skip(t)

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

js.AddStream(&nats.StreamConfig{Name: "mp22", Storage: nats.FileStorage})
defer js.DeleteStream("mp22")

n, msg := 1_000_000, []byte("OK")
for i := 0; i < n; i++ {
js.PublishAsync("mp22", msg)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive completion signal")
}

si, err := js.StreamInfo("mp22")
require_NoError(t, err)

fmt.Printf("msgs: %d, total_bytes: %v\n", si.State.Msgs, friendlyBytes(int64(si.State.Bytes)))

// 10 pull subscribers each asking for 100 msgs.
_, err = js.AddConsumer("mp22", &nats.ConsumerConfig{
Durable: "d",
MaxAckPending: 8_000,
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

startCh := make(chan bool)
var wg sync.WaitGroup

np, bs := 10, 100

count := 0

for i := 0; i < np; i++ {
_, js := jsClientConnect(t, s)
sub, err := js.PullSubscribe("mp22", "d")
require_NoError(t, err)

wg.Add(1)
go func(sub *nats.Subscription) {
defer wg.Done()
<-startCh
for i := 0; i < n/(np*bs); i++ {
msgs, err := sub.Fetch(bs)
if err != nil {
t.Logf("Got error on pull: %v", err)
return
}
if len(msgs) != bs {
t.Logf("Expected %d msgs, got %d", bs, len(msgs))
return
}
count += len(msgs)
for _, m := range msgs {
m.Ack()
}
}
}(sub)
}

start := time.Now()
close(startCh)
wg.Wait()

tt := time.Since(start)
fmt.Printf("Took %v to receive %d msgs [%d]\n", tt, n, count)
fmt.Printf("%.0f msgs/s\n", float64(n)/tt.Seconds())
fmt.Printf("%.0f mb/s\n\n", float64(si.State.Bytes/(1024*1024))/tt.Seconds())
}
