From 7b2f2584f0ebeeb7b99c34d8e6a990b1682f94fa Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 4 Dec 2023 14:38:52 +0800 Subject: [PATCH] cdc: fixes minor bugs #10168 and #10169 (#10170) (#10191) close pingcap/tiflow#10168 --- cdc/kv/client.go | 4 ++++ cdc/kv/shared_client.go | 6 ++++-- cdc/processor/sinkmanager/tasks.go | 4 ++-- cdc/processor/sinkmanager/tasks_test.go | 2 +- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index ea30d0f2ee2..b798a50d30b 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -73,6 +73,8 @@ const ( // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we // don't need to force reload region anymore. regionScheduleReload = false + + scanRegionsConcurrency = 1024 ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -431,6 +433,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error { g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(scanRegionsConcurrency) for { select { case <-ctx.Done(): diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 69dfb2c4aa6..afe0ff83395 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -244,7 +244,7 @@ func (s *SharedClient) Run(ctx context.Context) error { s.workers = append(s.workers, worker) } - g.Go(func() error { return s.handleRequestRanges(ctx, g) }) + g.Go(func() error { return s.handleRequestRanges(ctx) }) g.Go(func() error { return s.dispatchRequest(ctx) }) g.Go(func() error { return s.requestRegionToStore(ctx, g) }) g.Go(func() error { return s.handleErrors(ctx) }) @@ -427,7 +427,9 @@ func (s *SharedClient) broadcastRequest(r *requestedStore, sri singleRegionInfo) } } -func (s *SharedClient) handleRequestRanges(ctx context.Context, g *errgroup.Group) error { +func (s *SharedClient) handleRequestRanges(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(scanRegionsConcurrency) for { select { case <-ctx.Done(): diff --git a/cdc/processor/sinkmanager/tasks.go b/cdc/processor/sinkmanager/tasks.go index e4b94551191..dd415428c83 100644 --- a/cdc/processor/sinkmanager/tasks.go +++ b/cdc/processor/sinkmanager/tasks.go @@ -39,8 +39,8 @@ var ( maxUpdateIntervalSize = defaultMaxUpdateIntervalSize // Sink manager schedules table tasks based on lag. Limit the max task range - // can be helpful to reduce changefeed latency. - maxTaskTimeRange = 5 * time.Second + // can be helpful to reduce changefeed latency for large initial data. + maxTaskTimeRange = 30 * time.Minute ) // Used to record the progress of the table. diff --git a/cdc/processor/sinkmanager/tasks_test.go b/cdc/processor/sinkmanager/tasks_test.go index fac3788a07f..15a1f559259 100644 --- a/cdc/processor/sinkmanager/tasks_test.go +++ b/cdc/processor/sinkmanager/tasks_test.go @@ -37,7 +37,7 @@ func TestValidateAndAdjustBound(t *testing.T) { StartTs: 439333515018895365, CommitTs: 439333515018895366, }, - taskTimeRange: 10 * time.Second, + taskTimeRange: 60 * time.Minute, expectAdjust: true, }, {