From 2924eeb65b43cfc24da0ffa343d44451f735947e Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Wed, 9 Oct 2024 14:30:30 -0400 Subject: [PATCH] rac2: small allocation optimizations in rangeController - Scratch for []entryFCState for the new entries being appended. - Scratch for []tokenWaitingHandleInfo in WaitForEval. - Accumulate the (send or eval) tokens to deduct and then make one call to the shared tokenCounter. This avoids repeated calls to PhysicalTime() and repeated acquisitons of a possibly contended mutex. Informs #128033 Epic: CRDB-37515 Release note: None --- .../kvflowcontrol/rac2/range_controller.go | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index f0e4fad2362f..18fd12c5b10e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -561,6 +561,7 @@ type rangeController struct { // to call into the replicaSendStreams that have asked to be scheduled. replicas map[roachpb.ReplicaID]struct{} } + entryFCStateScratch []entryFCState } // voterStateForWaiters informs whether WaitForEval is required to wait for @@ -630,7 +631,8 @@ func (rc *rangeController) WaitForEval( if wc == admissionpb.ElasticWorkClass { waitForAllReplicateHandles = true } - var handles []tokenWaitingHandleInfo + var handlesScratch [5]tokenWaitingHandleInfo + handles := handlesScratch[:] var scratch []reflect.SelectCase rc.opts.EvalWaitMetrics.OnWaiting(wc) @@ -933,7 +935,11 @@ func constructRaftEventForReplica( func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error { // Compute the flow control state for each new entry. We do this once // here, instead of decoding each entry multiple times for all replicas. - newEntries := make([]entryFCState, len(e.Entries)) + numEntries := len(e.Entries) + if cap(rc.entryFCStateScratch) < numEntries { + rc.entryFCStateScratch = make([]entryFCState, 0, 2*numEntries) + } + newEntries := rc.entryFCStateScratch[:numEntries:numEntries] // needsTokens tracks which classes need tokens for the new entries. This // informs first-pass decision-making on replicas that don't have // send-queues, in MsgAppPull mode, and therefore can potentially send the @@ -2228,6 +2234,7 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( event.sendingEntries[0].id.index, rss.mu.sendQueue.indexToSend)) } rss.mu.sendQueue.indexToSend = event.sendingEntries[n-1].id.index + 1 + var sendTokensToDeduct [admissionpb.NumWorkClasses]kvflowcontrol.Tokens for _, entry := range event.sendingEntries { if !entry.usesFlowControl { continue @@ -2257,12 +2264,17 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] -= tokens rss.mu.sendQueue.preciseSizeSum -= tokens } - flag := AdjNormal - if directive.preventSendQNoForceFlush { - flag = AdjPreventSendQueue - } - rss.parent.sendTokenCounter.Deduct(ctx, WorkClassFromRaftPriority(pri), tokens, flag) rss.mu.tracker.Track(ctx, entry.id, pri, tokens) + sendTokensToDeduct[WorkClassFromRaftPriority(pri)] += tokens + } + flag := AdjNormal + if directive.preventSendQNoForceFlush { + flag = AdjPreventSendQueue + } + for wc, tokens := range sendTokensToDeduct { + if tokens != 0 { + rss.parent.sendTokenCounter.Deduct(ctx, admissionpb.WorkClass(wc), tokens, flag) + } } if directive.preventSendQNoForceFlush { rss.parent.parent.opts.RangeControllerMetrics.SendQueue.PreventionCount.Inc(1) @@ -2274,6 +2286,7 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( event.newEntries[0].id.index, rss.mu.sendQueue.nextRaftIndex)) } rss.mu.sendQueue.nextRaftIndex = event.newEntries[n-1].id.index + 1 + var evalTokensToDeduct [admissionpb.NumWorkClasses]kvflowcontrol.Tokens for _, entry := range event.newEntries { if !entry.usesFlowControl { continue @@ -2309,9 +2322,14 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] += tokens } wc := WorkClassFromRaftPriority(pri) - rss.parent.evalTokenCounter.Deduct(ctx, wc, tokens, AdjNormal) + evalTokensToDeduct[wc] += tokens rss.mu.eval.tokensDeducted[wc] += tokens } + for wc, tokens := range evalTokensToDeduct { + if tokens != 0 { + rss.parent.evalTokenCounter.Deduct(ctx, admissionpb.WorkClass(wc), tokens, AdjNormal) + } + } } if n := len(event.sendingEntries); n > 0 && event.mode == MsgAppPull {