Skip to content

Commit

Permalink
rac2: introduce cluster setting to reset token counters
Browse files Browse the repository at this point in the history
kvadmission.flow_controller.token_reset_epoch is an escape hatch for
cluster operators to reset RACv2 token counters to the full state.

The operator should increment this epoch (or change it to a value
different than before). This can be used to counteract a token leakage
bug, but note that if there is indeed a bug, the leakage may
resume, and tokens may again be exhausted. So it is expected that this
will be used together with disabling replication admission control by
setting kvadmission.flow_control.enabled=false. Note that disabling
replication admission control should be sufficient, since it should
unblock work that is waiting-for-eval. But in case there is another bug
that is preventing such work from unblocking, this setting may be useful.

Epic: CRDB-37515

Release note (ops change): The cluster setting
kvadmission.flow_controller.token_reset_epoch is an advanced setting
that can be used to refill replication admission control v2 tokens. It
should only be used after consultation with an expert.
  • Loading branch information
sumeerbhola committed Oct 23, 2024
1 parent bc87198 commit d25ceaf
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error {
return nil
})

// TokenCounterResetEpoch is an escape hatch for administrators that should
// never be needed. By incrementing this epoch (or changing it to a value
// different than before), an administrator can restore all RACv2 token
// counters to their default (full) state. This can be used to counteract a
// token leakage bug, but note that if there is indeed a bug, the leakage may
// resume, and tokens may again be exhausted. So it is expected that this will
// be used together with disabling replication admission control by setting
// kvadmission.flow_control.enabled=false. Note that disabling replication
// admission control should be sufficient, since it should unblock work that
// is waiting-for-eval. But in case there is another bug that is preventing
// such work from unblocking, this setting may be useful.
var TokenCounterResetEpoch = settings.RegisterIntSetting(
settings.SystemOnly,
"kvadmission.flow_controller.token_reset_epoch",
"escape hatch for administrators to reset all token counters to their default (full) state",
0)

// V2EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when
// this replica is the leader.
//
Expand Down
28 changes: 21 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func makeTokenCounterPerWorkClass(

// adjustTokensLocked adjusts the tokens for the given work class by delta.
func (twc *tokenCounterPerWorkClass) adjustTokensLocked(
ctx context.Context, delta kvflowcontrol.Tokens, now time.Time,
ctx context.Context, delta kvflowcontrol.Tokens, now time.Time, isReset bool,
) (adjustment, unaccounted kvflowcontrol.Tokens) {
before := twc.tokens
twc.tokens += delta
Expand All @@ -94,7 +94,7 @@ func (twc *tokenCounterPerWorkClass) adjustTokensLocked(
twc.stats.noTokenStartTime = now
}
}
if buildutil.CrdbTestBuild && unaccounted != 0 {
if buildutil.CrdbTestBuild && !isReset && unaccounted != 0 {
log.Fatalf(ctx, "unaccounted[%s]=%d delta=%d limit=%d",
twc.wc, unaccounted, delta, twc.limit)
}
Expand All @@ -108,7 +108,14 @@ func (twc *tokenCounterPerWorkClass) setLimitLocked(
) {
before := twc.limit
twc.limit = limit
twc.adjustTokensLocked(ctx, twc.limit-before, now)
twc.adjustTokensLocked(ctx, twc.limit-before, now, false /* isReset */)
}

func (twc *tokenCounterPerWorkClass) resetLocked(ctx context.Context, now time.Time) {
if twc.limit <= twc.tokens {
return
}
twc.adjustTokensLocked(ctx, twc.limit-twc.tokens, now, true /* isReset */)
}

func (twc *tokenCounterPerWorkClass) signal() {
Expand Down Expand Up @@ -220,6 +227,13 @@ func newTokenCounter(

kvflowcontrol.RegularTokensPerStream.SetOnChange(&settings.SV, onChangeFunc)
kvflowcontrol.ElasticTokensPerStream.SetOnChange(&settings.SV, onChangeFunc)
kvflowcontrol.TokenCounterResetEpoch.SetOnChange(&settings.SV, func(ctx context.Context) {
now := t.clock.PhysicalTime()
t.mu.Lock()
defer t.mu.Unlock()
t.mu.counters[admissionpb.RegularWorkClass].resetLocked(ctx, now)
t.mu.counters[admissionpb.ElasticWorkClass].resetLocked(ctx, now)
})
return t
}

Expand Down Expand Up @@ -593,15 +607,15 @@ func (t *tokenCounter) adjustLocked(
switch class {
case admissionpb.RegularWorkClass:
adjustment.regular, unaccounted.regular =
t.mu.counters[admissionpb.RegularWorkClass].adjustTokensLocked(ctx, delta, now)
t.mu.counters[admissionpb.RegularWorkClass].adjustTokensLocked(ctx, delta, now, false /* isReset */)
// Regular {deductions,returns} also affect elastic flow tokens.
adjustment.elastic, unaccounted.elastic =
t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta, now)
t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta, now, false /* isReset */)

case admissionpb.ElasticWorkClass:
// Elastic {deductions,returns} only affect elastic flow tokens.
adjustment.elastic, unaccounted.elastic =
t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta, now)
t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta, now, false /* isReset */)
}

// Adjust metrics if any tokens were actually adjusted or unaccounted for
Expand All @@ -627,7 +641,7 @@ func (t *tokenCounter) testingSetTokens(
defer t.mu.Unlock()

t.mu.counters[wc].adjustTokensLocked(ctx,
tokens-t.mu.counters[wc].tokens, t.clock.PhysicalTime())
tokens-t.mu.counters[wc].tokens, t.clock.PhysicalTime(), false /* isReset */)
}

func (t *tokenCounter) GetAndResetStats(now time.Time) (regularStats, elasticStats deltaStats) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,27 @@ func TestWaitForEval(t *testing.T) {
}
})
}

func TestTokenCounterReset(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
provider := NewStreamTokenCounterProvider(st, hlc.NewClockForTesting(nil))
stream := kvflowcontrol.Stream{StoreID: 1}
evalCounter := provider.Eval(stream)
sendCounter := provider.Send(stream)
evalCounter.Deduct(ctx, admissionpb.RegularWorkClass, 1<<20, AdjNormal)
sendCounter.Deduct(ctx, admissionpb.RegularWorkClass, 1<<20, AdjNormal)
for wc := admissionpb.WorkClass(0); wc < admissionpb.NumWorkClasses; wc++ {
require.Greater(t, evalCounter.limit(wc), evalCounter.tokens(wc))
require.Greater(t, sendCounter.limit(wc), sendCounter.tokens(wc))
}
prevEpoch := kvflowcontrol.TokenCounterResetEpoch.Get(&st.SV)
kvflowcontrol.TokenCounterResetEpoch.Override(ctx, &st.SV, prevEpoch+1)
for wc := admissionpb.WorkClass(0); wc < admissionpb.NumWorkClasses; wc++ {
require.Equal(t, evalCounter.limit(wc), evalCounter.tokens(wc))
require.Equal(t, sendCounter.limit(wc), sendCounter.tokens(wc))
}
}

0 comments on commit d25ceaf

Please sign in to comment.