Skip to content

kvserver,changefeeds,crosscluster: set per-consumer catchup scan limit #133789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Knobs: ca.knobs.FeedKnobs,
ScopedTimers: ca.sliMetrics.Timers,
MonitoringCfg: monitoringCfg,
ConsumerID: int64(ca.spec.JobID),
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Config struct {
Knobs TestingKnobs

ScopedTimers *timers.ScopedTimers

ConsumerID int64
}

// Run will run the kvfeed. The feed runs synchronously and returns an
Expand Down Expand Up @@ -123,6 +125,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
cfg.ConsumerID,
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
cfg.SchemaFeed,
Expand Down Expand Up @@ -248,6 +251,7 @@ type kvFeed struct {
withDiff bool
withFiltering bool
withInitialBackfill bool
consumerID int64
initialHighWater hlc.Timestamp
endTime hlc.Timestamp
writer kvevent.Writer
Expand Down Expand Up @@ -278,6 +282,7 @@ func newKVFeed(
schemaChangeEvents changefeedbase.SchemaChangeEventClass,
schemaChangePolicy changefeedbase.SchemaChangePolicy,
withInitialBackfill, withDiff, withFiltering bool,
consumerID int64,
initialHighWater hlc.Timestamp,
endTime hlc.Timestamp,
codec keys.SQLCodec,
Expand All @@ -297,6 +302,7 @@ func newKVFeed(
withInitialBackfill: withInitialBackfill,
withDiff: withDiff,
withFiltering: withFiltering,
consumerID: consumerID,
initialHighWater: initialHighWater,
endTime: endTime,
schemaChangeEvents: schemaChangeEvents,
Expand Down Expand Up @@ -585,6 +591,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
ConsumerID: f.consumerID,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func TestKVFeed(t *testing.T) {
f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
tc.schemaChangeEvents, tc.schemaChangePolicy,
tc.needsInitialScan, tc.withDiff, true, /* withFiltering */
0, /* consumerID */
tc.initialHighWater, tc.endTime,
codec,
tf, sf, rangefeedFactory(ref.run), bufferFactory,
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type rangeFeedConfig struct {
Spans []kvcoord.SpanTimePair
WithDiff bool
WithFiltering bool
ConsumerID int64
RangeObserver kvcoord.RangeObserver
Knobs TestingKnobs
Timers *timers.ScopedTimers
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/producer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
rangefeed.WithOnValues(s.onValues),
rangefeed.WithDiff(s.spec.WithDiff),
rangefeed.WithConsumerID(int64(s.streamID)),
rangefeed.WithInvoker(func(fn func() error) error { return fn() }),
rangefeed.WithFiltering(s.spec.WithFiltering),
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error

for !s.transport.IsExhausted() {
args := makeRangeFeedRequest(
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs)
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs, m.cfg.consumerID)
args.Replica = s.transport.NextReplica()
args.StreamID = streamID
s.ReplicaDescriptor = args.Replica
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type rangeFeedConfig struct {
withMetadata bool
withMatchingOriginIDs []uint32
rangeObserver RangeObserver
consumerID int64

knobs struct {
// onRangefeedEvent invoked on each rangefeed event.
Expand Down Expand Up @@ -138,6 +139,12 @@ func WithMetadata() RangeFeedOption {
})
}

// WithConsumerID returns an option that tags the rangefeed with the given
// consumer ID. The ID is propagated in each RangeFeedRequest so that the
// server can apply per-consumer limits (e.g. the per-consumer catchup scan
// limit) across all rangefeeds sharing the same ID.
func WithConsumerID(cid int64) RangeFeedOption {
	return optionFunc(func(c *rangeFeedConfig) {
		c.consumerID = cid
	})
}

// SpanTimePair is a pair of span along with its starting time. The starting
// time is exclusive, i.e. the first possible emitted event (including catchup
// scans) will be at startAfter.Next().
Expand Down Expand Up @@ -620,6 +627,7 @@ func makeRangeFeedRequest(
withDiff bool,
withFiltering bool,
withMatchingOriginIDs []uint32,
consumerID int64,
) kvpb.RangeFeedRequest {
admissionPri := admissionpb.BulkNormalPri
if isSystemRange {
Expand All @@ -631,6 +639,7 @@ func makeRangeFeedRequest(
Timestamp: startAfter,
RangeID: rangeID,
},
ConsumerID: consumerID,
WithDiff: withDiff,
WithFiltering: withFiltering,
WithMatchingOriginIDs: withMatchingOriginIDs,
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type config struct {
withDiff bool
withFiltering bool
withMatchingOriginIDs []uint32
consumerID int64
onUnrecoverableError OnUnrecoverableError
onCheckpoint OnCheckpoint
frontierQuantize time.Duration
Expand Down Expand Up @@ -159,6 +160,12 @@ func WithOriginIDsMatching(originIDs ...uint32) Option {
})
}

// WithConsumerID returns an option that identifies this rangefeed's consumer.
// The ID is forwarded to the KV layer, which uses it to enforce per-consumer
// limits such as the catchup scan limit; multiple rangefeeds created with the
// same ID are treated as a single consumer.
func WithConsumerID(cid int64) Option {
	return optionFunc(func(c *config) {
		c.consumerID = cid
	})
}

// WithInvoker makes an option to invoke the rangefeed tasks such as running the
// the client and processing events emitted by the client with a caller-supplied
// function, which can make it easier to introspect into work done by a given
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF
if f.onMetadata != nil {
rangefeedOpts = append(rangefeedOpts, kvcoord.WithMetadata())
}
rangefeedOpts = append(rangefeedOpts, kvcoord.WithConsumerID(f.consumerID))

for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
Expand Down
98 changes: 97 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
}
eventC := make(chan *kvpb.RangeFeedEvent)
sink := newChannelSink(ctx, eventC)
_, rErr := s3.RangeFeed(sink.ctx, &req, sink)
_, rErr := s3.RangeFeed(sink.ctx, &req, sink, nil)
require.NoError(t, rErr) // check if we've errored yet
require.NoError(t, sink.Error())
t.Logf("started rangefeed on %s", repl3)
Expand Down Expand Up @@ -1877,3 +1877,99 @@ func TestRangeFeedMetadataAutoSplit(t *testing.T) {
}
})
}

// TestRangefeedCatchupStarvation tests that a single MuxRangefeed
// call cannot starve other users. Note that starvation is still
// possible if there are more than 2 consumers of a given range.
func TestRangefeedCatchupStarvation(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testutils.RunValues(t, "feed_type", feedTypes, func(t *testing.T, rt rangefeedTestType) {
		ctx := context.Background()
		settings := cluster.MakeTestingClusterSettings()
		kvserver.RangefeedUseBufferedSender.Override(ctx, &settings.SV, rt.useBufferedSender)
		kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
		// Lower the limit to make it more likely to get starved.
		kvserver.ConcurrentRangefeedItersLimit.Override(ctx, &settings.SV, 8)
		kvserver.PerConsumerCatchupLimit.Override(ctx, &settings.SV, 6)
		srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
			Settings: settings,
		})
		defer srv.Stopper().Stop(ctx)
		s := srv.ApplicationLayer()
		ts := s.Clock().Now()
		scratchKey := append(s.Codec().TenantPrefix(), keys.ScratchRangeMin...)
		// Cap the slice so that appends in mkKey never clobber a sibling key.
		scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
		mkKey := func(k string) roachpb.Key {
			return encoding.EncodeStringAscending(scratchKey, k)
		}
		ranges := 32
		keysPerRange := 128
		totalKeys := ranges * keysPerRange
		for i := range ranges {
			for j := range keysPerRange {
				k := mkKey(fmt.Sprintf("%d-%d", i, j))
				require.NoError(t, db.Put(ctx, k, 1))
			}
			_, _, err := srv.SplitRange(mkKey(fmt.Sprintf("%d", i)))
			require.NoError(t, err)
		}

		span := roachpb.Span{Key: scratchKey, EndKey: scratchKey.PrefixEnd()}
		f, err := rangefeed.NewFactory(s.AppStopper(), db, s.ClusterSettings(), nil)
		require.NoError(t, err)

		// Start a rangefeed for consumer 1 that blocks after consuming its
		// first row, pinning its catchup-scan resources until the test ends.
		blocked := make(chan struct{})
		r1, err := f.RangeFeed(ctx, "consumer-1-rf-1", []roachpb.Span{span}, ts,
			func(ctx context.Context, value *kvpb.RangeFeedValue) {
				blocked <- struct{}{}
				<-ctx.Done()
			},
			rangefeed.WithConsumerID(1),
		)
		require.NoError(t, err)
		defer r1.Close()
		<-blocked

		// Multiple rangefeeds from the same ConsumerID should
		// be treated as the same consumer and thus they
		// shouldn't be able to overwhelm the overall store
		// quota.
		for i := range 8 {
			r, err := f.RangeFeed(ctx, fmt.Sprintf("consumer-1-rf-%d", i+2), []roachpb.Span{span}, ts,
				func(ctx context.Context, value *kvpb.RangeFeedValue) { <-ctx.Done() },
				rangefeed.WithConsumerID(1),
			)
			require.NoError(t, err)
			defer r.Close()
		}

		// Despite 9 rangefeeds above each needing 32 catchup
		// scans, the following rangefeed should always make
		// progress because it has a different consumer ID.
		r2ConsumedRow := make(chan roachpb.Key)
		r2, err := f.RangeFeed(ctx, "rf2", []roachpb.Span{span}, ts,
			func(ctx context.Context, value *kvpb.RangeFeedValue) {
				r2ConsumedRow <- value.Key
			},
			rangefeed.WithConsumerID(2),
		)
		require.NoError(t, err)
		defer r2.Close()

		// Wait until we see every key we've written on rf2.
		seen := make(map[string]struct{}, totalKeys)
		for {
			select {
			case r := <-r2ConsumedRow:
				seen[r.String()] = struct{}{}
				if len(seen) >= totalKeys {
					return
				}
			case <-time.After(testutils.DefaultSucceedsSoonDuration):
				t.Fatal("test timed out")
			}
		}
	})
}
5 changes: 4 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3293,7 +3293,10 @@ message RangeFeedRequest {
// field is empty, all events are emitted.
repeated uint32 with_matching_origin_ids = 8 [(gogoproto.customname) = "WithMatchingOriginIDs"];

// NextID = 9;
// ConsumerID is set by the caller to identify itself.
int64 consumer_id = 9 [(gogoproto.customname) = "ConsumerID"];

// NextID = 10;
}

