141365: kvserver: wait for all streams to be connected r=sumeerbhola a=pav-kv

Previously, `waitForConnectedStreams` could return prematurely when one of the streams was in `StateProbe` and about to be closed. This commit ensures we wait for all streams to be connected, i.e. in `StateReplicate`.
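
For illustration, here is a condensed, self-contained sketch of the fixed wait loop (a simplified `ConnectedStream` type and plain polling stand in for the repo's inspect protos and `testutils.SucceedsSoon`; the real change is in the `flow_control_integration_test.go` hunk below):

```go
package example

import (
	"fmt"
	"time"
)

// ConnectedStream is a simplified stand-in for the inspect state used by
// the test helper; Disconnected is set once a stream leaves StateReplicate.
type ConnectedStream struct {
	Disconnected bool
}

// waitForConnectedStreams retries until exactly `want` streams are still
// replicating, rather than trusting the raw stream count, which can
// transiently include a stream in StateProbe that is about to be closed.
func waitForConnectedStreams(want int, inspect func() []ConnectedStream) error {
	deadline := time.Now().Add(45 * time.Second)
	for {
		streams := inspect()
		var connected int
		for _, s := range streams {
			if !s.Disconnected {
				connected++
			}
		}
		if connected == want {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("expected %d connected streams, got %d/%d",
				want, connected, len(streams))
		}
		time.Sleep(100 * time.Millisecond)
	}
}
```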

Fixes #138103, #139229

141575: server: purge jemalloc when cgo mem overhead is high r=RaduBerinde a=RaduBerinde

#### server: pass BaseConfig to startSampleEnvironment

Epic: none
Release note: None

#### server: purge jemalloc when cgo mem overhead is high

Whenever we sample the environment (every 10s by default), we check
whether the CGo memory overhead is above 20%; if so, we tell jemalloc
to purge dirty pages. We limit the frequency of the purges to one every
2 minutes (both settings are configurable).
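
As a rough sketch of that gating — assuming, for illustration, that overhead is total CGo-mapped memory in excess of allocated memory, measured as a percentage of allocated; the real check lives in `status.CGoMemMaybePurge` and also accounts for a target derived from the Pebble cache size:

```go
package example

import "time"

// purger rate-limits jemalloc dirty-page purges; a minimal sketch of the
// described behavior, not the actual status.CGoMemMaybePurge implementation.
type purger struct {
	lastPurge time.Time
}

// maybePurge reports whether a purge should be issued now: the CGo overhead
// (total mapped minus allocated) must exceed overheadPercent of allocated,
// and at least period must have elapsed since the previous purge. Setting
// either knob to zero disables purging.
func (p *purger) maybePurge(
	allocated, total uint64, overheadPercent int, period time.Duration,
) bool {
	if overheadPercent <= 0 || period <= 0 {
		return false
	}
	if total <= allocated {
		return false
	}
	overhead := total - allocated
	if overhead*100 <= allocated*uint64(overheadPercent) {
		return false
	}
	if time.Since(p.lastPurge) < period {
		return false
	}
	p.lastPurge = time.Now()
	// At this point the real code asks jemalloc (via mallctl) to purge
	// dirty pages from its arenas.
	return true
}
```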

This is meant to be a backup safety mechanism to avoid OOMs. With the
updated jemalloc default configuration, I don't expect this to happen
much in practice (if at all).

Fixes: #141379
Release note: None

---

Here is an example of a 30min TPCC run (left is master, right is with the change). I used a terrible malloc conf (`background_thread:false,dirty_decay_ms:1000000,muzzy_decay_ms:500000,narenas:128`) to get a lot of overhead. The overhead graphs show CGo Total minus 7.6GB, which is the high watermark for CGo Alloc. The purge happens when the overhead goes above ~1.5GB.

<img width="1116" alt="image" src="https://github.com/user-attachments/assets/dc32b081-88a3-4284-9880-33e2f7d82679" />


Purge logs from one of the nodes:
```
295:I250216 18:24:27.988224 546 server/status/runtime_jemalloc.go:191 ⋮ [T1,Vsystem,n1] 185  jemalloc arenas purged (took 203.915292ms)
303:I250216 18:29:18.462705 546 server/status/runtime_jemalloc.go:191 ⋮ [T1,Vsystem,n1] 193  jemalloc arenas purged (took 206.945658ms)
307:I250216 18:35:38.638441 546 server/status/runtime_jemalloc.go:191 ⋮ [T1,Vsystem,n1] 197  jemalloc arenas purged (took 235.908998ms)
311:I250216 18:42:28.758720 546 server/status/runtime_jemalloc.go:191 ⋮ [T1,Vsystem,n1] 201  jemalloc arenas purged (took 245.213056ms)
323:I250216 18:48:58.986148 546 server/status/runtime_jemalloc.go:191 ⋮ [T1,Vsystem,n1] 213  jemalloc arenas purged (took 98.60313ms)
```



141619: kvserver: show store number when printing metrics r=rickystewart a=RaduBerinde

Annotate the context to include the store tag, and add a log line with
the store number as well.

```
I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 Pebble metrics:
I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 +      |                             |       |       |   ingested   |     moved    |    written   |       |    amp
I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 +level | tables  size val-bl vtables | score |   in  | tables  size | tables  size | tables  size |  read |   r   w
I250218 15:01:57.233455 403 3@kv/kvserver/store.go:3785 ⋮ [T1,Vsystem,n1,s1] 31 +------+-----------------------------+-------+-------+--------------+--------------+--------------+-------+---------
```

Epic: none
Fixes: #141526

141630: changefeedccl: fix test failure r=andyyang890 a=asg0451

TestAlterChangefeedAddTargetsDuringBackfill was
failing due to the interaction of the llrb span
merging issue (#141405) and the new checkpoint
system. Force the test to run with the btree
frontier.

Fixes: #141463

Release note: None


141636: changefeedccl: add log to debug test r=andyyang890 a=asg0451

Add verbose logging to the test to debug the issue.

Fixes: #141489
Informs: #140669

Release note: None


Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
Co-authored-by: Miles Frankel <miles.frankel@cockroachlabs.com>
4 people committed Feb 19, 2025
6 parents 9b7e95f + bbe9dda + 7ab95b2 + d9fdece + 8152dae + 023a52e commit 6e6f5fe
Showing 13 changed files with 201 additions and 61 deletions.
11 changes: 9 additions & 2 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -32,13 +32,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -1198,6 +1198,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Set verbose log to confirm whether or not we hit the same nil row issue as in #140669
require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))

rnd, seed := randutil.NewPseudoRand()
t.Logf("random seed: %d", seed)

@@ -1356,7 +1359,11 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 141463)

// TODO(#141405): Remove this once llrbFrontier starts merging
// adjacent spans. The current lack of merging causes issues
// with the checks in this test around expected resolved spans.
defer span.EnableBtreeFrontier(true)()

var rndMu struct {
syncutil.Mutex
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -4397,16 +4397,23 @@ func (h *flowControlTestHelper) waitForConnectedStreams(
expConnectedStreams, serverIdx int,
lvl ...kvflowcontrol.V2EnabledWhenLeaderLevel,
) {
h.t.Helper()
level := h.resolveLevelArgs(lvl...)
testutils.SucceedsSoon(h.t, func() error {
state, found := h.getInspectHandlesForLevel(serverIdx, level).LookupInspect(rangeID)
if !found {
return fmt.Errorf("handle for %s not found", rangeID)
}
require.True(h.t, found)
if len(state.ConnectedStreams) != expConnectedStreams {
return fmt.Errorf("expected %d connected streams, got %d",
expConnectedStreams, len(state.ConnectedStreams))
var connected int
for i := range state.ConnectedStreams {
if !state.ConnectedStreams[i].Disconnected {
connected++
}
}
if connected != expConnectedStreams {
return fmt.Errorf("expected %d connected streams, got %d/%d",
expConnectedStreams, connected, len(state.ConnectedStreams))
}
return nil
})
27 changes: 18 additions & 9 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect
@@ -26,7 +26,8 @@ echo
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -65,7 +66,8 @@ echo
}
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -104,7 +106,8 @@ echo
}
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
},
{
"stream": {
@@ -128,7 +131,8 @@ echo
}
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
},
{
"stream": {
@@ -152,7 +156,8 @@ echo
}
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -191,7 +196,8 @@ echo
}
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
},
{
"stream": {
@@ -215,7 +221,8 @@ echo
}
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -238,7 +245,8 @@ echo
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
},
{
"stream": {
@@ -254,7 +262,8 @@ echo
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -79,6 +79,9 @@ message ConnectedStream {
// before sending, for this connected stream. Only populated on versions >=
// 24.3.
int64 total_send_deducted_tokens = 4;
// Disconnected is true if the stream has recently disconnected and no longer
// actively replicating, i.e. recently left StateReplicate in raft.
bool disconnected = 5;
}

// Stream represents a given kvflowcontrol.Stream and the number of tokens
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -1631,6 +1631,7 @@ func (rc *rangeController) InspectRaftMuLocked(ctx context.Context) kvflowinspec
}
streams = append(streams, kvflowinspectpb.ConnectedStream{
Stream: rc.opts.SSTokenCounter.InspectStream(rs.stream),
Disconnected: rs.sendStream.mu.connectedState != replicate,
TrackedDeductions: trackedDeductions,
TotalEvalDeductedTokens: int64(evalTokens),
TotalSendDeductedTokens: int64(sendTokens),
@@ -25,7 +25,8 @@ inspect range_id=1
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -59,7 +60,8 @@ inspect range_id=1
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
},
{
"stream": {
@@ -75,7 +77,8 @@ inspect range_id=1
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
},
{
"stream": {
@@ -91,7 +94,8 @@ inspect range_id=1
"tracked_deductions": [
],
"total_eval_deducted_tokens": "0",
"total_send_deducted_tokens": "0"
"total_send_deducted_tokens": "0",
"disconnected": false
}
]
}
@@ -153,7 +157,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "8388608",
"total_send_deducted_tokens": "8388608"
"total_send_deducted_tokens": "8388608",
"disconnected": false
},
{
"stream": {
@@ -193,7 +198,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "8388608",
"total_send_deducted_tokens": "8388608"
"total_send_deducted_tokens": "8388608",
"disconnected": false
},
{
"stream": {
@@ -233,7 +239,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "8388608",
"total_send_deducted_tokens": "8388608"
"total_send_deducted_tokens": "8388608",
"disconnected": false
}
]
}
@@ -292,7 +299,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "8388608",
"total_send_deducted_tokens": "8388608"
"total_send_deducted_tokens": "8388608",
"disconnected": false
},
{
"stream": {
@@ -332,7 +340,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "8388608",
"total_send_deducted_tokens": "8388608"
"total_send_deducted_tokens": "8388608",
"disconnected": false
}
]
}
@@ -386,7 +395,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "5242880",
"total_send_deducted_tokens": "5242880"
"total_send_deducted_tokens": "5242880",
"disconnected": false
},
{
"stream": {
@@ -410,7 +420,8 @@ inspect range_id=1
}
],
"total_eval_deducted_tokens": "2097152",
"total_send_deducted_tokens": "2097152"
"total_send_deducted_tokens": "2097152",
"disconnected": false
}
]
}
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/store.go
@@ -3705,6 +3705,7 @@ func (s *Store) computeMetricsLocked(ctx context.Context) (m storage.Metrics, er
func (s *Store) ComputeMetricsPeriodically(
ctx context.Context, prevMetrics *storage.MetricsForInterval, tick int,
) (m storage.Metrics, err error) {
ctx = s.AnnotateCtx(ctx)
m, err = s.computeMetrics(ctx)
if err != nil {
return m, err
@@ -3779,9 +3780,7 @@ func (s *Store) ComputeMetricsPeriodically(
// non-periodic callers of this method don't trigger expensive
// stats.
if tick%logSSTInfoTicks == 1 /* every 10m */ {
// NB: The initial blank line ensures that compaction stats display
// will not contain the log prefix.
log.Storage.Infof(ctx, "\n%s", m.Metrics)
log.Storage.Infof(ctx, "Pebble metrics:\n%s", m.Metrics)
}
// Periodically emit a store stats structured event to the TELEMETRY channel,
// if reporting is enabled. These events are intended to be emitted at low
43 changes: 33 additions & 10 deletions pkg/server/env_sampler.go
@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/goroutinedumper"
"github.com/cockroachdb/cockroach/pkg/server/profiler"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -23,6 +24,22 @@ import (
"github.com/cockroachdb/errors"
)

var jemallocPurgeOverhead = settings.RegisterIntSetting(
settings.SystemVisible,
"server.jemalloc_purge_overhead_percent",
"a purge of jemalloc dirty pages is issued once the overhead exceeds this percent (0 disables purging)",
20,
settings.NonNegativeInt,
)

var jemallocPurgePeriod = settings.RegisterDurationSettingWithExplicitUnit(
settings.SystemVisible,
"server.jemalloc_purge_period",
"minimum amount of time that must pass between two jemalloc dirty page purges (0 disables purging)",
2*time.Minute,
settings.NonNegativeDuration,
)

type sampleEnvironmentCfg struct {
st *cluster.Settings
stopper *stop.Stopper
@@ -33,36 +50,37 @@ type sampleEnvironmentCfg struct {
runtime *status.RuntimeStatSampler
sessionRegistry *sql.SessionRegistry
rootMemMonitor *mon.BytesMonitor
cgoMemTarget uint64
}

// startSampleEnvironment starts a periodic loop that samples the environment and,
// when appropriate, creates goroutine and/or heap dumps.
//
// The pebbleCacheSize is used to determine a target for CGO memory allocation.
func startSampleEnvironment(
ctx context.Context,
settings *cluster.Settings,
srvCfg *BaseConfig,
pebbleCacheSize int64,
stopper *stop.Stopper,
goroutineDumpDirName string,
heapProfileDirName string,
cpuProfileDirName string,
runtimeSampler *status.RuntimeStatSampler,
sessionRegistry *sql.SessionRegistry,
rootMemMonitor *mon.BytesMonitor,
testingKnobs base.TestingKnobs,
) error {
metricsSampleInterval := base.DefaultMetricsSampleInterval
if p, ok := testingKnobs.Server.(*TestingKnobs); ok && p.EnvironmentSampleInterval != time.Duration(0) {
if p, ok := srvCfg.TestingKnobs.Server.(*TestingKnobs); ok && p.EnvironmentSampleInterval != time.Duration(0) {
metricsSampleInterval = p.EnvironmentSampleInterval
}
cfg := sampleEnvironmentCfg{
st: settings,
st: srvCfg.Settings,
stopper: stopper,
minSampleInterval: metricsSampleInterval,
goroutineDumpDirName: goroutineDumpDirName,
heapProfileDirName: heapProfileDirName,
cpuProfileDirName: cpuProfileDirName,
goroutineDumpDirName: srvCfg.GoroutineDumpDirName,
heapProfileDirName: srvCfg.HeapProfileDirName,
cpuProfileDirName: srvCfg.CPUProfileDirName,
runtime: runtimeSampler,
sessionRegistry: sessionRegistry,
rootMemMonitor: rootMemMonitor,
cgoMemTarget: max(uint64(pebbleCacheSize), 128*1024*1024),
}
// Immediately record summaries once on server startup.

@@ -156,6 +174,11 @@ func startSampleEnvironment(
cgoStats := status.GetCGoMemStats(ctx)
cfg.runtime.SampleEnvironment(ctx, cgoStats)

// Maybe purge jemalloc dirty pages.
if overhead, period := jemallocPurgeOverhead.Get(&cfg.st.SV), jemallocPurgePeriod.Get(&cfg.st.SV); overhead > 0 && period > 0 {
status.CGoMemMaybePurge(ctx, cgoStats.CGoAllocatedBytes, cgoStats.CGoTotalBytes, cfg.cgoMemTarget, int(overhead), period)
}

if goroutineDumper != nil {
goroutineDumper.MaybeDump(ctx, cfg.st, cfg.runtime.Goroutines.Value())
}