From 91c65f2307e8f5094262a59f9f3a75dcc47ad9b7 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Sat, 15 Feb 2025 09:16:15 -0800 Subject: [PATCH 1/7] server: pass BaseConfig to startSampleEnvironment Epic: none Release note: None --- pkg/server/env_sampler.go | 16 ++++++---------- pkg/server/server.go | 6 +----- pkg/server/tenant.go | 6 +----- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/pkg/server/env_sampler.go b/pkg/server/env_sampler.go index e41b6fa085bc..4c4ceac3f3f1 100644 --- a/pkg/server/env_sampler.go +++ b/pkg/server/env_sampler.go @@ -39,27 +39,23 @@ type sampleEnvironmentCfg struct { // when appropriate, creates goroutine and/or heap dumps. func startSampleEnvironment( ctx context.Context, - settings *cluster.Settings, + srvCfg *BaseConfig, stopper *stop.Stopper, - goroutineDumpDirName string, - heapProfileDirName string, - cpuProfileDirName string, runtimeSampler *status.RuntimeStatSampler, sessionRegistry *sql.SessionRegistry, rootMemMonitor *mon.BytesMonitor, - testingKnobs base.TestingKnobs, ) error { metricsSampleInterval := base.DefaultMetricsSampleInterval - if p, ok := testingKnobs.Server.(*TestingKnobs); ok && p.EnvironmentSampleInterval != time.Duration(0) { + if p, ok := srvCfg.TestingKnobs.Server.(*TestingKnobs); ok && p.EnvironmentSampleInterval != time.Duration(0) { metricsSampleInterval = p.EnvironmentSampleInterval } cfg := sampleEnvironmentCfg{ - st: settings, + st: srvCfg.Settings, stopper: stopper, minSampleInterval: metricsSampleInterval, - goroutineDumpDirName: goroutineDumpDirName, - heapProfileDirName: heapProfileDirName, - cpuProfileDirName: cpuProfileDirName, + goroutineDumpDirName: srvCfg.GoroutineDumpDirName, + heapProfileDirName: srvCfg.HeapProfileDirName, + cpuProfileDirName: srvCfg.CPUProfileDirName, runtime: runtimeSampler, sessionRegistry: sessionRegistry, rootMemMonitor: rootMemMonitor, diff --git a/pkg/server/server.go b/pkg/server/server.go index 14f6d658c33a..f23fd236e7e9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1876,15 +1876,11 @@ func (s *topLevelServer) PreStart(ctx context.Context) error { // Begin recording runtime statistics. if err := startSampleEnvironment(workersCtx, - s.ClusterSettings(), + &s.cfg.BaseConfig, s.stopper, - s.cfg.GoroutineDumpDirName, - s.cfg.HeapProfileDirName, - s.cfg.CPUProfileDirName, s.runtime, s.status.sessionRegistry, s.sqlServer.execCfg.RootMemoryMonitor, - s.cfg.TestingKnobs, ); err != nil { return err } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 45f7dc3afaea..6d0933553f85 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -758,15 +758,11 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { if !s.sqlServer.cfg.DisableRuntimeStatsMonitor { // Begin recording runtime statistics. if err := startSampleEnvironment(workersCtx, - s.ClusterSettings(), + s.sqlServer.cfg, s.stopper, - s.sqlServer.cfg.GoroutineDumpDirName, - s.sqlServer.cfg.HeapProfileDirName, - s.sqlServer.cfg.CPUProfileDirName, s.runtime, s.tenantStatus.sessionRegistry, s.sqlServer.execCfg.RootMemoryMonitor, - s.cfg.TestingKnobs, ); err != nil { return err } From 7ab95b213d37074d056a4b93a4912041115ffde5 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Sat, 15 Feb 2025 19:04:54 -0800 Subject: [PATCH 2/7] server: purge jemalloc when cgo mem overhead is high Whenever we sample the environment (every 10s by default), we check if the CGo overhead is above 20%; in which case we tell jemalloc to purge dirty pages. 
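In sketch form, the decision amounts to the following (simplified from the
jemallocMaybePurge implementation added below; shouldPurge is just an
illustrative name, and 20% is the default of the new
server.jemalloc_purge_overhead_percent setting):

```
// Sketch only: purge when total CGo memory (live allocations plus dirty
// pages retained by jemalloc) exceeds the larger of the application's
// allocated bytes and the configured target by more than overheadPercent.
func shouldPurge(cgoAllocMem, cgoTotalMem, cgoTargetMem uint64, overheadPercent int) bool {
	target := max(cgoAllocMem, cgoTargetMem)
	return cgoTotalMem*100 > target*uint64(100+overheadPercent)
}
```
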
We limit the frequency of the purges to one every 2 minutes (these settings are configurable). This is meant to be a backup safety mechanism to avoid OOMs. With the updated jemalloc default configuration, I don't expect this to happen much in practice (if at all). Fixes: #141379 Release note: None --- pkg/server/env_sampler.go | 27 +++++++++ pkg/server/server.go | 1 + pkg/server/status/BUILD.bazel | 1 + pkg/server/status/runtime.go | 28 ++++++++- pkg/server/status/runtime_jemalloc.go | 83 +++++++++++++++++++++++---- pkg/server/tenant.go | 1 + 6 files changed, 128 insertions(+), 13 deletions(-) diff --git a/pkg/server/env_sampler.go b/pkg/server/env_sampler.go index 4c4ceac3f3f1..a7a52047ba5c 100644 --- a/pkg/server/env_sampler.go +++ b/pkg/server/env_sampler.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/goroutinedumper" "github.com/cockroachdb/cockroach/pkg/server/profiler" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -23,6 +24,22 @@ import ( "github.com/cockroachdb/errors" ) +var jemallocPurgeOverhead = settings.RegisterIntSetting( + settings.SystemVisible, + "server.jemalloc_purge_overhead_percent", + "a purge of jemalloc dirty pages is issued once the overhead exceeds this percent (0 disables purging)", + 20, + settings.NonNegativeInt, +) + +var jemallocPurgePeriod = settings.RegisterDurationSettingWithExplicitUnit( + settings.SystemVisible, + "server.jemalloc_purge_period", + "minimum amount of time that must pass between two jemalloc dirty page purges (0 disables purging)", + 2*time.Minute, + settings.NonNegativeDuration, +) + type sampleEnvironmentCfg struct { st *cluster.Settings stopper *stop.Stopper @@ -33,13 +50,17 @@ type sampleEnvironmentCfg struct { runtime *status.RuntimeStatSampler sessionRegistry *sql.SessionRegistry rootMemMonitor *mon.BytesMonitor + cgoMemTarget uint64 } // startSampleEnvironment starts a periodic loop that samples the environment and, // when appropriate, creates goroutine and/or heap dumps. +// +// The pebbleCacheSize is used to determine a target for CGO memory allocation. func startSampleEnvironment( ctx context.Context, srvCfg *BaseConfig, + pebbleCacheSize int64, stopper *stop.Stopper, runtimeSampler *status.RuntimeStatSampler, sessionRegistry *sql.SessionRegistry, @@ -59,6 +80,7 @@ func startSampleEnvironment( runtime: runtimeSampler, sessionRegistry: sessionRegistry, rootMemMonitor: rootMemMonitor, + cgoMemTarget: max(uint64(pebbleCacheSize), 128*1024*1024), } // Immediately record summaries once on server startup. @@ -152,6 +174,11 @@ func startSampleEnvironment( cgoStats := status.GetCGoMemStats(ctx) cfg.runtime.SampleEnvironment(ctx, cgoStats) + // Maybe purge jemalloc dirty pages. + if overhead, period := jemallocPurgeOverhead.Get(&cfg.st.SV), jemallocPurgePeriod.Get(&cfg.st.SV); overhead > 0 && period > 0 { + status.CGoMemMaybePurge(ctx, cgoStats.CGoAllocatedBytes, cgoStats.CGoTotalBytes, cfg.cgoMemTarget, int(overhead), period) + } + if goroutineDumper != nil { goroutineDumper.MaybeDump(ctx, cfg.st, cfg.runtime.Goroutines.Value()) } diff --git a/pkg/server/server.go b/pkg/server/server.go index f23fd236e7e9..53a02411aba2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1877,6 +1877,7 @@ func (s *topLevelServer) PreStart(ctx context.Context) error { // Begin recording runtime statistics. 
if err := startSampleEnvironment(workersCtx, &s.cfg.BaseConfig, + s.cfg.CacheSize, s.stopper, s.runtime, s.status.sessionRegistry, diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel index 0adb5c00f88e..428fa8492043 100644 --- a/pkg/server/status/BUILD.bazel +++ b/pkg/server/status/BUILD.bazel @@ -70,6 +70,7 @@ go_library( "//pkg/util/metric", "//pkg/util/syncutil", "//pkg/util/system", + "@com_github_cockroachdb_crlib//crtime", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_dustin_go_humanize//:go-humanize", diff --git a/pkg/server/status/runtime.go b/pkg/server/status/runtime.go index dd5999e24207..940ead86b42d 100644 --- a/pkg/server/status/runtime.go +++ b/pkg/server/status/runtime.go @@ -340,7 +340,18 @@ var diskMetricsIgnoredDevices = envutil.EnvOrDefaultString("COCKROACH_DISK_METRI // allocated uint: bytes allocated by application // total uint: total bytes requested from system // error : any issues fetching stats. This should be a warning only. -var getCgoMemStats func(context.Context) (uint, uint, error) +var getCgoMemStats func(context.Context) (cGoAlloc uint, cGoTotal uint, _ error) + +// cgoMemMaybePurge checks if the current jemalloc overhead (relative to +// cgoAllocMem or cgoTargetMem, whichever is higher) is above overheadPercent; +// if it is, a purge of all arenas is performed. We perform at most a purge per +// minPeriod. +var cgoMemMaybePurge func( + ctx context.Context, + cgoAllocMem, cgoTotalMem, cgoTargetMem uint64, + overheadPercent int, + minPeriod time.Duration, +) // Distribution of individual GC-related stop-the-world pause // latencies. This is the time from deciding to stop the world @@ -779,6 +790,21 @@ func GetCGoMemStats(ctx context.Context) *CGoMemStats { } } +// CGoMemMaybePurge checks if the current allocator overhead (relative to +// cgoAllocMem or cgoTargetMem, whichever is higher) is above overheadPercent; +// if it is, a purge of all arenas is performed. We perform at most a purge per +// minPeriod. +func CGoMemMaybePurge( + ctx context.Context, + cgoAllocMem, cgoTotalMem, cgoTargetMem uint64, + overheadPercent int, + minPeriod time.Duration, +) { + if cgoMemMaybePurge != nil { + cgoMemMaybePurge(ctx, cgoAllocMem, cgoTotalMem, cgoTargetMem, overheadPercent, minPeriod) + } +} + var netstatEvery = log.Every(time.Minute) // SampleEnvironment queries the runtime system for various interesting metrics, diff --git a/pkg/server/status/runtime_jemalloc.go b/pkg/server/status/runtime_jemalloc.go index a91170d6d569..1f25b541ae22 100644 --- a/pkg/server/status/runtime_jemalloc.go +++ b/pkg/server/status/runtime_jemalloc.go @@ -7,6 +7,19 @@ package status +import ( + "context" + "math" + "reflect" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/crlib/crtime" + "github.com/cockroachdb/redact" + "github.com/dustin/go-humanize" +) + // #cgo CPPFLAGS: -DJEMALLOC_NO_DEMANGLE // #cgo LDFLAGS: -ljemalloc // #cgo dragonfly freebsd LDFLAGS: -lm @@ -66,27 +79,27 @@ package status // } // return 0; // } +// +// #define STRINGIFY_HELPER(x) #x +// #define STRINGIFY(x) STRINGIFY_HELPER(x) +// +// int jemalloc_purge() { +// return je_mallctl("arena." 
STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", NULL, 0, NULL, 0); +// } import "C" -import ( - "context" - "math" - "reflect" - "strings" - - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/redact" - "github.com/dustin/go-humanize" -) - func init() { if getCgoMemStats != nil { panic("getCgoMemStats is already set") } getCgoMemStats = getJemallocStats + if cgoMemMaybePurge != nil { + panic("cgoMemMaybePurge is already set") + } + cgoMemMaybePurge = jemallocMaybePurge } -func getJemallocStats(ctx context.Context) (uint, uint, error) { +func getJemallocStats(ctx context.Context) (cgoAlloc uint, cgoTotal uint, _ error) { var js C.JemallocStats // TODO(marc): should we panic here? Failure on fetching the stats may be a problem. if _, err := C.jemalloc_get_stats(&js); err != nil { @@ -133,3 +146,49 @@ func allocateMemory() { // Empirically, 8KiB is not enough, but 16KiB is except for ppc64le, where 256KiB is needed. C.malloc(256 << 10) } + +var lastPurgeTime crtime.AtomicMono + +// jemallocMaybePurge checks if the current jemalloc overhead (relative to +// cgoAllocMem or cgoTargetMem, whichever is higher) is above overheadPercent; +// if it is, a purge of all arenas is performed. We perform at most a purge per +// minPeriod. +func jemallocMaybePurge( + ctx context.Context, + cgoAllocMem, cgoTotalMem, cgoTargetMem uint64, + overheadPercent int, + minPeriod time.Duration, +) { + // We take the max between cgoAllocMem and cgoTargetMem because we only care about + // the overhead compared to the target peak usage. + // + // We don't use just cgoAllocMem because when a lot of memory is freed + // (because of an eviction), the overhead spikes up high momentarily + // (goAllocMem goes down and goTotalMem takes a while to go down). We only + // care about overhead above the target usage. + // + // We don't use just cgoTargetMem because we don't want to purge unnecessarily + // if we actually allocate more memory than our target (e.g. because of + // internal fragmentation). + target := max(cgoAllocMem, cgoTargetMem) + if cgoTotalMem*100 <= target*uint64(100+overheadPercent) { + return + } + lastPurge := lastPurgeTime.Load() + thisPurge := crtime.NowMono() + if lastPurge != 0 && thisPurge.Sub(lastPurge) < minPeriod { + // We purged too recently. + return + } + if !lastPurgeTime.CompareAndSwap(lastPurge, thisPurge) { + // Another goroutine just purged. This should only happen in tests where we + // have multiple servers running in the same process. + return + } + res, err := C.jemalloc_purge() + if err != nil || res != 0 { + log.Warningf(ctx, "jemalloc purging failed: %v (res=%d)", err, int(res)) + } else { + log.Infof(ctx, "jemalloc arenas purged (took %s)", thisPurge.Elapsed()) + } +} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 6d0933553f85..54b7f8716edd 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -759,6 +759,7 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { // Begin recording runtime statistics. 
if err := startSampleEnvironment(workersCtx, s.sqlServer.cfg, + 0, /* pebbleCacheSize */ s.stopper, s.runtime, s.tenantStatus.sessionRegistry, From ffdf24a45d1c729536cda3d6cd22eda4fdd42826 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 12 Feb 2025 13:41:50 +0000 Subject: [PATCH 3/7] rac2: report stream connection status Epic: none Release note: none --- .../kvflowhandle/testdata/handle_inspect | 27 ++++++++++----- .../kvflowinspectpb/kvflowinspect.proto | 3 ++ .../kvflowcontrol/rac2/range_controller.go | 1 + .../rac2/testdata/range_controller/inspect | 33 ++++++++++++------- 4 files changed, 44 insertions(+), 20 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect index a325f42f561d..ca1f124ad489 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect @@ -26,7 +26,8 @@ echo "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } @@ -65,7 +66,8 @@ echo } ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } @@ -104,7 +106,8 @@ echo } ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false }, { "stream": { @@ -128,7 +131,8 @@ echo } ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false }, { "stream": { @@ -152,7 +156,8 @@ echo } ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } @@ -191,7 +196,8 @@ echo } ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false }, { "stream": { @@ -215,7 +221,8 @@ echo } ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } @@ -238,7 +245,8 @@ echo "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false }, { "stream": { @@ -254,7 +262,8 @@ echo "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto index b7b7986de8e3..28be42d40eb8 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto @@ -79,6 +79,9 @@ message ConnectedStream { // before sending, for this connected stream. Only populated on versions >= // 24.3. int64 total_send_deducted_tokens = 4; + // Disconnected is true if the stream has recently disconnected and no longer + // actively replicating, i.e. recently left StateReplicate in raft. 
+ bool disconnected = 5; } // Stream represents a given kvflowcontrol.Stream and the number of tokens diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 739dee9eb198..d2cfa1507da9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -1631,6 +1631,7 @@ func (rc *rangeController) InspectRaftMuLocked(ctx context.Context) kvflowinspec } streams = append(streams, kvflowinspectpb.ConnectedStream{ Stream: rc.opts.SSTokenCounter.InspectStream(rs.stream), + Disconnected: rs.sendStream.mu.connectedState != replicate, TrackedDeductions: trackedDeductions, TotalEvalDeductedTokens: int64(evalTokens), TotalSendDeductedTokens: int64(sendTokens), diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect index d1dd92f03323..8c7e6382f023 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect @@ -25,7 +25,8 @@ inspect range_id=1 "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } @@ -59,7 +60,8 @@ inspect range_id=1 "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false }, { "stream": { @@ -75,7 +77,8 @@ inspect range_id=1 "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false }, { "stream": { @@ -91,7 +94,8 @@ inspect range_id=1 "tracked_deductions": [ ], "total_eval_deducted_tokens": "0", - "total_send_deducted_tokens": "0" + "total_send_deducted_tokens": "0", + "disconnected": false } ] } @@ -153,7 +157,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "8388608", - "total_send_deducted_tokens": "8388608" + "total_send_deducted_tokens": "8388608", + "disconnected": false }, { "stream": { @@ -193,7 +198,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "8388608", - "total_send_deducted_tokens": "8388608" + "total_send_deducted_tokens": "8388608", + "disconnected": false }, { "stream": { @@ -233,7 +239,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "8388608", - "total_send_deducted_tokens": "8388608" + "total_send_deducted_tokens": "8388608", + "disconnected": false } ] } @@ -292,7 +299,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "8388608", - "total_send_deducted_tokens": "8388608" + "total_send_deducted_tokens": "8388608", + "disconnected": false }, { "stream": { @@ -332,7 +340,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "8388608", - "total_send_deducted_tokens": "8388608" + "total_send_deducted_tokens": "8388608", + "disconnected": false } ] } @@ -386,7 +395,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "5242880", - "total_send_deducted_tokens": "5242880" + "total_send_deducted_tokens": "5242880", + "disconnected": false }, { "stream": { @@ -410,7 +420,8 @@ inspect range_id=1 } ], "total_eval_deducted_tokens": "2097152", - "total_send_deducted_tokens": "2097152" + "total_send_deducted_tokens": "2097152", + "disconnected": false } ] } From bbe9ddade8225ad2cdfe7e0d314821d68e1258fd Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 12 Feb 2025 13:58:03 +0000 
Subject: [PATCH 4/7] kvserver: wait for all streams to be connected Previously, waitForConnectedStreams could return prematurely when one of the streams is in StateProbe and about to be closed. This commit ensures we wait for the streams to be connected / in StateReplicate. Epic: none Release note: none --- pkg/kv/kvserver/flow_control_integration_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 1c4ed82f3faf..0fee76632d35 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -4397,6 +4397,7 @@ func (h *flowControlTestHelper) waitForConnectedStreams( expConnectedStreams, serverIdx int, lvl ...kvflowcontrol.V2EnabledWhenLeaderLevel, ) { + h.t.Helper() level := h.resolveLevelArgs(lvl...) testutils.SucceedsSoon(h.t, func() error { state, found := h.getInspectHandlesForLevel(serverIdx, level).LookupInspect(rangeID) @@ -4404,9 +4405,15 @@ func (h *flowControlTestHelper) waitForConnectedStreams( return fmt.Errorf("handle for %s not found", rangeID) } require.True(h.t, found) - if len(state.ConnectedStreams) != expConnectedStreams { - return fmt.Errorf("expected %d connected streams, got %d", - expConnectedStreams, len(state.ConnectedStreams)) + var connected int + for i := range state.ConnectedStreams { + if !state.ConnectedStreams[i].Disconnected { + connected++ + } + } + if connected != expConnectedStreams { + return fmt.Errorf("expected %d connected streams, got %d/%d", + expConnectedStreams, connected, len(state.ConnectedStreams)) } return nil }) From d9fdece13a630c886abaec9dc751bb6597dc5d75 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 18 Feb 2025 07:21:50 -0800 Subject: [PATCH 5/7] kvserver: show store number when printing metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Annotate the context to include the store tag, and add a log line with the store number as well. ``` I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 Metrics for store 1: I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 + | | | | ingested | moved | written | | amp I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 +level | tables size val-bl vtables | score | in | tables size | tables size | tables size | read | r w I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 +------+-----------------------------+-------+-------+--------------+--------------+--------------+-------+--------- ``` Epic: none Fixes: #141526 --- pkg/kv/kvserver/store.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 96a8c1b211fd..ef03d26039ca 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3705,6 +3705,7 @@ func (s *Store) computeMetricsLocked(ctx context.Context) (m storage.Metrics, er func (s *Store) ComputeMetricsPeriodically( ctx context.Context, prevMetrics *storage.MetricsForInterval, tick int, ) (m storage.Metrics, err error) { + ctx = s.AnnotateCtx(ctx) m, err = s.computeMetrics(ctx) if err != nil { return m, err @@ -3779,9 +3780,7 @@ func (s *Store) ComputeMetricsPeriodically( // non-periodic callers of this method don't trigger expensive // stats. 
if tick%logSSTInfoTicks == 1 /* every 10m */ { - // NB: The initial blank line ensures that compaction stats display - // will not contain the log prefix. - log.Storage.Infof(ctx, "\n%s", m.Metrics) + log.Storage.Infof(ctx, "Pebble metrics:\n%s", m.Metrics) } // Periodically emit a store stats structured event to the TELEMETRY channel, // if reporting is enabled. These events are intended to be emitted at low From 023a52ec3bc28c86551ca2b08c8dbdbafb6fdf72 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Tue, 18 Feb 2025 13:26:55 -0500 Subject: [PATCH 6/7] changefeedccl: add log to debug test Add verbose logging to test to debug issue. Fixes: #141489 Informs: #140669 Release note: None --- pkg/ccl/changefeedccl/alter_changefeed_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index c9f5716697bc..d925fd9e88c2 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1198,6 +1198,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // Set verbose log to confirm whether or not we hit the same nil row issue as in #140669 + require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2")) + rnd, seed := randutil.NewPseudoRand() t.Logf("random seed: %d", seed) From 8152daec823b5b83121b9195e7490ea0bc918d3a Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Tue, 18 Feb 2025 12:20:04 -0500 Subject: [PATCH 7/7] changefeedccl: fix test failure TestAlterChangefeedAddTargetsDuringBackfill was failing due to the interaction of the llrb span merging issue (#141405) and the new checkpoint system. Force the test to run with the btree frontier. Fixes: #141463 Release note: None --- pkg/ccl/changefeedccl/alter_changefeed_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index c9f5716697bc..84aeaf07bafb 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -32,13 +32,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -1356,7 +1356,11 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 141463) + + // TODO(#141405): Remove this once llrbFrontier starts merging + // adjacent spans. The current lack of merging causes issues + // with the checks in this test around expected resolved spans. + defer span.EnableBtreeFrontier(true)() var rndMu struct { syncutil.Mutex