Skip to content

Commit

Permalink
goschedstats: add cluster setting to always do short sampling
Browse files Browse the repository at this point in the history
Adaptive sampling, whose long period is 250ms, can make the number of
admission control (AC) slots change sluggishly, resulting in unnecessary
queueing. The 1ms sampling is cheap, even on an idle roachprod node.

Fixes #131766

Epic: none

Release note (ops change): The
goschedstats.always_use_short_sample_period.enabled setting should be set
to true for any serious production cluster, to prevent unnecessary queuing
in admission control CPU queues.
  • Loading branch information
sumeerbhola committed Oct 25, 2024
1 parent 24f3a24 commit 6fe87be
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
db.AdmissionPacerFactory = gcoords.Elastic
goschedstats.RegisterSettings(st)
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
stopper.AddCloser(stop.CloserFn(func() {
goschedstats.UnregisterRunnableCountCallback(cbID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/goschedstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/goschedstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -22,6 +24,7 @@ go_test(
srcs = ["runnable_test.go"],
embed = [":goschedstats"],
deps = [
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
Expand Down
33 changes: 29 additions & 4 deletions pkg/util/goschedstats/runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -59,6 +61,12 @@ var _ = numRunnableGoroutines
// interaction with processor idle state
// https://github.com/golang/go/issues/30740#issuecomment-471634471. See
// #66881.
//
// The use of underloadedRunnablePerProcThreshold does not provide sufficient
// protection against sluggish response in the admission control system, which
// uses these samples to adjust concurrency of request processing. So
// goschedstats.always_use_short_sample_period.enabled can be set to true to
// always sample at samplePeriodShort, which keeps admission control responsive.
const samplePeriodShort = time.Millisecond
const samplePeriodLong = 250 * time.Millisecond

Expand All @@ -73,6 +81,12 @@ const samplePeriodLong = 250 * time.Millisecond
// 0.1 runnable goroutine per proc.
const underloadedRunnablePerProcThreshold = 1 * toFixedPoint / 10

// alwaysUseShortSamplePeriodEnabled is a system-only boolean cluster setting
// that defaults to false. When it is true (and a settings object has been
// provided to getStatsOnTick), the ticker never switches to samplePeriodLong,
// even if the runnable-goroutine averages indicate an underloaded system.
var alwaysUseShortSamplePeriodEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"goschedstats.always_use_short_sample_period.enabled",
"when set to true, the system always does 1ms sampling of runnable queue lengths",
false)

// We "report" the average value every reportingPeriod.
// Note: if this is changed from 1s, CumulativeNormalizedRunnableGoroutines()
// needs to be updated to scale the sum accordingly.
Expand Down Expand Up @@ -108,6 +122,7 @@ var callbackInfo struct {
// Multiple cbs are used only for tests which can run multiple CockroachDB
// nodes in a process.
cbs []callbackWithID
st *cluster.Settings
}

// RegisterRunnableCountCallback registers a callback to be run with the
Expand Down Expand Up @@ -155,6 +170,14 @@ func UnregisterRunnableCountCallback(id int64) {
callbackInfo.cbs = newCBs
}

// RegisterSettings installs st as the cluster settings object that the
// sampling loop reads on every tick to decide how frequently to sample and
// invoke callbacks. The assignment is made while holding callbackInfo.mu, and
// each call overwrites whatever settings object was registered before.
func RegisterSettings(st *cluster.Settings) {
	callbackInfo.mu.Lock()
	callbackInfo.st = st
	callbackInfo.mu.Unlock()
}

func init() {
go func() {
sst := schedStatsTicker{
Expand All @@ -167,8 +190,9 @@ func init() {
t := <-ticker.C
callbackInfo.mu.Lock()
cbs := callbackInfo.cbs
st := callbackInfo.st
callbackInfo.mu.Unlock()
sst.getStatsOnTick(t, cbs, ticker)
sst.getStatsOnTick(t, cbs, st, ticker)
}
}()
}
Expand All @@ -194,9 +218,9 @@ type schedStatsTicker struct {
localTotal, localEWMA uint64
}

// getStatsOnTick gets scheduler stats as the ticker has ticked.
// getStatsOnTick gets scheduler stats as the ticker has ticked. st can be nil.
func (s *schedStatsTicker) getStatsOnTick(
t time.Time, cbs []callbackWithID, ticker timeTickerInterface,
t time.Time, cbs []callbackWithID, st *cluster.Settings, ticker timeTickerInterface,
) {
if t.Sub(s.lastTime) > reportingPeriod {
var avgValue uint64
Expand All @@ -216,7 +240,8 @@ func (s *schedStatsTicker) getStatsOnTick(
// Both the mean over the last 1s, and the exponentially weighted average
// must be low for the system to be considered underloaded.
if avgValue < underloadedRunnablePerProcThreshold &&
s.localEWMA < underloadedRunnablePerProcThreshold {
s.localEWMA < underloadedRunnablePerProcThreshold &&
(st == nil || !alwaysUseShortSamplePeriodEnabled.Get(&st.SV)) {
// Underloaded, so switch to longer sampling period.
nextPeriod = samplePeriodLong
}
Expand Down
67 changes: 65 additions & 2 deletions pkg/util/goschedstats/runnable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package goschedstats

import (
"context"
"fmt"
"runtime"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,7 +75,7 @@ func TestSchedStatsTicker(t *testing.T) {
// Tick every 1ms until the reportingPeriod has elapsed.
for i := 1; ; i++ {
now = now.Add(samplePeriodShort)
sst.getStatsOnTick(now, cbs, &tt)
sst.getStatsOnTick(now, cbs, nil, &tt)
if now.Sub(startTime) <= reportingPeriod {
// No reset of the time ticker.
require.Equal(t, 0, tt.numResets)
Expand All @@ -95,7 +97,7 @@ func TestSchedStatsTicker(t *testing.T) {
tt.numResets = 0
for i := 1; ; i++ {
now = now.Add(samplePeriodLong)
sst.getStatsOnTick(now, cbs, &tt)
sst.getStatsOnTick(now, cbs, nil, &tt)
if now.Sub(startTime) <= reportingPeriod {
// No reset of the time ticker.
require.Equal(t, 0, tt.numResets)
Expand All @@ -111,3 +113,64 @@ func TestSchedStatsTicker(t *testing.T) {
require.Equal(t, samplePeriodShort, tt.lastResetDuration)
require.Equal(t, samplePeriodShort, callbackSamplePeriod)
}

// TestSchedStatsTickerShortPeriodOverride verifies that when
// alwaysUseShortSamplePeriodEnabled is overridden to true, a ticker that
// starts on the long sample period switches to the short period after the
// first reporting period, and then stays on the short period even though the
// reported load is always zero (i.e., underloaded).
func TestSchedStatsTickerShortPeriodOverride(t *testing.T) {
	ctx := context.Background()

	// observedPeriod records the sample period passed to the most recent
	// callback invocation.
	var observedPeriod time.Duration
	cb := func(numRunnable int, numProcs int, samplePeriod time.Duration) {
		// Always underloaded.
		require.Equal(t, 0, numRunnable)
		require.Equal(t, 1, numProcs)
		observedPeriod = samplePeriod
	}
	cbs := []callbackWithID{{cb, 0}}

	// Override to use short sample period.
	clusterSt := cluster.MakeTestingClusterSettings()
	alwaysUseShortSamplePeriodEnabled.Override(ctx, &clusterSt.SV, true)

	now := timeutil.UnixEpoch
	// Start with long sample period.
	sst := schedStatsTicker{
		lastTime:              now,
		curPeriod:             samplePeriodLong,
		numRunnableGoroutines: func() (numRunnable int, numProcs int) { return 0, 1 },
	}
	tt := testTimeTicker{}

	// tickPastReportingPeriod advances the clock in steps of period, ticking
	// the stats collector at each step, and returns right after the first tick
	// that lands beyond reportingPeriod from where it started. For every tick
	// that is still within the reporting period, it checks that the time
	// ticker was not reset and that the callback saw the given period.
	tickPastReportingPeriod := func(period time.Duration) {
		begin := now
		for {
			now = now.Add(period)
			sst.getStatsOnTick(now, cbs, clusterSt, &tt)
			if now.Sub(begin) > reportingPeriod {
				return
			}
			// No reset of the time ticker.
			require.Equal(t, 0, tt.numResets)
			// Each tick causes a callback.
			require.Equal(t, period, observedPeriod)
		}
	}

	// Tick at the long period until the reportingPeriod has elapsed.
	tickPastReportingPeriod(samplePeriodLong)
	// Sample period resets to short.
	require.Equal(t, 1, tt.numResets)
	require.Equal(t, samplePeriodShort, tt.lastResetDuration)
	require.Equal(t, samplePeriodShort, observedPeriod)

	// Tick again, now at the short period, until the reportingPeriod has
	// elapsed.
	tt.numResets = 0
	tickPastReportingPeriod(samplePeriodShort)
	// Still using short sample period.
	require.Equal(t, 0, tt.numResets)
	require.Equal(t, samplePeriodShort, tt.lastResetDuration)
	require.Equal(t, samplePeriodShort, observedPeriod)
}

0 comments on commit 6fe87be

Please sign in to comment.