diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
index 2905b5b9a186..c0e9ada989ce 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -2324,6 +2324,18 @@ func (rs *replicaState) scheduledRaftMuLocked(
 		rss.mu.sendQueue.deductedForSchedulerTokens < bytesToSend {
 		bytesToSend = rss.mu.sendQueue.deductedForSchedulerTokens
 	}
+	// TODO(sumeer): if (for some reason) many entries are not subject to
+	// replication AC in pull mode, and a send-queue forms, and these entries
+	// are > 4KiB, we will empty the send-queue one entry at a time.
+	// Specifically, we will only deduct 4KiB of tokens, since the approx size
+	// of the send-queue will be zero. Then we will call LogSlice with
+	// maxSize=4KiB, which will return one entry. And then we will repeat. If
+	// this is a real problem, we can fix this by keeping track of not just the
+	// preciseSizeSum of tokens needed, but also the size sum of these entries.
+	// Then scale up the value of maxSize=deducted*(sizeSum/preciseSizeSum). In
+	// this example preciseSizeSum would be 0, so we would instead scale it up
+	// to MaxBytesToSend.
+	//
 	// NB: the rss.mu.sendQueue.deductedForScheduler.tokens represent what is
 	// subject to RAC. But Raft is unaware of this linkage between admission
 	// control and flow tokens, and MakeMsgApp will use this bytesToSend to
@@ -2762,6 +2774,19 @@ func (rss *replicaSendStream) stopAttemptingToEmptySendQueueViaWatcherRaftMuAndS
 	}
 }
 
+// Requires that send-queue is non-empty. Note that it is possible that all
+// the entries in the send-queue are not subject to replication admission
+// control, and we will still wait for non-zero tokens. This is considered
+// acceptable for two reasons (a) when nextRaftIndexInitial > indexToSend, the
+// replicaSendStream does not know whether entries in [indexToSend,
+// nextRaftIndexInitial) are subject to replication AC, (b) even when
+// replicaSendStream has precise knowledge of every entry in the send-queue,
+// it is arguably reasonable to wait for send tokens > 0, since these entries
+// will impose some load on the receiver. Case (b) is going to be rare anyway,
+// since very few entries are not subject to replication AC in pull mode
+// (since it is active only when replication AC is "apply_to_all"), and
+// usually the send-queue won't even form if the entries need zero tokens.
+//
 // NB: raftMu may or may not be held. Specifically, when called from Notify,
 // raftMu is not held.
 func (rss *replicaSendStream) startAttemptingToEmptySendQueueViaWatcherStreamLocked(
@@ -2803,6 +2828,8 @@ func (rss *replicaSendStream) Notify(ctx context.Context) {
 	queueSize := rss.approxQueueSizeStreamLocked()
 	queueSize = kvflowcontrol.Tokens(float64(queueSize) * 1.1)
 	if queueSize < 2048 {
+		// NB: queueSize could be 0 if none of the entries were subject to
+		// replication AC. Even in that case we grab some tokens.
 		queueSize = 4096
 	}
 	flag := AdjNormal
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_q_entries_without_ac b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_q_entries_without_ac
index b7a78105a090..d2fea71bf30a 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_q_entries_without_ac
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_q_entries_without_ac
@@ -218,11 +218,9 @@ schedule-controller-event-count: 2
 scheduled-replicas: 2
 
 # Scheduler event. Replica 2 sends the first entry that needed 1MiB of tokens.
-# The second entry, that needs 0 tokens is still queued.
-#
-# TODO(sumeer): investigate whether we need some code improvements. Arguably,
-# even though this entry is not subject to AC, we should wait for > 0 tokens,
-# since it is going to cause some load on that store.
+# The second entry, that needs 0 tokens, is still queued. The code makes a
+# deliberate choice to wait for non-zero tokens even though the send-queue is
+# only composed of entries that need 0 tokens.
 handle_scheduler_event range_id=1
 ----
 (n1,s1):1: state=replicate closed=false inflight=[2,4) send_queue=[4,4) precise_q_size=+0 B
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index a096ac9bb917..2a8897e1012d 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -96,8 +96,8 @@ message RaftMessageRequest {
   bool using_rac2_protocol = 12;
   // LowPriorityOverride is read only when UsingRAC2Protocol is true, and is
   // set only if Message is a MsgApp. When true, it specifies that the
-  // priority of the Entries in the Message are overridden to be
-  // raftpb.LowPri.
+  // priority of the Entries in the Message that are subject to replication
+  // admission control are overridden to be raftpb.LowPri.
   bool low_priority_override = 13;
   // AdmittedState annotates a MsgAppResp message with a vector of admitted log
   // indices. Used only with RACv2.
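
The TODO in the first hunk describes a possible future fix for draining a send-queue of large entries that need zero tokens: track the total byte size of the queue alongside preciseSizeSum, and scale the deducted tokens by sizeSum/preciseSizeSum when computing maxSize for LogSlice/MakeMsgApp. Below is a minimal sketch of that scaling, not part of this patch; the function and parameter names (scaledMaxSize, sizeSum, maxBytesToSend) are illustrative assumptions.

package main

import "fmt"

// scaledMaxSize sketches the TODO's idea: the deducted tokens only cover the
// entries subject to replication AC (preciseSizeSum bytes), so scale them by
// the ratio of the queue's total byte size to preciseSizeSum before using the
// result as maxSize.
func scaledMaxSize(deducted, preciseSizeSum, sizeSum, maxBytesToSend int64) int64 {
	if preciseSizeSum == 0 {
		// No queued entry is subject to replication AC, so the deducted
		// tokens say nothing about the queue's byte size; fall back to the
		// configured maximum.
		return maxBytesToSend
	}
	scaled := int64(float64(deducted) * float64(sizeSum) / float64(preciseSizeSum))
	if scaled > maxBytesToSend {
		scaled = maxBytesToSend
	}
	return scaled
}

func main() {
	// A 4KiB deduction against a 1MiB queue made up entirely of zero-token
	// entries scales up to maxBytesToSend (here 32MiB) instead of draining
	// one entry per scheduler event.
	fmt.Println(scaledMaxSize(4096, 0, 1<<20, 32<<20))
}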