Commit 1dcd873

Use popOne in JS API routed request workers (#6355)
This updates the JS API workers to use `popOne`, so that if the queue stacks up at all, recovery is fairer across workers, rather than a single worker stealing the entire queue. This was extracted from #6342.

Signed-off-by: Neil Twigg <neil@nats.io>
2 parents 733315a + f1e52bd commit 1dcd873
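
For illustration, here is a minimal standalone sketch of the fairness difference (the queue, worker count, and item type below are made up for the example and are not the server's internal queue implementation): with a drain-everything pop, whichever worker wakes first takes the whole backlog, whereas taking one item per call lets every worker share the recovery work.

package main

import (
	"fmt"
	"sync"
)

// Toy stand-in for a work queue; the server's real queue is not shown here.
type queue struct {
	mu    sync.Mutex
	items []int
}

// popOne hands back a single item per call, so concurrent workers
// draining a backlog each get a share of the work.
func (q *queue) popOne() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, false
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

// pop drains the whole backlog in one call; whichever worker calls it
// first takes everything, starving the others.
func (q *queue) pop() []int {
	q.mu.Lock()
	defer q.mu.Unlock()
	items := q.items
	q.items = nil
	return items
}

func main() {
	const workers, backlog = 4, 1000
	q := &queue{}
	for i := 0; i < backlog; i++ {
		q.items = append(q.items, i)
	}

	counts := make([]int, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for _, ok := q.popOne(); ok; _, ok = q.popOne() {
				counts[w]++ // each worker only touches its own slot
			}
		}(w)
	}
	wg.Wait()
	// With popOne the counts end up spread across workers; swapping in
	// pop() would typically show one worker taking all 1000 items.
	fmt.Println("items handled per worker:", counts)
}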

2 files changed: +70 -3 lines changed

server/jetstream_api.go

Lines changed: 4 additions & 3 deletions
@@ -922,8 +922,10 @@ func (s *Server) processJSAPIRoutedRequests() {
 	for {
 		select {
 		case <-queue.ch:
-			reqs := queue.pop()
-			for _, r := range reqs {
+			// Only pop one item at a time here, otherwise if the system is recovering
+			// from queue buildup, then one worker will pull off all the tasks and the
+			// others will be starved of work.
+			for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
 				client.pa = r.pa
 				start := time.Now()
 				r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
@@ -932,7 +934,6 @@ func (s *Server) processJSAPIRoutedRequests() {
 				}
 				atomic.AddInt64(&js.apiInflight, -1)
 			}
-			queue.recycle(&reqs)
 		case <-s.quitCh:
 			return
 		}
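
A note on the new loop shape above: popOne evidently returns a value plus an ok flag, and the loop stops once the queue reports empty or hands back a nil entry, which also removes the need to recycle a popped slice afterwards. A minimal standalone sketch of that idiom, with a hypothetical request type standing in for the server's routed-request struct:

package main

import "fmt"

// request is a hypothetical stand-in for the routed JS API request type.
type request struct{ subject string }

type reqQueue struct{ items []*request }

// popOne returns the next queued request, or ok == false once the queue is empty.
func (q *reqQueue) popOne() (*request, bool) {
	if len(q.items) == 0 {
		return nil, false
	}
	r := q.items[0]
	q.items = q.items[1:]
	return r, true
}

func main() {
	q := &reqQueue{items: []*request{{"$JS.API.STREAM.INFO.foo"}, {"$JS.API.CONSUMER.INFO.foo.bar"}}}
	// Same shape as the loop in the diff: init, test, and post all call popOne,
	// and the body only runs for a non-nil request.
	for r, ok := q.popOne(); ok && r != nil; r, ok = q.popOne() {
		fmt.Println("handling", r.subject)
	}
}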

server/jetstream_cluster_4_test.go

Lines changed: 66 additions & 0 deletions
@@ -5265,3 +5265,69 @@ func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
 	// it should succeed.
 	require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
 }
+
+func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, _ := jsClientConnect(t, c.randomNonLeader())
+	defer nc.Close()
+
+	// We only run 16 JetStream API workers.
+	mp := runtime.GOMAXPROCS(0)
+	if mp > 16 {
+		mp = 16
+	}
+
+	leader := c.leader()
+	ljs := leader.js.Load()
+
+	// Take the JS lock, which allows the JS API queue to build up.
+	ljs.mu.Lock()
+	defer ljs.mu.Unlock()
+
+	count := JSDefaultRequestQueueLimit - 1
+	ch := make(chan *nats.Msg, count)
+
+	inbox := nc.NewRespInbox()
+	_, err := nc.ChanSubscribe(inbox, ch)
+	require_NoError(t, err)
+
+	// To ensure a fair starting line, we need to submit as many tasks as
+	// there are JS workers whilst holding the JS lock. This will ensure that
+	// each JS API worker is properly wedged.
+	msg := &nats.Msg{
+		Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
+		Reply:   "no_one_here",
+	}
+	for i := 0; i < mp; i++ {
+		require_NoError(t, nc.PublishMsg(msg))
+	}
+
+	// Then we want to submit a fixed number of tasks, big enough to fill
+	// the queue, so that we can measure them.
+	msg = &nats.Msg{
+		Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
+		Reply:   inbox,
+	}
+	for i := 0; i < count; i++ {
+		require_NoError(t, nc.PublishMsg(msg))
+	}
+	checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
+		if queued := leader.jsAPIRoutedReqs.len(); queued != count {
+			return fmt.Errorf("expected %d queued requests, got %d", count, queued)
+		}
+		return nil
+	})
+
+	// Now we're going to release the lock and start timing. The workers
+	// will now race to clear the queues and we'll wait to see how long
+	// it takes for them all to respond.
+	start := time.Now()
+	ljs.mu.Unlock()
+	for i := 0; i < count; i++ {
+		<-ch
+	}
+	ljs.mu.Lock()
+	t.Logf("Took %s to clear %d items", time.Since(start), count)
+}
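
Note that the timing line is emitted via t.Logf, which only appears in verbose test output; assuming a standard checkout of the repository, something like go test -run TestJetStreamClusterRoutedAPIRecoverPerformance -v ./server should run the test and print it.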
