diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 0cf38153a508..ec88fa0bcef1 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -24,6 +24,10 @@ import ( ) // Enabled determines whether we use flow control for replication traffic in KV. +// +// TODO(sumeer): changing this to false does not affect requests that are +// already waiting for tokens for eval in RACv1. Consider fixing and +// back-porting. var Enabled = settings.RegisterBoolSetting( settings.SystemOnly, "kvadmission.flow_control.enabled", diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel index a4fdd96b85c6..185de4f353cd 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "store_stream.go", "token_counter.go", "token_tracker.go", + "wait_for_eval_config.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2", visibility = ["//visibility:public"], @@ -51,6 +52,7 @@ go_test( "store_stream_test.go", "token_counter_test.go", "token_tracker_test.go", + "wait_for_eval_config_test.go", ], data = glob(["testdata/**"]), embed = [":rac2"], diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 6582773dfd46..cd0c80ac79b3 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -47,16 +47,18 @@ import ( // WaitForEval for regular work. type RangeController interface { // WaitForEval seeks admission to evaluate a request at the given priority. - // This blocks until there are positive tokens available for the request to - // be admitted for evaluation, or the context is canceled (which returns an - // error). 
Note the number of tokens required by the request is not - // considered, only the priority of the request, as the number of tokens is - // not known until eval. + // If the priority is subject to replication admission control, it blocks + // until there are positive tokens available for the request to be admitted + // for evaluation, or the context is canceled (which returns an error). Note + // the number of tokens required by the request is not considered, only the + // priority of the request, as the number of tokens is not known until eval. // - // In the non-error case, the waited return value is true if the - // RangeController was not closed during the execution of WaitForEval. If - // closed, a (false, nil) will be returned -- this is important for the - // caller to fall back to waiting on the local store. + // In the non-error case, the waited return value is true if the priority + // was subject to replication admission control, and the RangeController was + // not closed during the execution of WaitForEval. If closed, or the + // priority is not subject to replication admission control, a (false, nil) + // will be returned -- this is important for the caller to fall back to + // waiting on the local store. // // No mutexes should be held. WaitForEval(ctx context.Context, pri admissionpb.WorkPriority) (waited bool, err error) @@ -504,6 +506,7 @@ type RangeControllerOptions struct { SendTokenWatcher *SendTokenWatcher EvalWaitMetrics *EvalWaitMetrics RangeControllerMetrics *RangeControllerMetrics + WaitForEvalConfig *WaitForEvalConfig Knobs *kvflowcontrol.TestingKnobs } @@ -635,25 +638,32 @@ func NewRangeController( return rc } -// WaitForEval blocks until there are positive tokens available for the -// request to be admitted for evaluation. Note the number of tokens required -// by the request is not considered, only the priority of the request, as the -// number of tokens is not known until eval. -// -// No mutexes should be held. 
+// WaitForEval implements RangeController.WaitForEval. func (rc *rangeController) WaitForEval( ctx context.Context, pri admissionpb.WorkPriority, ) (waited bool, err error) { + expensiveLoggingEnabled := log.ExpensiveLogEnabled(ctx, 2) wc := admissionpb.WorkClassFromPri(pri) + rc.opts.EvalWaitMetrics.OnWaiting(wc) + waitCategory, waitCategoryChangeCh := rc.opts.WaitForEvalConfig.Current() + bypass := waitCategory.Bypass(wc) + if bypass { + if expensiveLoggingEnabled { + log.VEventf(ctx, 2, "r%v/%v bypassed request (pri=%v)", + rc.opts.RangeID, rc.opts.LocalReplicaID, pri) + } + rc.opts.EvalWaitMetrics.OnBypassed(wc, 0 /* duration */) + return false, nil + } waitForAllReplicateHandles := false if wc == admissionpb.ElasticWorkClass { waitForAllReplicateHandles = true } var handlesScratch [5]tokenWaitingHandleInfo handles := handlesScratch[:] - var scratch []reflect.SelectCase + var scratch [7]reflect.SelectCase + selectCasesScratch := scratch[:0:cap(scratch)] - rc.opts.EvalWaitMetrics.OnWaiting(wc) start := rc.opts.Clock.PhysicalTime() retry: // Snapshot the waiter sets and the refresh channel. @@ -665,10 +675,15 @@ retry: if refreshCh == nil { // RangeControllerImpl is closed. - rc.opts.EvalWaitMetrics.OnBypassed(wc, rc.opts.Clock.PhysicalTime().Sub(start)) + waitDuration := rc.opts.Clock.PhysicalTime().Sub(start) + if expensiveLoggingEnabled { + log.VEventf(ctx, 2, + "r%v/%v bypassed request as RC closed (pri=%v wait-duration=%v)", + rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration) + } + rc.opts.EvalWaitMetrics.OnBypassed(wc, waitDuration) return false, nil } - expensiveLoggingEnabled := log.ExpensiveLogEnabled(ctx, 2) for _, vs := range vss { quorumCount := (len(vs) + 2) / 2 votersHaveEvalTokensCount := 0 @@ -738,22 +753,41 @@ retry: // parameter, to output trace statements, since expensiveLoggingEnabled // is a superset of when tracing is enabled (and in a production setting // is likely to be identical, so there isn't much waste). 
- state, scratch = WaitForEval( - ctx, refreshCh, handles, remainingForQuorum, expensiveLoggingEnabled, scratch) + state, selectCasesScratch = WaitForEval(ctx, waitCategoryChangeCh, refreshCh, handles, + remainingForQuorum, expensiveLoggingEnabled, selectCasesScratch) switch state { case WaitSuccess: continue case ContextCanceled: - rc.opts.EvalWaitMetrics.OnErrored(wc, rc.opts.Clock.PhysicalTime().Sub(start)) + waitDuration := rc.opts.Clock.PhysicalTime().Sub(start) + if expensiveLoggingEnabled { + log.VEventf(ctx, 2, "r%v/%v canceled request (pri=%v wait-duration=%v)", + rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration) + } + rc.opts.EvalWaitMetrics.OnErrored(wc, waitDuration) return false, ctx.Err() - case RefreshWaitSignaled: + case ConfigRefreshWaitSignaled: + waitCategory, waitCategoryChangeCh = rc.opts.WaitForEvalConfig.Current() + bypass = waitCategory.Bypass(wc) + if bypass { + waitDuration := rc.opts.Clock.PhysicalTime().Sub(start) + if expensiveLoggingEnabled { + log.VEventf(ctx, 2, + "r%v/%v bypassed request as settings changed (pri=%v wait-duration=%v)", + rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration) + } + rc.opts.EvalWaitMetrics.OnBypassed(wc, waitDuration) + return false, nil + } + goto retry + case ReplicaRefreshWaitSignaled: goto retry } } } waitDuration := rc.opts.Clock.PhysicalTime().Sub(start) if expensiveLoggingEnabled { - log.VEventf(ctx, 2, "r%v/%v admitted request (pri=%v wait-duration=%s wait-for-all=%v)", + log.VEventf(ctx, 2, "r%v/%v admitted request (pri=%v wait-duration=%v wait-for-all=%v)", rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration, waitForAllReplicateHandles) } rc.opts.EvalWaitMetrics.OnAdmitted(wc, waitDuration) @@ -1927,7 +1961,7 @@ type replicaSendStream struct { // Fields that are read/written while holding raftMu. 
raftMu struct { // tracker contains entries that have been sent, and have had send-tokens - // deducted (and will have had eval-tokens deducted iff index >= + // deducted (and have had eval-tokens deducted iff index >= // nextRaftIndexInitial). // // Contains no entries in probeRecentlyNoSendQ. diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index ea67a67ca63b..29d960405175 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -55,6 +55,7 @@ type testingRCState struct { ssTokenCounter *StreamTokenCounterProvider sendTokenWatcher *SendTokenWatcher probeToCloseScheduler ProbeToCloseTimerScheduler + waitForEvalConfig *WaitForEvalConfig evalMetrics *EvalWaitMetrics rcMetrics *RangeControllerMetrics // ranges contains the controllers for each range. It is the main state being @@ -71,12 +72,15 @@ func (s *testingRCState) init(t *testing.T, ctx context.Context) { s.t = t s.testCtx = ctx s.settings = cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &s.settings.SV, true) + kvflowcontrol.Mode.Override(ctx, &s.settings.SV, kvflowcontrol.ApplyToAll) s.stopper = stop.NewStopper() s.ts = timeutil.NewManualTime(timeutil.UnixEpoch) s.clock = hlc.NewClockForTesting(s.ts) s.ssTokenCounter = NewStreamTokenCounterProvider(s.settings, s.clock) s.sendTokenWatcher = NewSendTokenWatcher(s.stopper, s.ts) s.probeToCloseScheduler = &testingProbeToCloseTimerScheduler{state: s} + s.waitForEvalConfig = NewWaitForEvalConfig(s.settings) s.evalMetrics = NewEvalWaitMetrics() s.rcMetrics = NewRangeControllerMetrics() s.ranges = make(map[roachpb.RangeID]*testingRCRange) @@ -330,6 +334,7 @@ func (s *testingRCState) getOrInitRange( SendTokenWatcher: s.sendTokenWatcher, EvalWaitMetrics: s.evalMetrics, RangeControllerMetrics: s.rcMetrics, + WaitForEvalConfig: s.waitForEvalConfig, Knobs: 
&kvflowcontrol.TestingKnobs{}, } @@ -1397,6 +1402,32 @@ func TestRangeController(t *testing.T) { } return buf.String() + case "set-flow-control-config": + if d.HasArg("enabled") { + var enabled bool + d.ScanArgs(t, "enabled", &enabled) + kvflowcontrol.Enabled.Override(ctx, &state.settings.SV, enabled) + } + if d.HasArg("mode") { + var mode string + d.ScanArgs(t, "mode", &mode) + var m kvflowcontrol.ModeT + switch mode { + case "apply_to_all": + m = kvflowcontrol.ApplyToAll + case "apply_to_elastic": + m = kvflowcontrol.ApplyToElastic + default: + panic(fmt.Sprintf("unknown mode %s", mode)) + } + kvflowcontrol.Mode.Override(ctx, &state.settings.SV, m) + } + // Sleep for a bit to allow any timers to fire. + time.Sleep(20 * time.Millisecond) + return fmt.Sprintf("enabled: %t mode: %v", + kvflowcontrol.Enabled.Get(&state.settings.SV), + kvflowcontrol.Mode.Get(&state.settings.SV)) + default: panic(fmt.Sprintf("unknown command: %s", d.Cmd)) } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval index df9200485fc3..1c1a33d5b736 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval @@ -201,7 +201,7 @@ range_id=1 tenant_id={1} local_replica_id=1 name=d pri=high-pri done=true waited=true err= name=e pri=low-pri done=false waited=false err= -# Add tokens to s3 (voter) stream. The low priority evaluation should stil not +# Add tokens to s3 (voter) stream. The low priority evaluation should still not # complete, as the non-voter stream (s4) does not have tokens. adjust_tokens store_id=3 pri=LowPri tokens=1 @@ -514,7 +514,7 @@ t1/s5: eval reg=+1 B/+1 B ela=+1 B/+1 B send reg=+0 B/+1 B ela=+0 B/+1 B # Start an evaluation 'k' on r1, that does not complete since the leaseholder -# (s3) has tokens. +# (s3) has no tokens. 
wait_for_eval name=k range_id=1 pri=HighPri ---- range_id=1 tenant_id={1} local_replica_id=1 @@ -531,7 +531,7 @@ range_id=1 tenant_id={1} local_replica_id=1 range_id=2 tenant_id={1} local_replica_id=1 name=h pri=high-pri done=true waited=true err= -# Close all the RangeControllers. Evaluation 'j' is done, but specifies waited +# Close all the RangeControllers. Evaluation 'k' is done, but specifies waited # is false, and error is nil. close_rcs ---- diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_config_change b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_config_change new file mode 100644 index 000000000000..1514fab98bcf --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_config_change @@ -0,0 +1,117 @@ +# Initialize a range with voters on s1,s2 and s3. The local replica and +# leaseholder will be s1. The leaseholder is denoted by the '*' suffix. Also +# set streams to initially have 0 tokens and a limit of 8MiB tokens. +init regular_limit=8MiB regular_init=0 elastic_limit=8MiB elastic_init=0 +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=1 + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: eval reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB +t1/s2: eval reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB + +# Wait for eval for high priority request 'a'. It is waiting since there are +# no tokens. +wait_for_eval name=a range_id=1 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=false waited=false err= + +# Disable replication admission control. 
+set-flow-control-config enabled=false +---- +enabled: false mode: apply_to_all + +# The request stops waiting. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + +# Enable replication admission control. +set-flow-control-config enabled=true +---- +enabled: true mode: apply_to_all + +# Wait for eval for normal priority request 'b'. It is waiting since there are +# no tokens. +wait_for_eval name=b range_id=1 pri=NormalPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + name=b pri=normal-pri done=false waited=false err= + +# Change replication admission control to only apply to elastic work. +set-flow-control-config mode=apply_to_elastic +---- +enabled: true mode: apply_to_elastic + +# The request stops waiting. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + name=b pri=normal-pri done=true waited=false err= + +# Restore enabled and apply_to_all. +set-flow-control-config mode=apply_to_all +---- +enabled: true mode: apply_to_all + +# Wait for eval for low priority request 'c'. It is waiting since there are no +# tokens. +wait_for_eval name=c range_id=1 pri=LowPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + name=b pri=normal-pri done=true waited=false err= + name=c pri=low-pri done=false waited=false err= + +# Change replication admission control to only apply to elastic work. +set-flow-control-config mode=apply_to_elastic +---- +enabled: true mode: apply_to_elastic + +# The change does not affect request 'c'. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + name=b pri=normal-pri done=true waited=false err= + name=c pri=low-pri done=false waited=false err= + +# Change replication admission control to apply to all work. 
+set-flow-control-config mode=apply_to_all +---- +enabled: true mode: apply_to_all + +# The change does not affect request 'c'. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + name=b pri=normal-pri done=true waited=false err= + name=c pri=low-pri done=false waited=false err= + +# Disable replication admission control. +set-flow-control-config enabled=false +---- +enabled: false mode: apply_to_all + +# The request stops waiting. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true waited=false err= + name=b pri=normal-pri done=true waited=false err= + name=c pri=low-pri done=true waited=false err= + +# Restore settings. +set-flow-control-config enabled=true +---- +enabled: true mode: apply_to_all diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_send_q b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_send_q index 93e4feffca5c..d2161265dcdb 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_send_q +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/wait_for_eval_send_q @@ -194,7 +194,7 @@ NormalPri: term=1 index=4 tokens=1048576 ++++ -# Since there is a quorum with no send-queue (replicas 1 and 2), the evalution +# Since there is a quorum with no send-queue (replicas 1 and 2), the evaluation # of c completes. check_state ---- diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/wait_for_eval b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/wait_for_eval index 47b88217bb96..a99e264590fe 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/wait_for_eval +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/wait_for_eval @@ -158,15 +158,15 @@ d: waiting e: context_cancelled f: waiting -# Send a refresh signal before any tokens are available -refresh name=f +# Send a refresh signal before any tokens are available. 
+refresh name=f kind=replica ---- a: wait_success b: wait_success c: wait_success d: waiting e: context_cancelled -f: refresh_wait_signaled +f: replica_refresh_wait_signaled # Lastly, test out a WaitForEval operation with 3 handles overlapping the next # WaitForEval operation. @@ -180,7 +180,7 @@ b: wait_success c: wait_success d: waiting e: context_cancelled -f: refresh_wait_signaled +f: replica_refresh_wait_signaled g: waiting wait_for_eval name=h quorum=2 @@ -193,7 +193,7 @@ b: wait_success c: wait_success d: waiting e: context_cancelled -f: refresh_wait_signaled +f: replica_refresh_wait_signaled g: waiting h: waiting @@ -225,7 +225,7 @@ b: wait_success c: wait_success d: waiting e: context_cancelled -f: refresh_wait_signaled +f: replica_refresh_wait_signaled g: waiting h: wait_success @@ -257,6 +257,35 @@ b: wait_success c: wait_success d: wait_success e: context_cancelled -f: refresh_wait_signaled +f: replica_refresh_wait_signaled g: wait_success h: wait_success + +# Test out a config refresh signal on i. +wait_for_eval name=i quorum=2 +handle: stream=s1 required=true +handle: stream=s10 required=true +handle: stream=s13 required=false +---- +a: wait_success +b: wait_success +c: wait_success +d: wait_success +e: context_cancelled +f: replica_refresh_wait_signaled +g: wait_success +h: wait_success +i: waiting + +# Send a config refresh signal before any tokens are available.
+refresh name=i kind=config +---- +a: wait_success +b: wait_success +c: wait_success +d: wait_success +e: context_cancelled +f: replica_refresh_wait_signaled +g: wait_success +h: wait_success +i: config_refresh_wait_signaled diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go index 10f04949b4b8..893a699c2eed 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go @@ -429,8 +429,12 @@ const ( WaitSuccess WaitEndState = iota // ContextCanceled indicates that the context was canceled. ContextCanceled - // RefreshWaitSignaled indicates that the refresh channel was signaled. - RefreshWaitSignaled + // ConfigRefreshWaitSignaled indicates that the config refresh channel was + // signaled. + ConfigRefreshWaitSignaled + // ReplicaRefreshWaitSignaled indicates that the replica refresh channel was + // signaled. + ReplicaRefreshWaitSignaled ) func (s WaitEndState) String() string { @@ -444,21 +448,24 @@ func (s WaitEndState) SafeFormat(w redact.SafePrinter, _ rune) { w.Print("wait_success") case ContextCanceled: w.Print("context_cancelled") - case RefreshWaitSignaled: - w.Print("refresh_wait_signaled") + case ConfigRefreshWaitSignaled: + w.Print("config_refresh_wait_signaled") + case ReplicaRefreshWaitSignaled: + w.Print("replica_refresh_wait_signaled") default: panic(fmt.Sprintf("unknown wait_end_state(%d)", int(s))) } } // WaitForEval waits for a quorum of handles to be signaled and have tokens -// available, including all the required wait handles. The caller can provide a -// refresh channel, which when signaled will cause the function to return -// RefreshWaitSignaled, allowing the caller to retry waiting with updated +// available, including all the required wait handles. 
The caller provides two +// refresh channels; signaling one returns ConfigRefreshWaitSignaled or +// ReplicaRefreshWaitSignaled respectively, so the caller can retry with updated // handles. func WaitForEval( ctx context.Context, - refreshWaitCh <-chan struct{}, + configRefreshWaitCh <-chan struct{}, + replicaRefreshWaitCh <-chan struct{}, handles []tokenWaitingHandleInfo, requiredQuorum int, traceIndividualWaits bool, @@ -474,7 +481,9 @@ func WaitForEval( scratch = append(scratch, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}) scratch = append(scratch, - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(refreshWaitCh)}) + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(configRefreshWaitCh)}) + scratch = append(scratch, + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(replicaRefreshWaitCh)}) requiredWaitCount := 0 for _, h := range handles { @@ -510,11 +519,16 @@ func WaitForEval( return ContextCanceled, scratch case 1: if traceIndividualWaits { - log.Eventf(ctx, "wait-for-eval: waited until channel refreshed") + log.Eventf(ctx, "wait-for-eval: waited until channel1 refreshed") + } + return ConfigRefreshWaitSignaled, scratch + case 2: + if traceIndividualWaits { + log.Eventf(ctx, "wait-for-eval: waited until channel2 refreshed") } - return RefreshWaitSignaled, scratch + return ReplicaRefreshWaitSignaled, scratch default: - handleInfo := handles[chosen-2] + handleInfo := handles[chosen-3] if available := handleInfo.handle.ConfirmHaveTokensAndUnblockNextWaiter(); !available { // The handle was signaled but does not currently have tokens // available. Continue waiting on this handle.
@@ -534,7 +548,7 @@ func WaitForEval( m-- scratch[chosen], scratch[m] = scratch[m], scratch[chosen] scratch = scratch[:m] - handles[chosen-2], handles[m-2] = handles[m-2], handles[chosen-2] + handles[chosen-3], handles[m-3] = handles[m-3], handles[chosen-3] } } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go index a7294ccb2dc3..2392a9d96094 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go @@ -436,11 +436,12 @@ type evalTestState struct { } type testEval struct { - state WaitEndState - handles []tokenWaitingHandleInfo - quorum int - cancel context.CancelFunc - refreshCh chan struct{} + state WaitEndState + handles []tokenWaitingHandleInfo + quorum int + cancel context.CancelFunc + configRefreshCh chan struct{} + replicaRefreshCh chan struct{} } func newTestState() *evalTestState { @@ -490,17 +491,20 @@ func (ts *evalTestState) startWaitForEval( defer ts.mu.Unlock() ctx, cancel := context.WithCancel(context.Background()) - refreshCh := make(chan struct{}) + configRefreshCh := make(chan struct{}) + replicaRefreshCh := make(chan struct{}) ts.mu.evals[name] = &testEval{ - state: -1, - handles: handles, - quorum: quorum, - cancel: cancel, - refreshCh: refreshCh, + state: -1, + handles: handles, + quorum: quorum, + cancel: cancel, + configRefreshCh: configRefreshCh, + replicaRefreshCh: replicaRefreshCh, } go func() { - state, _ := WaitForEval(ctx, refreshCh, handles, quorum, false, nil) + state, _ := WaitForEval( + ctx, configRefreshCh, replicaRefreshCh, handles, quorum, false, nil) ts.mu.Lock() defer ts.mu.Unlock() @@ -657,11 +661,20 @@ func TestWaitForEval(t *testing.T) { case "refresh": var name string d.ScanArgs(t, "name", &name) + var kind string + d.ScanArgs(t, "kind", &kind) func() { ts.mu.Lock() defer ts.mu.Unlock() if op, exists := ts.mu.evals[name]; exists { - op.refreshCh <- struct{}{} + switch kind 
{ + case "config": + op.configRefreshCh <- struct{}{} + case "replica": + op.replicaRefreshCh <- struct{}{} + default: + panic(fmt.Sprintf("unknown channel kind %s", kind)) + } } else { panic(fmt.Sprintf("no WaitForEval operation with name: %s", name)) } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go new file mode 100644 index 000000000000..5378b078f15e --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config.go @@ -0,0 +1,112 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package rac2 + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// WaitForEvalCategory specifies what category of work is subject to waiting +// in WaitForEval. +type WaitForEvalCategory uint8 + +const ( + NoWorkWaitsForEval WaitForEvalCategory = iota + OnlyElasticWorkWaitsForEval + AllWorkWaitsForEval +) + +// Bypass returns true iff the given WorkClass does not need to wait for +// tokens in WaitForEval. +func (wec WaitForEvalCategory) Bypass(wc admissionpb.WorkClass) bool { + return wec == NoWorkWaitsForEval || + (wec == OnlyElasticWorkWaitsForEval && wc == admissionpb.RegularWorkClass) +} + +// WaitForEvalConfig provides the latest configuration related to WaitForEval. +type WaitForEvalConfig struct { + st *cluster.Settings + mu struct { + syncutil.RWMutex + waitCategory WaitForEvalCategory + waitCategoryDecreasedCh chan struct{} + } +} + +// NewWaitForEvalConfig constructs WaitForEvalConfig. 
+func NewWaitForEvalConfig(st *cluster.Settings) *WaitForEvalConfig { + w := &WaitForEvalConfig{st: st} + kvflowcontrol.Mode.SetOnChange(&st.SV, func(ctx context.Context) { + w.notifyChanged() + }) + kvflowcontrol.Enabled.SetOnChange(&st.SV, func(ctx context.Context) { + w.notifyChanged() + }) + // Ensure initialization. + w.notifyChanged() + return w +} + +// notifyChanged is called whenever any of the cluster settings that affect +// WaitForEval change. It is also called for initialization. +func (w *WaitForEvalConfig) notifyChanged() { + w.mu.Lock() + defer w.mu.Unlock() + // Call computeCategory while holding w.mu to serialize the computation in + // case of concurrent callbacks. This ensures the latest settings are used + // to set the current state, and we don't have a situation where a slow + // goroutine samples the settings, then after some arbitrary duration + // acquires the mutex and sets a stale state. + waitCategory := w.computeCategory() + if w.mu.waitCategoryDecreasedCh == nil { + // Initialization. + w.mu.waitCategoryDecreasedCh = make(chan struct{}) + w.mu.waitCategory = waitCategory + return + } + // Not initialization. + if w.mu.waitCategory > waitCategory { + close(w.mu.waitCategoryDecreasedCh) + w.mu.waitCategoryDecreasedCh = make(chan struct{}) + } + // Else w.mu.waitCategory <= waitCategory. Since the set of requests that + // are subject to replication admission/flow control is growing (or staying + // the same), we don't need to tell the existing waiting requests to restart + // their wait, using the latest value of waitCategory, since they are + // unaffected by the change. + + w.mu.waitCategory = waitCategory +} + +// Current returns the current category, and a channel that will be closed if +// the category value decreases (which narrows the set of work that needs to +// WaitForEval). 
+func (w *WaitForEvalConfig) Current() (WaitForEvalCategory, <-chan struct{}) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.mu.waitCategory, w.mu.waitCategoryDecreasedCh +} + +func (w *WaitForEvalConfig) computeCategory() WaitForEvalCategory { + enabled := kvflowcontrol.Enabled.Get(&w.st.SV) + if !enabled { + return NoWorkWaitsForEval + } + mode := kvflowcontrol.Mode.Get(&w.st.SV) + switch mode { + case kvflowcontrol.ApplyToElastic: + return OnlyElasticWorkWaitsForEval + case kvflowcontrol.ApplyToAll: + return AllWorkWaitsForEval + } + panic(errors.AssertionFailedf("unknown mode %v", mode)) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go new file mode 100644 index 000000000000..b5d266fd8791 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/wait_for_eval_config_test.go @@ -0,0 +1,72 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package rac2 + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestWaitForEvalConfig(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + st := cluster.MakeTestingClusterSettings() + ctx := context.Background() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToAll) + + // All work waits for eval. 
+ w := NewWaitForEvalConfig(st) + wec, ch1 := w.Current() + require.Equal(t, AllWorkWaitsForEval, wec) + require.NotNil(t, ch1) + require.False(t, wec.Bypass(admissionpb.ElasticWorkClass)) + require.False(t, wec.Bypass(admissionpb.RegularWorkClass)) + + // No work waits for eval. + kvflowcontrol.Enabled.Override(ctx, &st.SV, false) + var ch2 <-chan struct{} + wec, ch2 = w.Current() + require.Equal(t, NoWorkWaitsForEval, wec) + require.NotNil(t, ch2) + require.NotEqual(t, ch1, ch2) + require.True(t, wec.Bypass(admissionpb.ElasticWorkClass)) + require.True(t, wec.Bypass(admissionpb.RegularWorkClass)) + + // All work waits for eval. + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + var ch3 <-chan struct{} + wec, ch3 = w.Current() + require.Equal(t, AllWorkWaitsForEval, wec) + // Channel has not changed. + require.Equal(t, ch2, ch3) + + // Elastic work waits for eval. + kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToElastic) + var ch4 <-chan struct{} + wec, ch4 = w.Current() + require.Equal(t, OnlyElasticWorkWaitsForEval, wec) + require.NotNil(t, ch4) + require.NotEqual(t, ch3, ch4) + require.False(t, wec.Bypass(admissionpb.ElasticWorkClass)) + require.True(t, wec.Bypass(admissionpb.RegularWorkClass)) + + // All work waits for eval. + kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToAll) + var ch5 <-chan struct{} + wec, ch5 = w.Current() + require.Equal(t, AllWorkWaitsForEval, wec) + // Channel has not changed. 
+ require.Equal(t, ch4, ch5) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index c742c3ef2e9b..c10e7bde5082 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//pkg/raft/raftpb", "//pkg/raft/tracker", "//pkg/roachpb", - "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/hlc", @@ -54,7 +53,6 @@ go_test( "//pkg/raft", "//pkg/raft/raftpb", "//pkg/roachpb", - "//pkg/settings/cluster", "//pkg/testutils/datapathutils", "//pkg/util/admission/admissionpb", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 2434948f97b3..2797e2f76167 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -160,7 +159,6 @@ type ProcessorOptions struct { ACWorkQueue ACWorkQueue MsgAppSender rac2.MsgAppSender RangeControllerFactory RangeControllerFactory - Settings *cluster.Settings EvalWaitMetrics *rac2.EvalWaitMetrics EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel @@ -1120,14 +1118,6 @@ func (p *processorImpl) HoldsSendTokensLocked() bool { func (p *processorImpl) AdmitForEval( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, ) (admitted bool, err error) { - workClass := admissionpb.WorkClassFromPri(pri) - mode := 
kvflowcontrol.Mode.Get(&p.opts.Settings.SV) - bypass := mode == kvflowcontrol.ApplyToElastic && workClass == admissionpb.RegularWorkClass - if bypass { - p.opts.EvalWaitMetrics.OnWaiting(workClass) - p.opts.EvalWaitMetrics.OnBypassed(workClass, 0 /* duration */) - return false, nil - } var rc rac2.RangeController func() { p.leader.rcReferenceUpdateMu.RLock() @@ -1135,6 +1125,11 @@ func (p *processorImpl) AdmitForEval( rc = p.leader.rc }() if rc == nil { + workClass := admissionpb.WorkClassFromPri(pri) + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEventf(ctx, 2, "r%v/%v bypassed request as no RC (pri=%v)", + p.opts.RangeID, p.opts.ReplicaID, pri) + } p.opts.EvalWaitMetrics.OnWaiting(workClass) p.opts.EvalWaitMetrics.OnBypassed(workClass, 0 /* duration */) return false, nil @@ -1189,6 +1184,7 @@ type RangeControllerFactoryImpl struct { closeTimerScheduler rac2.ProbeToCloseTimerScheduler scheduler rac2.Scheduler sendTokenWatcher *rac2.SendTokenWatcher + waitForEvalConfig *rac2.WaitForEvalConfig knobs *kvflowcontrol.TestingKnobs } @@ -1200,6 +1196,7 @@ func NewRangeControllerFactoryImpl( closeTimerScheduler rac2.ProbeToCloseTimerScheduler, scheduler rac2.Scheduler, sendTokenWatcher *rac2.SendTokenWatcher, + waitForEvalConfig *rac2.WaitForEvalConfig, knobs *kvflowcontrol.TestingKnobs, ) RangeControllerFactoryImpl { return RangeControllerFactoryImpl{ @@ -1210,6 +1207,7 @@ func NewRangeControllerFactoryImpl( closeTimerScheduler: closeTimerScheduler, scheduler: scheduler, sendTokenWatcher: sendTokenWatcher, + waitForEvalConfig: waitForEvalConfig, knobs: knobs, } } @@ -1233,6 +1231,7 @@ func (f RangeControllerFactoryImpl) New( SendTokenWatcher: f.sendTokenWatcher, EvalWaitMetrics: f.evalWaitMetrics, RangeControllerMetrics: f.rangeControllerMetrics, + WaitForEvalConfig: f.waitForEvalConfig, Knobs: f.knobs, }, rac2.RangeControllerInitState{ diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go 
index 3487e344e6e3..5efce9a4484e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -263,7 +262,6 @@ func TestProcessorBasic(t *testing.T) { var piggybacker testAdmittedPiggybacker var q testACWorkQueue var rcFactory testRangeControllerFactory - var st *cluster.Settings var p *processorImpl tenantID := roachpb.MustMakeTenantID(4) reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) { @@ -273,8 +271,6 @@ func TestProcessorBasic(t *testing.T) { piggybacker = testAdmittedPiggybacker{b: &b} q = testACWorkQueue{b: &b} rcFactory = testRangeControllerFactory{b: &b} - st = cluster.MakeTestingClusterSettings() - kvflowcontrol.Mode.Override(ctx, &st.SV, kvflowcontrol.ApplyToElastic) p = NewProcessor(ProcessorOptions{ NodeID: 1, StoreID: 2, @@ -286,7 +282,6 @@ func TestProcessorBasic(t *testing.T) { ACWorkQueue: &q, MsgAppSender: testMsgAppSender{}, RangeControllerFactory: &rcFactory, - Settings: st, EnabledWhenLeaderLevel: enabled, EvalWaitMetrics: rac2.NewEvalWaitMetrics(), }).(*processorImpl) @@ -483,21 +478,6 @@ func TestProcessorBasic(t *testing.T) { printLogTracker() return builderStr() - case "set-flow-control-mode": - var mode string - d.ScanArgs(t, "mode", &mode) - var modeVal kvflowcontrol.ModeT - switch mode { - case "apply-to-all": - modeVal = kvflowcontrol.ApplyToAll - case "apply-to-elastic": - modeVal = kvflowcontrol.ApplyToElastic - default: - t.Fatalf("unknown mode: %s", mode) - } - kvflowcontrol.Mode.Override(ctx, &st.SV, modeVal) - return builderStr() - 
case "admit-for-eval": pri := parseAdmissionPriority(t, d) // The callee ignores the create time. diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 36e84d751cb5..9bf6748d3390 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -341,18 +341,7 @@ admit-for-eval pri=low-pri RangeController.WaitForEval(pri=low-pri) = (waited=true err=) admitted: true err: -# AdmitForEval returns false despite there being a RangeController since -# normal-pri is not subject to replication AC. RangeController.WaitForEval was -# not called. -admit-for-eval pri=normal-pri ----- -admitted: false err: - -# Subject normal-pri to replication AC. -set-flow-control-mode mode=apply-to-all ----- - -# AdmitForEval for normal-pri returns true. +# Same for normal-pri. admit-for-eval pri=normal-pri ---- RangeController.WaitForEval(pri=normal-pri) = (waited=true err=) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 1dafec262910..c1691a2d903c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -240,7 +240,6 @@ func newUninitializedReplicaWithoutRaftGroup( AdmittedPiggybacker: r.store.cfg.KVFlowAdmittedPiggybacker, ACWorkQueue: r.store.cfg.KVAdmissionController, MsgAppSender: r, - Settings: r.store.cfg.Settings, EvalWaitMetrics: r.store.cfg.KVFlowEvalWaitMetrics, RangeControllerFactory: r.store.kvflowRangeControllerFactory, EnabledWhenLeaderLevel: r.raftMu.flowControlLevel, diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 803bfd138d21..1e5bdcb7af97 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1283,6 +1283,9 @@ type StoreConfig struct { // KVFlowSendTokenWatcher is used for replication AC (flow control) v2 to // watch for elastic send tokens. 
KVFlowSendTokenWatcher *rac2.SendTokenWatcher + // KVFlowWaitForEvalConfig is used for configuring WaitForEval for + // replication AC (flow control) v2. + KVFlowWaitForEvalConfig *rac2.WaitForEvalConfig // KVFlowEvalWaitMetrics is used for replication AC (flow control) v2 to // track requests waiting for evaluation. KVFlowEvalWaitMetrics *rac2.EvalWaitMetrics @@ -1559,6 +1562,7 @@ func NewStore( s.stopper, timeutil.DefaultTimeSource{}, s.scheduler), (*racV2Scheduler)(s.scheduler), s.cfg.KVFlowSendTokenWatcher, + s.cfg.KVFlowWaitForEvalConfig, s.TestingKnobs().FlowControlTestingKnobs, ) diff --git a/pkg/server/server.go b/pkg/server/server.go index 9e12620b7ab4..1c03c81edb70 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -625,6 +625,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf admittedPiggybacker := node_rac2.NewAdmittedPiggybacker() streamTokenCounterProvider := rac2.NewStreamTokenCounterProvider(st, clock) sendTokenWatcher := rac2.NewSendTokenWatcher(stopper, timeutil.DefaultTimeSource{}) + waitForEvalConfig := rac2.NewWaitForEvalConfig(st) evalWaitMetrics := rac2.NewEvalWaitMetrics() rangeControllerMetrics := rac2.NewRangeControllerMetrics() nodeRegistry.AddMetricStruct(evalWaitMetrics) @@ -891,6 +892,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf KVFlowAdmittedPiggybacker: admittedPiggybacker, KVFlowStreamTokenProvider: streamTokenCounterProvider, KVFlowSendTokenWatcher: sendTokenWatcher, + KVFlowWaitForEvalConfig: waitForEvalConfig, KVFlowEvalWaitMetrics: evalWaitMetrics, KVFlowRangeControllerMetrics: rangeControllerMetrics, SchedulerLatencyListener: admissionControl.schedulerLatencyListener,