Skip to content

Commit

Permalink
rac2: small allocation optimizations in rangeController
Browse files Browse the repository at this point in the history
- Scratch for []entryFCState for the new entries being appended.
- Scratch for []tokenWaitingHandleInfo in WaitForEval.
- Accumulate the (send or eval) tokens to deduct and then make
  one call to the shared tokenCounter. This avoids repeated calls
  to PhysicalTime() and repeated acquisitons of a possibly contended
  mutex.

Informs #128033

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Oct 10, 2024
1 parent dad084c commit 2924eeb
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ type rangeController struct {
// to call into the replicaSendStreams that have asked to be scheduled.
replicas map[roachpb.ReplicaID]struct{}
}
entryFCStateScratch []entryFCState
}

// voterStateForWaiters informs whether WaitForEval is required to wait for
Expand Down Expand Up @@ -630,7 +631,8 @@ func (rc *rangeController) WaitForEval(
if wc == admissionpb.ElasticWorkClass {
waitForAllReplicateHandles = true
}
var handles []tokenWaitingHandleInfo
var handlesScratch [5]tokenWaitingHandleInfo
handles := handlesScratch[:]
var scratch []reflect.SelectCase

rc.opts.EvalWaitMetrics.OnWaiting(wc)
Expand Down Expand Up @@ -933,7 +935,11 @@ func constructRaftEventForReplica(
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
// Compute the flow control state for each new entry. We do this once
// here, instead of decoding each entry multiple times for all replicas.
newEntries := make([]entryFCState, len(e.Entries))
numEntries := len(e.Entries)
if cap(rc.entryFCStateScratch) < numEntries {
rc.entryFCStateScratch = make([]entryFCState, 0, 2*numEntries)
}
newEntries := rc.entryFCStateScratch[:numEntries:numEntries]
// needsTokens tracks which classes need tokens for the new entries. This
// informs first-pass decision-making on replicas that don't have
// send-queues, in MsgAppPull mode, and therefore can potentially send the
Expand Down Expand Up @@ -2228,6 +2234,7 @@ func (rss *replicaSendStream) handleReadyEntriesLocked(
event.sendingEntries[0].id.index, rss.mu.sendQueue.indexToSend))
}
rss.mu.sendQueue.indexToSend = event.sendingEntries[n-1].id.index + 1
var sendTokensToDeduct [admissionpb.NumWorkClasses]kvflowcontrol.Tokens
for _, entry := range event.sendingEntries {
if !entry.usesFlowControl {
continue
Expand Down Expand Up @@ -2257,12 +2264,17 @@ func (rss *replicaSendStream) handleReadyEntriesLocked(
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] -= tokens
rss.mu.sendQueue.preciseSizeSum -= tokens
}
flag := AdjNormal
if directive.preventSendQNoForceFlush {
flag = AdjPreventSendQueue
}
rss.parent.sendTokenCounter.Deduct(ctx, WorkClassFromRaftPriority(pri), tokens, flag)
rss.mu.tracker.Track(ctx, entry.id, pri, tokens)
sendTokensToDeduct[WorkClassFromRaftPriority(pri)] += tokens
}
flag := AdjNormal
if directive.preventSendQNoForceFlush {
flag = AdjPreventSendQueue
}
for wc, tokens := range sendTokensToDeduct {
if tokens != 0 {
rss.parent.sendTokenCounter.Deduct(ctx, admissionpb.WorkClass(wc), tokens, flag)
}
}
if directive.preventSendQNoForceFlush {
rss.parent.parent.opts.RangeControllerMetrics.SendQueue.PreventionCount.Inc(1)
Expand All @@ -2274,6 +2286,7 @@ func (rss *replicaSendStream) handleReadyEntriesLocked(
event.newEntries[0].id.index, rss.mu.sendQueue.nextRaftIndex))
}
rss.mu.sendQueue.nextRaftIndex = event.newEntries[n-1].id.index + 1
var evalTokensToDeduct [admissionpb.NumWorkClasses]kvflowcontrol.Tokens
for _, entry := range event.newEntries {
if !entry.usesFlowControl {
continue
Expand Down Expand Up @@ -2309,9 +2322,14 @@ func (rss *replicaSendStream) handleReadyEntriesLocked(
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] += tokens
}
wc := WorkClassFromRaftPriority(pri)
rss.parent.evalTokenCounter.Deduct(ctx, wc, tokens, AdjNormal)
evalTokensToDeduct[wc] += tokens
rss.mu.eval.tokensDeducted[wc] += tokens
}
for wc, tokens := range evalTokensToDeduct {
if tokens != 0 {
rss.parent.evalTokenCounter.Deduct(ctx, admissionpb.WorkClass(wc), tokens, AdjNormal)
}
}
}

if n := len(event.sendingEntries); n > 0 && event.mode == MsgAppPull {
Expand Down

0 comments on commit 2924eeb

Please sign in to comment.