// RangeFeedValue is a variant of RangeFeedEvent that represents an update to
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func waitReplicaRangeFeed(
return stream.SendUnbuffered(&event)
}

_, err := r.RangeFeed(stream.ctx, req, stream, nil /* pacer */)
_, err := r.RangeFeed(stream.ctx, req, stream, nil /* pacer */, nil /* perConsumerCatchupLimiter */)
if err != nil {
return sendErrToStream(kvpb.NewError(err))
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -241,6 +242,7 @@ func (r *Replica) RangeFeed(
args *kvpb.RangeFeedRequest,
stream rangefeed.Stream,
pacer *admission.Pacer,
perConsumerCatchupLimiter *limit.ConcurrentRequestLimiter,
) (rangefeed.Disconnector, error) {
streamCtx = r.AnnotateCtx(streamCtx)

Expand Down Expand Up @@ -281,8 +283,18 @@ func (r *Replica) RangeFeed(
iterSemRelease := func() {}
if !args.Timestamp.IsEmpty() {
usingCatchUpIter = true
perConsumerRelease := func() {}
if perConsumerCatchupLimiter != nil {
perConsumerAlloc, err := perConsumerCatchupLimiter.Begin(streamCtx)
if err != nil {
return nil, err
}
perConsumerRelease = perConsumerAlloc.Release
}

alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(streamCtx)
if err != nil {
perConsumerRelease()
return nil, err
}

Expand All @@ -296,7 +308,11 @@ func (r *Replica) RangeFeed(
// scan.
var iterSemReleaseOnce sync.Once
iterSemRelease = func() {
iterSemReleaseOnce.Do(alloc.Release)
iterSemReleaseOnce.Do(func() {
alloc.Release()
perConsumerRelease()
})

}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *testStream) WaitForError(t *testing.T) error {
func waitRangeFeed(
t *testing.T, store *kvserver.Store, req *kvpb.RangeFeedRequest, stream *testStream,
) error {
if _, err := store.RangeFeed(stream.ctx, req, stream); err != nil {
if _, err := store.RangeFeed(stream.ctx, req, stream, nil /* perConsumerCatchupLimiter */); err != nil {
return err
}
return stream.WaitForError(t)
Expand Down
Loading
Loading