Skip to content

Commit

Permalink
server,kvserver,rac2: make ongoing WaitForEvals respond to settings c…
Browse files Browse the repository at this point in the history
…hange

The current config is encapsulated in WaitForEvalConfig, which is passed
to rangeController. There are now two "refresh" channels in WaitForEval,
(a) the existing one for replica changes, (b) the new one for when the
config changes in a manner that narrows the set of work that needs to
WaitForEval.

The existing code in processorImpl to look at cluster settings is removed.
This means the fast path that returns when work does not need to
WaitForEval is not as fast as before -- it needs to lookup the
rangeController and the rangeController needs to call into
WaitForEvalConfig, but it is better not to prematurely optimize by
introducing duplicate code.

Some additional tracing is added in rangeController.WaitForEval.

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Oct 16, 2024
1 parent 3eb415c commit 9d19bfd
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 107 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
)

// Enabled determines whether we use flow control for replication traffic in KV.
//
// TODO(sumeer): changing this to false does not affect requests that are
// already waiting for tokens for eval in RACv1. Consider fixing and
// back-porting.
var Enabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.flow_control.enabled",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"store_stream.go",
"token_counter.go",
"token_tracker.go",
"wait_for_eval_config.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -51,6 +52,7 @@ go_test(
"store_stream_test.go",
"token_counter_test.go",
"token_tracker_test.go",
"wait_for_eval_config_test.go",
],
data = glob(["testdata/**"]),
embed = [":rac2"],
Expand Down
84 changes: 59 additions & 25 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ import (
// WaitForEval for regular work.
type RangeController interface {
// WaitForEval seeks admission to evaluate a request at the given priority.
// This blocks until there are positive tokens available for the request to
// be admitted for evaluation, or the context is canceled (which returns an
// error). Note the number of tokens required by the request is not
// considered, only the priority of the request, as the number of tokens is
// not known until eval.
// If the priority is subject to replication admission control, it blocks
// until there are positive tokens available for the request to be admitted
// for evaluation, or the context is canceled (which returns an error). Note
// the number of tokens required by the request is not considered, only the
// priority of the request, as the number of tokens is not known until eval.
//
// In the non-error case, the waited return value is true if the
// RangeController was not closed during the execution of WaitForEval. If
// closed, a (false, nil) will be returned -- this is important for the
// caller to fall back to waiting on the local store.
// In the non-error case, the waited return value is true if the priority
// was subject to replication admission control, and the RangeController was
// not closed during the execution of WaitForEval. If closed, or the
// priority is not subject to replication admission control, a (false, nil)
// will be returned -- this is important for the caller to fall back to
// waiting on the local store.
//
// No mutexes should be held.
WaitForEval(ctx context.Context, pri admissionpb.WorkPriority) (waited bool, err error)
Expand Down Expand Up @@ -504,6 +506,7 @@ type RangeControllerOptions struct {
SendTokenWatcher *SendTokenWatcher
EvalWaitMetrics *EvalWaitMetrics
RangeControllerMetrics *RangeControllerMetrics
WaitForEvalConfig *WaitForEvalConfig
Knobs *kvflowcontrol.TestingKnobs
}

Expand Down Expand Up @@ -635,25 +638,32 @@ func NewRangeController(
return rc
}

// WaitForEval blocks until there are positive tokens available for the
// request to be admitted for evaluation. Note the number of tokens required
// by the request is not considered, only the priority of the request, as the
// number of tokens is not known until eval.
//
// No mutexes should be held.
// WaitForEval implements RangeController.WaitForEval.
func (rc *rangeController) WaitForEval(
ctx context.Context, pri admissionpb.WorkPriority,
) (waited bool, err error) {
expensiveLoggingEnabled := log.ExpensiveLogEnabled(ctx, 2)
wc := admissionpb.WorkClassFromPri(pri)
rc.opts.EvalWaitMetrics.OnWaiting(wc)
waitCategory, waitCategoryChangeCh := rc.opts.WaitForEvalConfig.Current()
bypass := waitCategory.Bypass(wc)
if bypass {
if expensiveLoggingEnabled {
log.VEventf(ctx, 2, "r%v/%v bypassed request (pri=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri)
}
rc.opts.EvalWaitMetrics.OnBypassed(wc, 0 /* duration */)
return false, nil
}
waitForAllReplicateHandles := false
if wc == admissionpb.ElasticWorkClass {
waitForAllReplicateHandles = true
}
var handlesScratch [5]tokenWaitingHandleInfo
handles := handlesScratch[:]
var scratch []reflect.SelectCase
var scratch [7]reflect.SelectCase
selectCasesScratch := scratch[:0:cap(scratch)]

rc.opts.EvalWaitMetrics.OnWaiting(wc)
start := rc.opts.Clock.PhysicalTime()
retry:
// Snapshot the waiter sets and the refresh channel.
Expand All @@ -665,10 +675,15 @@ retry:

if refreshCh == nil {
// RangeControllerImpl is closed.
rc.opts.EvalWaitMetrics.OnBypassed(wc, rc.opts.Clock.PhysicalTime().Sub(start))
waitDuration := rc.opts.Clock.PhysicalTime().Sub(start)
if expensiveLoggingEnabled {
log.VEventf(ctx, 2,
"r%v/%v bypassed request as RC closed (pri=%v wait-duration=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration)
}
rc.opts.EvalWaitMetrics.OnBypassed(wc, waitDuration)
return false, nil
}
expensiveLoggingEnabled := log.ExpensiveLogEnabled(ctx, 2)
for _, vs := range vss {
quorumCount := (len(vs) + 2) / 2
votersHaveEvalTokensCount := 0
Expand Down Expand Up @@ -738,22 +753,41 @@ retry:
// parameter, to output trace statements, since expensiveLoggingEnabled
// is a superset of when tracing is enabled (and in a production setting
// is likely to be identical, so there isn't much waste).
state, scratch = WaitForEval(
ctx, refreshCh, handles, remainingForQuorum, expensiveLoggingEnabled, scratch)
state, selectCasesScratch = WaitForEval(ctx, waitCategoryChangeCh, refreshCh, handles,
remainingForQuorum, expensiveLoggingEnabled, selectCasesScratch)
switch state {
case WaitSuccess:
continue
case ContextCanceled:
rc.opts.EvalWaitMetrics.OnErrored(wc, rc.opts.Clock.PhysicalTime().Sub(start))
waitDuration := rc.opts.Clock.PhysicalTime().Sub(start)
if expensiveLoggingEnabled {
log.VEventf(ctx, 2, "r%v/%v canceled request (pri=%v wait-duration=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration)
}
rc.opts.EvalWaitMetrics.OnErrored(wc, waitDuration)
return false, ctx.Err()
case RefreshWaitSignaled:
case ConfigRefreshWaitSignaled:
waitCategory, waitCategoryChangeCh = rc.opts.WaitForEvalConfig.Current()
bypass = waitCategory.Bypass(wc)
if bypass {
waitDuration := rc.opts.Clock.PhysicalTime().Sub(start)
if expensiveLoggingEnabled {
log.VEventf(ctx, 2,
"r%v/%v bypassed request as settings changed (pri=%v wait-duration=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration)
}
rc.opts.EvalWaitMetrics.OnBypassed(wc, waitDuration)
return false, nil
}
goto retry
case ReplicaRefreshWaitSignaled:
goto retry
}
}
}
waitDuration := rc.opts.Clock.PhysicalTime().Sub(start)
if expensiveLoggingEnabled {
log.VEventf(ctx, 2, "r%v/%v admitted request (pri=%v wait-duration=%s wait-for-all=%v)",
log.VEventf(ctx, 2, "r%v/%v admitted request (pri=%v wait-duration=%v wait-for-all=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri, waitDuration, waitForAllReplicateHandles)
}
rc.opts.EvalWaitMetrics.OnAdmitted(wc, waitDuration)
Expand Down Expand Up @@ -1927,7 +1961,7 @@ type replicaSendStream struct {
// Fields that are read/written while holding raftMu.
raftMu struct {
// tracker contains entries that have been sent, and have had send-tokens
// deducted (and will have had eval-tokens deducted iff index >=
// deducted (and have had eval-tokens deducted iff index >=
// nextRaftIndexInitial).
//
// Contains no entries in probeRecentlyNoSendQ.
Expand Down
31 changes: 31 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type testingRCState struct {
ssTokenCounter *StreamTokenCounterProvider
sendTokenWatcher *SendTokenWatcher
probeToCloseScheduler ProbeToCloseTimerScheduler
waitForEvalConfig *WaitForEvalConfig
evalMetrics *EvalWaitMetrics
rcMetrics *RangeControllerMetrics
// ranges contains the controllers for each range. It is the main state being
Expand All @@ -71,12 +72,15 @@ func (s *testingRCState) init(t *testing.T, ctx context.Context) {
s.t = t
s.testCtx = ctx
s.settings = cluster.MakeTestingClusterSettings()
kvflowcontrol.Enabled.Override(ctx, &s.settings.SV, true)
kvflowcontrol.Mode.Override(ctx, &s.settings.SV, kvflowcontrol.ApplyToAll)
s.stopper = stop.NewStopper()
s.ts = timeutil.NewManualTime(timeutil.UnixEpoch)
s.clock = hlc.NewClockForTesting(s.ts)
s.ssTokenCounter = NewStreamTokenCounterProvider(s.settings, s.clock)
s.sendTokenWatcher = NewSendTokenWatcher(s.stopper, s.ts)
s.probeToCloseScheduler = &testingProbeToCloseTimerScheduler{state: s}
s.waitForEvalConfig = NewWaitForEvalConfig(s.settings)
s.evalMetrics = NewEvalWaitMetrics()
s.rcMetrics = NewRangeControllerMetrics()
s.ranges = make(map[roachpb.RangeID]*testingRCRange)
Expand Down Expand Up @@ -330,6 +334,7 @@ func (s *testingRCState) getOrInitRange(
SendTokenWatcher: s.sendTokenWatcher,
EvalWaitMetrics: s.evalMetrics,
RangeControllerMetrics: s.rcMetrics,
WaitForEvalConfig: s.waitForEvalConfig,
Knobs: &kvflowcontrol.TestingKnobs{},
}

Expand Down Expand Up @@ -1397,6 +1402,32 @@ func TestRangeController(t *testing.T) {
}
return buf.String()

case "set-flow-control-config":
if d.HasArg("enabled") {
var enabled bool
d.ScanArgs(t, "enabled", &enabled)
kvflowcontrol.Enabled.Override(ctx, &state.settings.SV, enabled)
}
if d.HasArg("mode") {
var mode string
d.ScanArgs(t, "mode", &mode)
var m kvflowcontrol.ModeT
switch mode {
case "apply_to_all":
m = kvflowcontrol.ApplyToAll
case "apply_to_elastic":
m = kvflowcontrol.ApplyToElastic
default:
panic(fmt.Sprintf("unknown mode %s", mode))
}
kvflowcontrol.Mode.Override(ctx, &state.settings.SV, m)
}
// Sleep for a bit to allow any timers to fire.
time.Sleep(20 * time.Millisecond)
return fmt.Sprintf("enabled: %t mode: %v",
kvflowcontrol.Enabled.Get(&state.settings.SV),
kvflowcontrol.Mode.Get(&state.settings.SV))

default:
panic(fmt.Sprintf("unknown command: %s", d.Cmd))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ range_id=1 tenant_id={1} local_replica_id=1
name=d pri=high-pri done=true waited=true err=<nil>
name=e pri=low-pri done=false waited=false err=<nil>

# Add tokens to s3 (voter) stream. The low priority evaluation should stil not
# Add tokens to s3 (voter) stream. The low priority evaluation should still not
# complete, as the non-voter stream (s4) does not have tokens.
adjust_tokens
store_id=3 pri=LowPri tokens=1
Expand Down Expand Up @@ -514,7 +514,7 @@ t1/s5: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+0 B/+1 B ela=+0 B/+1 B

# Start an evaluation 'k' on r1, that does not complete since the leaseholder
# (s3) has tokens.
# (s3) has no tokens.
wait_for_eval name=k range_id=1 pri=HighPri
----
range_id=1 tenant_id={1} local_replica_id=1
Expand All @@ -531,7 +531,7 @@ range_id=1 tenant_id={1} local_replica_id=1
range_id=2 tenant_id={1} local_replica_id=1
name=h pri=high-pri done=true waited=true err=<nil>

# Close all the RangeControllers. Evaluation 'j' is done, but specifies waited
# Close all the RangeControllers. Evaluation 'k' is done, but specifies waited
# is false, and error is nil.
close_rcs
----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Initialize a range with voters on s1,s2 and s3. The local replica and
# leaseholder will be s1. The leaseholder is denoted by the '*' suffix. Also
# set streams to initially have 0 tokens and a limit of 8MiB tokens.
init regular_limit=8MiB regular_init=0 elastic_limit=8MiB elastic_init=0
range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1
store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1
store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=1
store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1
----
r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
t1/s1: eval reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB
send reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB
t1/s2: eval reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB
send reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB
t1/s3: eval reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB
send reg=+0 B/+8.0 MiB ela=+0 B/+8.0 MiB

# Wait for eval for high priority request 'a'. It is waiting since there are
# no tokens.
wait_for_eval name=a range_id=1 pri=HighPri
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=false waited=false err=<nil>

# Disable replication admission control.
set-flow-control-config enabled=false
----
enabled: false mode: apply_to_all

# The request stops waiting.
check_state
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>

# Enable replication admission control.
set-flow-control-config enabled=true
----
enabled: true mode: apply_to_all

# Wait for eval for normal priority request 'b'. It is waiting since there are
# no tokens.
wait_for_eval name=b range_id=1 pri=NormalPri
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>
name=b pri=normal-pri done=false waited=false err=<nil>

# Change replication admission control to only apply to elastic work.
set-flow-control-config mode=apply_to_elastic
----
enabled: true mode: apply_to_elastic

# The request stops waiting.
check_state
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>
name=b pri=normal-pri done=true waited=false err=<nil>

# Restore enabled and apply_to_all.
set-flow-control-config mode=apply_to_all
----
enabled: true mode: apply_to_all

# Wait for eval for low priority request 'c'. It is waiting since there are no
# tokens.
wait_for_eval name=c range_id=1 pri=LowPri
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>
name=b pri=normal-pri done=true waited=false err=<nil>
name=c pri=low-pri done=false waited=false err=<nil>

# Change replication admission control to only apply to elastic work.
set-flow-control-config mode=apply_to_elastic
----
enabled: true mode: apply_to_elastic

# The change does not affect request 'c'.
check_state
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>
name=b pri=normal-pri done=true waited=false err=<nil>
name=c pri=low-pri done=false waited=false err=<nil>

# Change replication admission control to apply to all work.
set-flow-control-config mode=apply_to_all
----
enabled: true mode: apply_to_all

# The change does not affect request 'c'.
check_state
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>
name=b pri=normal-pri done=true waited=false err=<nil>
name=c pri=low-pri done=false waited=false err=<nil>

# Disable replication admission control.
set-flow-control-config enabled=false
----
enabled: false mode: apply_to_all

# The request stops waiting.
check_state
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=high-pri done=true waited=false err=<nil>
name=b pri=normal-pri done=true waited=false err=<nil>
name=c pri=low-pri done=true waited=false err=<nil>

# Restore settings.
set-flow-control-config enabled=true
----
enabled: true mode: apply_to_all
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ NormalPri:
term=1 index=4 tokens=1048576
++++

# Since there is a quorum with no send-queue (replicas 1 and 2), the evalution
# Since there is a quorum with no send-queue (replicas 1 and 2), the evaluation
# of c completes.
check_state
----
Expand Down
Loading

0 comments on commit 9d19bfd

Please sign in to comment.