diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 7b0aefce0c94..93ca226c5552 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -421,6 +421,7 @@ go_test( "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", "//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb", "//pkg/kv/kvserver/kvflowcontrol/node_rac2", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/kvflowcontrol/replica_rac2", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index ff37488205d4..f07f1b02b5bd 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -2253,6 +2254,10 @@ func TestFlowControlBasicV2(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- Flow token metrics, before issuing the 1MiB replicated write.`) h.query(n1, v2FlowTokensQueryStr) @@ -2342,6 +2347,10 @@ func TestFlowControlRangeSplitMergeV2(t *testing.T) { require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.log("sending put request to pre-split range") h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) h.log("sent put request to pre-split range") @@ -2464,6 +2473,10 @@ func TestFlowControlBlockedAdmissionV2(t *testing.T) { require.NoError(t, err) h.enableVerboseRaftMsgLoggingForRange(desc.RangeID) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 5 1MiB, 3x replicated write that's not admitted.)`) h.log("sending put requests") @@ -2579,6 +2592,10 @@ func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) { require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.log("sending put request to pre-split range") h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) @@ -2722,6 +2739,10 @@ func TestFlowControlCrashedNodeV2(t *testing.T) { require.NoError(t, err) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 5x1MiB, 2x replicated writes that are not admitted.)`) h.log("sending put requests") @@ -2870,6 +2891,10 @@ func TestFlowControlRaftSnapshotV2(t *testing.T) { repl := store.LookupReplica(roachpb.RKey(k)) require.NotNil(t, repl) h.waitForConnectedStreams(ctx, repl.RangeID, 5, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) // Set up a key to replicate across the cluster. We're going to modify this // key and truncate the raft logs from that command after killing one of the @@ -3085,6 +3110,10 @@ func TestFlowControlRaftMembershipV2(t *testing.T) { desc, err := tc.LookupRange(k) require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) @@ -3224,6 +3253,10 @@ func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) { // Make sure the lease is on n1 and that we're triply connected. tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) @@ -3353,6 +3386,10 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) { desc, err := tc.LookupRange(k) require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) @@ -3469,6 +3506,10 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) @@ -3571,6 +3612,10 @@ func TestFlowControlTransferLeaseV2(t *testing.T) { desc, err := tc.LookupRange(k) require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) @@ -3664,6 +3709,10 @@ func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) { desc, err := tc.LookupRange(k) require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode)) @@ -3780,6 +3829,10 @@ func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) { desc, err := tc.LookupRange(k) require.NoError(t, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) h.comment(`-- (Issuing 1024*1KiB, 3x replicated writes that are not admitted.)`) h.log("sending put requests") @@ -4865,6 +4918,18 @@ func (h *flowControlTestHelper) enableVerboseRaftMsgLoggingForRange(rangeID roac } } +func (h *flowControlTestHelper) resetV2TokenMetrics(ctx context.Context) { + for _, server := range h.tc.Servers { + require.NoError(h.t, server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error { + s.GetStoreConfig().KVFlowStreamTokenProvider.Metrics().(*rac2.TokenMetrics).TestingClear() + _, err := s.ComputeMetricsPeriodically(ctx, nil, 0) + require.NoError(h.t, err) + s.GetStoreConfig().KVFlowStreamTokenProvider.UpdateMetricGauges() + return nil + })) + } +} + // makeV2EnabledTestFileName is a utility function which returns an updated // filename for the testdata file based on the v2EnabledWhenLeaderLevel. func makeV2EnabledTestFileName( diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go b/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go index eb515d41da18..de2108c567be 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go @@ -197,6 +197,34 @@ func NewTokenMetrics() *TokenMetrics { return m } +// TestingClear is used in tests to reset the metrics. +func (m *TokenMetrics) TestingClear() { + // NB: we only clear the counter metrics, as the stream metrics are gauges. + for _, typ := range []TokenType{ + EvalToken, + SendToken, + } { + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + m.CounterMetrics[typ].Deducted[wc].Clear() + m.CounterMetrics[typ].Returned[wc].Clear() + m.CounterMetrics[typ].Unaccounted[wc].Clear() + m.CounterMetrics[typ].Disconnected[wc].Clear() + if typ == SendToken { + m.CounterMetrics[typ].SendQueue[0].ForceFlushDeducted.Clear() + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + m.CounterMetrics[typ].SendQueue[0].PreventionDeducted[wc].Clear() + } + } + } + } +} + type TokenCounterMetrics struct { Deducted [admissionpb.NumWorkClasses]*metric.Counter Returned [admissionpb.NumWorkClasses]*metric.Counter