From d2bd195dfc7f78d3551e7628f49e066d68d7e984 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 5 Sep 2023 16:30:42 +0800 Subject: [PATCH] scheduler(ticdc): fix uneven write keys distribution (#9664) (#9672) close pingcap/tiflow#9665 --- cdc/scheduler/internal/v3/keyspan/mock.go | 2 +- .../internal/v3/keyspan/reconciler.go | 36 ++- .../internal/v3/keyspan/reconciler_test.go | 17 ++ .../v3/keyspan/splitter_region_count.go | 40 ++- .../v3/keyspan/splitter_region_count_test.go | 66 +++-- .../internal/v3/keyspan/splitter_write.go | 242 ++++++++++------- .../v3/keyspan/splitter_write_test.go | 248 +++++++++++++----- 7 files changed, 428 insertions(+), 223 deletions(-) diff --git a/cdc/scheduler/internal/v3/keyspan/mock.go b/cdc/scheduler/internal/v3/keyspan/mock.go index a767508d17f..ffc5e58adf4 100644 --- a/cdc/scheduler/internal/v3/keyspan/mock.go +++ b/cdc/scheduler/internal/v3/keyspan/mock.go @@ -81,6 +81,6 @@ func NewReconcilerForTests( return &Reconciler{ tableSpans: make(map[int64]splittedSpans), config: config, - splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache)}, + splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache, config.RegionPerSpan)}, } } diff --git a/cdc/scheduler/internal/v3/keyspan/reconciler.go b/cdc/scheduler/internal/v3/keyspan/reconciler.go index 3dc115bef23..021cc28aa64 100644 --- a/cdc/scheduler/internal/v3/keyspan/reconciler.go +++ b/cdc/scheduler/internal/v3/keyspan/reconciler.go @@ -30,12 +30,17 @@ import ( "go.uber.org/zap" ) -const spanRegionLimit = 50000 +const ( + // spanRegionLimit is the maximum number of regions a span can cover. + spanRegionLimit = 50000 + // baseSpanNumberCoefficient is the base coefficient that is used to + // multiply the number of captures to get the number of spans. + baseSpanNumberCoefficient = 3 +) type splitter interface { split( ctx context.Context, span tablepb.Span, totalCaptures int, - config *config.ChangefeedSchedulerConfig, ) []tablepb.Span } @@ -72,8 +77,8 @@ func NewReconciler( config: config, splitter: []splitter{ // write splitter has the highest priority.
- newWriteSplitter(changefeedID, pdapi), - newRegionCountSplitter(changefeedID, up.RegionCache), + newWriteSplitter(changefeedID, pdapi, config.WriteKeyThreshold), + newRegionCountSplitter(changefeedID, up.RegionCache, config.RegionThreshold), }, }, nil } @@ -121,7 +126,7 @@ func (m *Reconciler) Reconcile( spans := []tablepb.Span{tableSpan} if compat.CheckSpanReplicationEnabled() { for _, splitter := range m.splitter { - spans = splitter.split(ctx, tableSpan, len(aliveCaptures), m.config) + spans = splitter.split(ctx, tableSpan, len(aliveCaptures)) if len(spans) > 1 { break } @@ -208,3 +213,24 @@ func (m *Reconciler) Reconcile( } return m.spanCache } + +const maxSpanNumber = 100 + +func getSpansNumber(regionNum, captureNum int) int { + coefficient := captureNum - 1 + if baseSpanNumberCoefficient > coefficient { + coefficient = baseSpanNumberCoefficient + } + spanNum := 1 + if regionNum > 1 { + // spanNumber = max(captureNum * coefficient, totalRegions / spanRegionLimit) + spanNum = captureNum * coefficient + if regionNum/spanRegionLimit > spanNum { + spanNum = regionNum / spanRegionLimit + } + } + if spanNum > maxSpanNumber { + spanNum = maxSpanNumber + } + return spanNum +} diff --git a/cdc/scheduler/internal/v3/keyspan/reconciler_test.go b/cdc/scheduler/internal/v3/keyspan/reconciler_test.go index 9da2aeef7b0..0552303dce7 100644 --- a/cdc/scheduler/internal/v3/keyspan/reconciler_test.go +++ b/cdc/scheduler/internal/v3/keyspan/reconciler_test.go @@ -259,3 +259,20 @@ func TestBatchAddRateLimit(t *testing.T) { require.Equal(t, allSpan, reconciler.tableSpans[2].spans) require.Equal(t, 1, len(reconciler.tableSpans)) } + +func TestGetSpansNumber(t *testing.T) { + tc := []struct { + regionCount int + captureNum int + expected int + }{ + {1, 10, 1}, + {100, 2, 6}, + {100, 3, 9}, + {100, 5, 20}, + {10000, 11, 100}, + } + for _, c := range tc { + require.Equal(t, c.expected, getSpansNumber(c.regionCount, c.captureNum)) + } +} diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go b/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go index 74cd2ab760b..bc00ffa3d79 100644 --- a/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go +++ b/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go @@ -21,28 +21,28 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/config" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) type regionCountSplitter struct { - changefeedID model.ChangeFeedID - regionCache RegionCache + changefeedID model.ChangeFeedID + regionCache RegionCache + regionThreshold int } func newRegionCountSplitter( - changefeedID model.ChangeFeedID, regionCache RegionCache, + changefeedID model.ChangeFeedID, regionCache RegionCache, regionThreshold int, ) *regionCountSplitter { return ®ionCountSplitter{ - changefeedID: changefeedID, - regionCache: regionCache, + changefeedID: changefeedID, + regionCache: regionCache, + regionThreshold: regionThreshold, } } func (m *regionCountSplitter) split( - ctx context.Context, span tablepb.Span, totalCaptures int, - config *config.ChangefeedSchedulerConfig, + ctx context.Context, span tablepb.Span, captureNum int, ) []tablepb.Span { bo := tikv.NewBackoffer(ctx, 500) regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey) @@ -54,29 +54,21 @@ func (m *regionCountSplitter) split( zap.Error(err)) return []tablepb.Span{span} } - if len(regions) <= config.RegionThreshold || totalCaptures == 0 
{ + if len(regions) <= m.regionThreshold || captureNum == 0 { log.Info("schedulerv3: skip split span by region count", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.String("span", span.String()), - zap.Int("totalCaptures", totalCaptures), + zap.Int("totalCaptures", captureNum), zap.Int("regionCount", len(regions)), - zap.Int("regionThreshold", config.RegionThreshold)) + zap.Int("regionThreshold", m.regionThreshold)) return []tablepb.Span{span} } - pages := totalCaptures + stepper := newEvenlySplitStepper( + getSpansNumber(len(regions), captureNum), + len(regions)) - totalRegions := len(regions) - if totalRegions == 0 { - pages = 1 - } - - if totalRegions/spanRegionLimit > pages { - pages = totalRegions / spanRegionLimit - } - - stepper := newEvenlySplitStepper(pages, totalRegions) spans := make([]tablepb.Span, 0, stepper.SpanCount()) start, end := 0, stepper.Step() for { @@ -133,9 +125,9 @@ func (m *regionCountSplitter) split( zap.String("changefeed", m.changefeedID.ID), zap.String("span", span.String()), zap.Int("spans", len(spans)), - zap.Int("totalCaptures", totalCaptures), + zap.Int("totalCaptures", captureNum), zap.Int("regionCount", len(regions)), - zap.Int("regionThreshold", config.RegionThreshold), + zap.Int("regionThreshold", m.regionThreshold), zap.Int("spanRegionLimit", spanRegionLimit)) return spans } diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go b/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go index 5063f1504bb..a42041dec58 100644 --- a/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go +++ b/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go @@ -77,7 +77,8 @@ func TestRegionCountSplitSpan(t *testing.T) { totalCaptures: 4, span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region + {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region + {TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region {TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region {TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region @@ -87,8 +88,10 @@ func TestRegionCountSplitSpan(t *testing.T) { totalCaptures: 3, span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region - {TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region + {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region + {TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region + {TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region + {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region {TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region }, }, @@ -96,26 +99,31 @@ func TestRegionCountSplitSpan(t *testing.T) { totalCaptures: 2, span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_3")}, // 3 region - {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t2")}, // 2 region + {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region + {TableID: 1, StartKey: []byte("t1_1"), 
EndKey: []byte("t1_2")}, // 1 region + {TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region + {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region + {TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region }, }, { totalCaptures: 1, span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, // 5 region + {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region + {TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region + {TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region }, }, } for i, cs := range cases { - splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache) cfg := &config.ChangefeedSchedulerConfig{ EnableTableAcrossNodes: true, RegionThreshold: 1, } - spans := splitter.split(context.Background(), cs.span, cs.totalCaptures, cfg) + splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold) + spans := splitter.split(context.Background(), cs.span, cs.totalCaptures) require.Equalf(t, cs.expectSpans, spans, "%d %s", i, &cs.span) } } @@ -134,64 +142,66 @@ func TestRegionCountEvenlySplitSpan(t *testing.T) { cases := []struct { totalCaptures int + expectedSpans int expectSpansMin int expectSpansMax int }{ { totalCaptures: 0, + expectedSpans: 1, expectSpansMin: 1000, expectSpansMax: 1000, }, { totalCaptures: 1, - expectSpansMin: 1000, - expectSpansMax: 1000, + expectedSpans: 3, + expectSpansMin: 333, + expectSpansMax: 334, }, { totalCaptures: 3, - expectSpansMin: 333, - expectSpansMax: 334, + expectedSpans: 9, + expectSpansMin: 111, + expectSpansMax: 113, }, { totalCaptures: 7, - expectSpansMin: 142, - expectSpansMax: 143, + expectedSpans: 42, + expectSpansMin: 23, + expectSpansMax: 24, }, { totalCaptures: 999, + expectedSpans: 100, expectSpansMin: 1, - expectSpansMax: 2, + expectSpansMax: 10, }, { totalCaptures: 1000, + expectedSpans: 100, expectSpansMin: 1, - expectSpansMax: 1, + expectSpansMax: 10, }, { totalCaptures: 2000, + expectedSpans: 100, expectSpansMin: 1, - expectSpansMax: 1, + expectSpansMax: 10, }, } for i, cs := range cases { - splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache) cfg := &config.ChangefeedSchedulerConfig{ EnableTableAcrossNodes: true, RegionThreshold: 1, } + splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold) spans := splitter.split( context.Background(), tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, cs.totalCaptures, - cfg, ) - if cs.totalCaptures == 0 { - require.Equalf(t, 1, len(spans), "%d %v", i, cs) - } else if cs.totalCaptures <= 1000 { - require.Equalf(t, cs.totalCaptures, len(spans), "%d %v", i, cs) - } else { - require.Equalf(t, 1000, len(spans), "%d %v", i, cs) - } + + require.Equalf(t, cs.expectedSpans, len(spans), "%d %v", i, cs) for _, span := range spans { start, end := 0, 1000 @@ -217,13 +227,13 @@ func TestSplitSpanRegionOutOfOrder(t *testing.T) { cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_4")}, 2) cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3) - splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache) cfg := &config.ChangefeedSchedulerConfig{ EnableTableAcrossNodes: true, RegionThreshold: 1, } + splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold) span := tablepb.Span{TableID: 1, 
StartKey: []byte("t1"), EndKey: []byte("t2")} - spans := splitter.split(context.Background(), span, 1, cfg) + spans := splitter.split(context.Background(), span, 1) require.Equal( t, []tablepb.Span{{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}}, spans) } diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write.go b/cdc/scheduler/internal/v3/keyspan/splitter_write.go index fc458ef788c..9773597b582 100644 --- a/cdc/scheduler/internal/v3/keyspan/splitter_write.go +++ b/cdc/scheduler/internal/v3/keyspan/splitter_write.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/pdutil" "go.uber.org/zap" ) @@ -28,24 +27,36 @@ import ( const regionWrittenKeyBase = 1 type writeSplitter struct { - changefeedID model.ChangeFeedID - pdAPIClient pdutil.PDAPIClient + changefeedID model.ChangeFeedID + pdAPIClient pdutil.PDAPIClient + writeKeyThreshold int +} + +type splitRegionsInfo struct { + RegionCounts []int + Weights []uint64 + WriteKeys []uint64 + Spans []tablepb.Span } func newWriteSplitter( - changefeedID model.ChangeFeedID, pdAPIClient pdutil.PDAPIClient, + changefeedID model.ChangeFeedID, + pdAPIClient pdutil.PDAPIClient, + writeKeyThreshold int, ) *writeSplitter { return &writeSplitter{ - changefeedID: changefeedID, - pdAPIClient: pdAPIClient, + changefeedID: changefeedID, + pdAPIClient: pdAPIClient, + writeKeyThreshold: writeKeyThreshold, } } func (m *writeSplitter) split( - ctx context.Context, span tablepb.Span, totalCaptures int, - config *config.ChangefeedSchedulerConfig, + ctx context.Context, + span tablepb.Span, + captureNum int, ) []tablepb.Span { - if config.WriteKeyThreshold == 0 { + if m.writeKeyThreshold == 0 { return nil } regions, err := m.pdAPIClient.ScanRegions(ctx, span) @@ -59,12 +70,8 @@ func (m *writeSplitter) split( return nil } - pages := totalCaptures - if len(regions)/spanRegionLimit > pages { - pages = len(regions) / spanRegionLimit - } - - if pages <= 1 { + spansNum := getSpansNumber(len(regions), captureNum) + if spansNum <= 1 { log.Warn("schedulerv3: only one capture and the regions number less than"+ " the maxSpanRegionLimit, skip split span", zap.String("namespace", m.changefeedID.Namespace), @@ -74,54 +81,60 @@ func (m *writeSplitter) split( return []tablepb.Span{span} } - info := splitRegionsByWrittenKeys(span.TableID, - regions, - config.WriteKeyThreshold, - pages, - spanRegionLimit) - + splitInfo := m.splitRegionsByWrittenKeysV1(span.TableID, regions, spansNum) log.Info("schedulerv3: split span by written keys", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.String("span", span.String()), - zap.Ints("counts", info.Counts), - zap.Ints("weights", info.Weights), - zap.Int("spans", len(info.Spans)), - zap.Int("totalCaptures", totalCaptures), - zap.Int("writeKeyThreshold", config.WriteKeyThreshold), + zap.Ints("perSpanRegionCounts", splitInfo.RegionCounts), + zap.Uint64s("weights", splitInfo.Weights), + zap.Int("spans", len(splitInfo.Spans)), + zap.Int("totalCaptures", captureNum), + zap.Int("writeKeyThreshold", m.writeKeyThreshold), zap.Int("spanRegionLimit", spanRegionLimit)) - return info.Spans -} -type splitRegionsInfo struct { - Counts []int - Weights []int - Spans []tablepb.Span + return splitInfo.Spans } -// splitRegionsByWrittenKeys returns a slice of regions that evenly split the range by write keys. 
-// pages is the number of splits to make, actually it is the number of captures. -func splitRegionsByWrittenKeys( - tableID model.TableID, regions []pdutil.RegionInfo, - writeKeyThreshold int, pages int, spanRegionLimit int, +// splitRegionsByWrittenKeysV1 tries to split the regions into at least `baseSpansNum` spans, +// each span has approximately the same write weight. +// The algorithm is: +// 1. Sum the written keys of all regions, and normalize the written keys of each region by +// adding baseline weights (regionWrittenKeyBase) to each region's written keys. Which takes +// the region number into account. +// 2. Calculate the writeLimitPerSpan. +// 3. Split the table into spans: +// 3.1 If the total write is less than writeKeyThreshold, don't need to split the regions. +// 3.2 If the restSpans count is one, and the restWeight is less than writeLimitPerSpan, +// we will use the rest regions as the last span. If the restWeight is larger than writeLimitPerSpan, +// then we need to add more restSpans (restWeight / writeLimitPerSpan) to split the rest regions. +// 3.3 If the restRegions is less than equal to restSpans, then every region will be a span. +// 3.4 If the spanWriteWeight is larger than writeLimitPerSpan or the regionCount is larger +// than spanRegionLimit, then use the region range from spanStartIndex to i to as a span. +// 4. Return the split result. +func (m *writeSplitter) splitRegionsByWrittenKeysV1( + tableID model.TableID, + regions []pdutil.RegionInfo, + baseSpansNum int, ) *splitRegionsInfo { decodeKey := func(hexkey string) []byte { key, _ := hex.DecodeString(hexkey) return key } - totalWriteNormalized := uint64(0) - totalWrite := totalWriteNormalized + + totalWrite, totalWriteNormalized := uint64(0), uint64(0) for i := range regions { totalWrite += regions[i].WrittenKeys - // Override 0 to 1 to reflect the baseline cost of a region. - // Also, it makes split evenly when there is no write. regions[i].WrittenKeys += regionWrittenKeyBase totalWriteNormalized += regions[i].WrittenKeys } - if totalWrite < uint64(writeKeyThreshold) { + + // 1. If the total write is less than writeKeyThreshold + // don't need to split the regions + if totalWrite < uint64(m.writeKeyThreshold) { return &splitRegionsInfo{ - Counts: []int{len(regions)}, - Weights: []int{int(totalWriteNormalized)}, + RegionCounts: []int{len(regions)}, + Weights: []uint64{totalWriteNormalized}, Spans: []tablepb.Span{{ TableID: tableID, StartKey: tablepb.Key(decodeKey(regions[0].StartKey)), @@ -130,65 +143,104 @@ func splitRegionsByWrittenKeys( } } - writtenKeysPerPage := totalWriteNormalized / uint64(pages) - counts := make([]int, 0, pages) - weights := make([]int, 0, pages) - spans := make([]tablepb.Span, 0, pages) - accWrittenKeys, pageWrittenKeys := uint64(0), uint64(0) - pageStartIdx, pageLastIdx := 0, 0 - pageRegionsCount := 0 - // split the table into pages-1 spans, each span has writtenKeysPerPage written keys. - for i := 1; i < pages; i++ { - for idx := pageStartIdx; idx < len(regions); idx++ { - restPages := pages - i - restRegions := len(regions) - idx - pageLastIdx = idx - currentWrittenKeys := regions[idx].WrittenKeys - // If there is at least one region, and the rest regions can't fill the rest pages or - // the accWrittenKeys plus currentWrittenKeys is larger than writtenKeysPerPage, - // then use the region from pageStartIdx to idx-1 to as a span and start a new page. 
- if (idx > pageStartIdx) && - ((restPages >= restRegions) || - (accWrittenKeys+currentWrittenKeys > writtenKeysPerPage) || - pageRegionsCount >= spanRegionLimit) { + // 2. Calculate the writeLimitPerSpan, if one span's write is larger that + // this number, we should create a new span. + writeLimitPerSpan := totalWriteNormalized / uint64(baseSpansNum) + + // The result of this method + var ( + regionCounts = make([]int, 0, baseSpansNum) + writeKeys = make([]uint64, 0, baseSpansNum) + weights = make([]uint64, 0, baseSpansNum) + spans = make([]tablepb.Span, 0, baseSpansNum) + ) + + // Temp variables used in the loop + var ( + spanWriteWeight = uint64(0) + spanStartIndex = 0 + restSpans = baseSpansNum + regionCount = 0 + restWeight = int64(totalWriteNormalized) + ) + + // 3. Split the table into spans, each span has approximately + // `writeWeightPerSpan` weight or `spanRegionLimit` regions. + for i := 0; i < len(regions); i++ { + restRegions := len(regions) - i + regionCount++ + spanWriteWeight += regions[i].WrittenKeys + // If the restSpans count is one, and the restWeight is less than writeLimitPerSpan, + // we will use the rest regions as the last span. If the restWeight is larger than writeLimitPerSpan, + // then we need to add more restSpans (restWeight / writeLimitPerSpan) to split the rest regions. + if restSpans == 1 { + if restWeight < int64(writeLimitPerSpan) { spans = append(spans, tablepb.Span{ TableID: tableID, - StartKey: tablepb.Key(decodeKey(regions[pageStartIdx].StartKey)), - EndKey: tablepb.Key(decodeKey(regions[idx-1].EndKey)), + StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)), + EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)), }) - counts = append(counts, idx-pageStartIdx) - weights = append(weights, int(pageWrittenKeys)) - pageWrittenKeys = 0 - pageStartIdx = idx - // update writtenKeysPerPage to make the rest regions evenly split - // to the rest pages. - writtenKeysPerPage = (totalWriteNormalized - accWrittenKeys) / uint64(restPages) - accWrittenKeys = 0 - pageRegionsCount = 0 + + lastSpanRegionCount := len(regions) - spanStartIndex + lastSpanWriteWeight := uint64(0) + lastSpanWriteKey := uint64(0) + for j := spanStartIndex; j < len(regions); j++ { + lastSpanWriteKey += regions[j].WrittenKeys + lastSpanWriteWeight += regions[j].WrittenKeys + } + regionCounts = append(regionCounts, lastSpanRegionCount) + weights = append(weights, lastSpanWriteWeight) + writeKeys = append(writeKeys, lastSpanWriteKey) break } - pageWrittenKeys += currentWrittenKeys - accWrittenKeys += currentWrittenKeys - pageRegionsCount++ + // If the restWeight is larger than writeLimitPerSpan, + // then we need to update the restSpans. + restSpans = int(restWeight) / int(writeLimitPerSpan) } - } - // The last span contains the rest regions. - spans = append(spans, tablepb.Span{ - TableID: tableID, - StartKey: tablepb.Key(decodeKey(regions[pageLastIdx].StartKey)), - EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)), - }) - counts = append(counts, len(regions)-pageLastIdx) - pageWrittenKeys = 0 - for idx := pageLastIdx; idx < len(regions); idx++ { - pageWrittenKeys += regions[idx].WrittenKeys - } - weights = append(weights, int(pageWrittenKeys)) + // If the restRegions is less than equal to restSpans, + // then every region will be a span. 
+ if restRegions <= restSpans { + spans = append(spans, tablepb.Span{ + TableID: tableID, + StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)), + EndKey: tablepb.Key(decodeKey(regions[i].EndKey)), + }) + regionCounts = append(regionCounts, regionCount) + weights = append(weights, spanWriteWeight) + + // reset the temp variables to start a new span + restSpans-- + restWeight -= int64(spanWriteWeight) + spanWriteWeight = 0 + regionCount = 0 + spanStartIndex = i + 1 + continue + } + // If the spanWriteWeight is larger than writeLimitPerSpan or the regionCount + // is larger than spanRegionLimit, then use the region range from + // spanStartIndex to i to as a span. + if spanWriteWeight > writeLimitPerSpan || regionCount >= spanRegionLimit { + spans = append(spans, tablepb.Span{ + TableID: tableID, + StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)), + EndKey: tablepb.Key(decodeKey(regions[i].EndKey)), + }) + regionCounts = append(regionCounts, regionCount) + weights = append(weights, spanWriteWeight) + // reset the temp variables to start a new span + restSpans-- + restWeight -= int64(spanWriteWeight) + spanWriteWeight = 0 + regionCount = 0 + spanStartIndex = i + 1 + } + } return &splitRegionsInfo{ - Counts: counts, - Weights: weights, - Spans: spans, + RegionCounts: regionCounts, + Weights: weights, + WriteKeys: writeKeys, + Spans: spans, } } diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go index 56590644435..2987857c776 100644 --- a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go +++ b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go @@ -17,10 +17,12 @@ import ( "context" "encoding/hex" "math" + "math/rand" + "strconv" "testing" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/stretchr/testify/require" ) @@ -57,50 +59,50 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) { re := require.New(t) regions, startKeys, endKeys := prepareRegionsInfo( - [7]int{100, 100, 100, 100, 100, 100, 100}) - - info := splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 1, spanRegionLimit) - re.Len(info.Counts, 1) - re.EqualValues(7, info.Counts[0]) + [7]int{100, 100, 100, 100, 100, 100, 100}) // region id: [2,3,4,5,6,7,8] + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 0) + info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), 1) + re.Len(info.RegionCounts, 1) + re.EqualValues(7, info.RegionCounts[0]) re.Len(info.Spans, 1) re.EqualValues(startKeys[2], info.Spans[0].StartKey) re.EqualValues(endKeys[8], info.Spans[0].EndKey) - info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2, spanRegionLimit) // [2,3,4], [5,6,7,8] - re.Len(info.Counts, 2) - re.EqualValues(3, info.Counts[0]) - re.EqualValues(4, info.Counts[1]) + info = splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), 2) // [2,3,4,5], [6,7,8] + re.Len(info.RegionCounts, 2) + re.EqualValues(4, info.RegionCounts[0]) + re.EqualValues(3, info.RegionCounts[1]) re.Len(info.Weights, 2) - re.EqualValues(303, info.Weights[0]) - re.EqualValues(404, info.Weights[1]) + re.EqualValues(404, info.Weights[0]) + re.EqualValues(303, info.Weights[1]) re.Len(info.Spans, 2) re.EqualValues(startKeys[2], info.Spans[0].StartKey) - re.EqualValues(endKeys[4], info.Spans[0].EndKey) - re.EqualValues(startKeys[5], info.Spans[1].StartKey) + 
re.EqualValues(endKeys[5], info.Spans[0].EndKey) + re.EqualValues(startKeys[6], info.Spans[1].StartKey) re.EqualValues(endKeys[8], info.Spans[1].EndKey) - info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 3, spanRegionLimit) // [2,3], [4,5,6], [7,8] - re.Len(info.Counts, 3) - re.EqualValues(2, info.Counts[0]) - re.EqualValues(2, info.Counts[1]) - re.EqualValues(3, info.Counts[2]) + info = splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), 3) // [2,3,4], [5,6,7], [8] + re.Len(info.RegionCounts, 3) + re.EqualValues(3, info.RegionCounts[0]) + re.EqualValues(3, info.RegionCounts[1]) + re.EqualValues(1, info.RegionCounts[2]) re.Len(info.Weights, 3) - re.EqualValues(202, info.Weights[0]) - re.EqualValues(202, info.Weights[1]) - re.EqualValues(303, info.Weights[2]) + re.EqualValues(303, info.Weights[0]) + re.EqualValues(303, info.Weights[1]) + re.EqualValues(101, info.Weights[2]) re.Len(info.Spans, 3) re.EqualValues(startKeys[2], info.Spans[0].StartKey) - re.EqualValues(endKeys[3], info.Spans[0].EndKey) - re.EqualValues(startKeys[4], info.Spans[1].StartKey) - re.EqualValues(endKeys[5], info.Spans[1].EndKey) - re.EqualValues(startKeys[6], info.Spans[2].StartKey) + re.EqualValues(endKeys[4], info.Spans[0].EndKey) + re.EqualValues(startKeys[5], info.Spans[1].StartKey) + re.EqualValues(endKeys[7], info.Spans[1].EndKey) + re.EqualValues(startKeys[8], info.Spans[2].StartKey) re.EqualValues(endKeys[8], info.Spans[2].EndKey) - // Pages > regons + // spans > regions for p := 7; p <= 10; p++ { - info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, p, spanRegionLimit) - re.Len(info.Counts, 7) - for _, c := range info.Counts { + info = splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), p) + re.Len(info.RegionCounts, 7) + for _, c := range info.RegionCounts { re.EqualValues(1, c) } re.Len(info.Weights, 7) @@ -113,11 +115,6 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) { re.EqualValues(endKeys[2+i], r.EndKey) } } - - // test spanRegionLimit works - info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2, 3) - re.Len(info.Counts, 2) - re.EqualValues(3, info.Counts[0]) } func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) { @@ -127,24 +124,24 @@ func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) { // Hotspots regions, startKeys, endKeys := prepareRegionsInfo( [7]int{100, 1, 100, 1, 1, 1, 100}) - - info := splitRegionsByWrittenKeys(0, regions, 0, 4, spanRegionLimit) // [2], [3,4], [5,6,7], [8] - re.Len(info.Counts, 4) - re.EqualValues(1, info.Counts[0]) - re.EqualValues(1, info.Counts[1]) - re.EqualValues(4, info.Counts[2]) - re.EqualValues(1, info.Counts[3]) + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 4) + info := splitter.splitRegionsByWrittenKeysV1(0, regions, 4) // [2], [3,4], [5,6,7], [8] + re.Len(info.RegionCounts, 4) + re.EqualValues(1, info.RegionCounts[0]) + re.EqualValues(2, info.RegionCounts[1]) + re.EqualValues(3, info.RegionCounts[2]) + re.EqualValues(1, info.RegionCounts[3]) re.Len(info.Weights, 4) re.EqualValues(101, info.Weights[0]) - re.EqualValues(2, info.Weights[1]) - re.EqualValues(107, info.Weights[2]) + re.EqualValues(103, info.Weights[1]) + re.EqualValues(6, info.Weights[2]) re.EqualValues(101, info.Weights[3]) re.Len(info.Spans, 4) re.EqualValues(startKeys[2], info.Spans[0].StartKey) re.EqualValues(endKeys[2], info.Spans[0].EndKey) re.EqualValues(startKeys[3], info.Spans[1].StartKey) - re.EqualValues(endKeys[3], info.Spans[1].EndKey) - re.EqualValues(startKeys[4], 
info.Spans[2].StartKey) + re.EqualValues(endKeys[4], info.Spans[1].EndKey) + re.EqualValues(startKeys[5], info.Spans[2].StartKey) re.EqualValues(endKeys[7], info.Spans[2].EndKey) re.EqualValues(startKeys[8], info.Spans[3].StartKey) re.EqualValues(endKeys[8], info.Spans[3].EndKey) @@ -157,14 +154,14 @@ func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) { // Hotspots regions, startKeys, endKeys := prepareRegionsInfo( [7]int{1000, 1, 1, 1, 100, 1, 99}) - - info := splitRegionsByWrittenKeys(0, regions, 0, 4, spanRegionLimit) // [2], [3,4,5], [6,7], [8] + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 4) + info := splitter.splitRegionsByWrittenKeysV1(0, regions, 4) // [2], [3,4,5,6], [7], [8] re.Len(info.Spans, 4) re.EqualValues(startKeys[2], info.Spans[0].StartKey) re.EqualValues(endKeys[2], info.Spans[0].EndKey) re.EqualValues(startKeys[3], info.Spans[1].StartKey) - re.EqualValues(endKeys[5], info.Spans[1].EndKey) - re.EqualValues(startKeys[6], info.Spans[2].StartKey) + re.EqualValues(endKeys[6], info.Spans[1].EndKey) + re.EqualValues(startKeys[7], info.Spans[2].StartKey) re.EqualValues(endKeys[7], info.Spans[2].EndKey) re.EqualValues(startKeys[8], info.Spans[3].StartKey) re.EqualValues(endKeys[8], info.Spans[3].EndKey) @@ -173,23 +170,23 @@ func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) { func TestSplitRegionsByWrittenKeysCold(t *testing.T) { t.Parallel() re := require.New(t) - + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 0) regions, startKeys, endKeys := prepareRegionsInfo([7]int{}) - info := splitRegionsByWrittenKeys(0, regions, 0, 3, spanRegionLimit) // [2,3], [4,5], [6,7,8] - re.Len(info.Counts, 3) - re.EqualValues(2, info.Counts[0], info) - re.EqualValues(2, info.Counts[1]) - re.EqualValues(3, info.Counts[2]) + info := splitter.splitRegionsByWrittenKeysV1(0, regions, 3) // [2,3,4], [5,6,7], [8] + re.Len(info.RegionCounts, 3) + re.EqualValues(3, info.RegionCounts[0], info) + re.EqualValues(3, info.RegionCounts[1]) + re.EqualValues(1, info.RegionCounts[2]) re.Len(info.Weights, 3) - re.EqualValues(2, info.Weights[0]) - re.EqualValues(2, info.Weights[1]) - re.EqualValues(3, info.Weights[2]) + re.EqualValues(3, info.Weights[0]) + re.EqualValues(3, info.Weights[1]) + re.EqualValues(1, info.Weights[2]) re.Len(info.Spans, 3) re.EqualValues(startKeys[2], info.Spans[0].StartKey) - re.EqualValues(endKeys[3], info.Spans[0].EndKey) - re.EqualValues(startKeys[4], info.Spans[1].StartKey) - re.EqualValues(endKeys[5], info.Spans[1].EndKey) - re.EqualValues(startKeys[6], info.Spans[2].StartKey) + re.EqualValues(endKeys[4], info.Spans[0].EndKey) + re.EqualValues(startKeys[5], info.Spans[1].StartKey) + re.EqualValues(endKeys[7], info.Spans[1].EndKey) + re.EqualValues(startKeys[8], info.Spans[2].StartKey) re.EqualValues(endKeys[8], info.Spans[2].EndKey) } @@ -197,10 +194,11 @@ func TestSplitRegionsByWrittenKeysConfig(t *testing.T) { t.Parallel() re := require.New(t) + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, math.MaxInt) regions, startKeys, endKeys := prepareRegionsInfo([7]int{1, 1, 1, 1, 1, 1, 1}) - info := splitRegionsByWrittenKeys(1, regions, math.MaxInt, 3, spanRegionLimit) // [2,3,4,5,6,7,8] - re.Len(info.Counts, 1) - re.EqualValues(7, info.Counts[0], info) + info := splitter.splitRegionsByWrittenKeysV1(1, regions, 3) // [2,3,4,5,6,7,8] + re.Len(info.RegionCounts, 1) + re.EqualValues(7, info.RegionCounts[0], info) re.Len(info.Weights, 1) re.EqualValues(14, info.Weights[0]) re.Len(info.Spans, 1) 
@@ -208,9 +206,119 @@ func TestSplitRegionsByWrittenKeysConfig(t *testing.T) { re.EqualValues(endKeys[8], info.Spans[0].EndKey) re.EqualValues(1, info.Spans[0].TableID) - s := writeSplitter{} - spans := s.split(context.Background(), tablepb.Span{}, 3, &config.ChangefeedSchedulerConfig{ - WriteKeyThreshold: 0, - }) + splitter.writeKeyThreshold = 0 + spans := splitter.split(context.Background(), tablepb.Span{}, 3) require.Empty(t, spans) } + +func TestSplitRegionEven(t *testing.T) { + tblID := model.TableID(1) + regionCount := 4653 + 1051 + 745 + 9530 + 1 + regions := make([]pdutil.RegionInfo, regionCount) + for i := 0; i < regionCount; i++ { + regions[i] = pdutil.RegionInfo{ + ID: uint64(i), + StartKey: "" + strconv.Itoa(i), + EndKey: "" + strconv.Itoa(i), + WrittenKeys: 2, + } + } + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 4) + info := splitter.splitRegionsByWrittenKeysV1(tblID, regions, 5) + require.Len(t, info.RegionCounts, 5) + require.Len(t, info.Weights, 5) + for i, w := range info.Weights { + if i == 4 { + require.Equal(t, uint64(9576), w, i) + } else { + require.Equal(t, uint64(9591), w, i) + } + } +} + +func TestSpanRegionLimitBase(t *testing.T) { + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 0) + var regions []pdutil.RegionInfo + // test spanRegionLimit works + for i := 0; i < spanRegionLimit*6; i++ { + regions = append(regions, pdutil.NewTestRegionInfo(uint64(i+9), []byte("f"), []byte("f"), 100)) + } + captureNum := 2 + spanNum := getSpansNumber(len(regions), captureNum) + info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum) + require.Len(t, info.RegionCounts, spanNum) + for _, c := range info.RegionCounts { + require.LessOrEqual(t, c, int(spanRegionLimit*1.1)) + } +} + +func TestSpanRegionLimit(t *testing.T) { + // Fisher-Yates shuffle algorithm to shuffle the writtenKeys + // but keep the first preservationRate% of the writtenKeys in the left side of the list + // to make the writtenKeys more like a hot region list + shuffle := func(nums []int, preservationRate float64) []int { + n := len(nums) + shuffled := make([]int, n) + copy(shuffled, nums) + + for i := n - 1; i > 0; i-- { + if rand.Float64() < preservationRate { + continue + } + j := rand.Intn(i + 1) + shuffled[i], shuffled[j] = shuffled[j], shuffled[i] + } + + return shuffled + } + + // total region number + totalRegionNumbers := spanRegionLimit * 10 + + // writtenKeys over 20000 percentage + percentOver20000 := 1 + // writtenKeys between 5000 and 10000 percentage + percentBetween5000And10000 := 5 + + countOver20000 := (percentOver20000 * totalRegionNumbers) / 100 + countBetween5000And10000 := (percentBetween5000And10000 * totalRegionNumbers) / 100 + countBelow1000 := totalRegionNumbers - countOver20000 - countBetween5000And10000 + + // random generate writtenKeys for each region + var writtenKeys []int + + for i := 0; i < countOver20000; i++ { + number := rand.Intn(80000) + 20001 + writtenKeys = append(writtenKeys, number) + } + + for i := 0; i < countBetween5000And10000; i++ { + number := rand.Intn(5001) + 5000 + writtenKeys = append(writtenKeys, number) + } + + for i := 0; i < countBelow1000; i++ { + number := rand.Intn(1000) + writtenKeys = append(writtenKeys, number) + } + + // 70% hot written region is in the left side of the region list + writtenKeys = shuffle(writtenKeys, 0.7) + + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 0) + var regions []pdutil.RegionInfo + // region number is 500,000 + 
// weight is random between 0 and 40,000 + for i := 0; i < len(writtenKeys); i++ { + regions = append( + regions, + pdutil.NewTestRegionInfo(uint64(i+9), []byte("f"), []byte("f"), uint64(writtenKeys[i]))) + } + captureNum := 3 + spanNum := getSpansNumber(len(regions), captureNum) + info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum) + require.LessOrEqual(t, spanNum, len(info.RegionCounts)) + for _, c := range info.RegionCounts { + require.LessOrEqual(t, c, int(spanRegionLimit*1.1)) + } +}
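For reference, the new getSpansNumber heuristic in reconciler.go picks the span count as max(captureNum * max(captureNum-1, baseSpanNumberCoefficient), regionNum / spanRegionLimit), capped at maxSpanNumber (100), and always 1 for a single-region table. A minimal standalone sketch (the package main wrapper and the prints are illustrative scaffolding, not part of the patch) that reproduces the values asserted in TestGetSpansNumber:

package main

import "fmt"

const (
	spanRegionLimit = 50000 // max regions one span should cover
	maxSpanNumber   = 100   // hard cap on spans per table
)

// getSpansNumber mirrors the heuristic added in reconciler.go:
// spanNum = max(captureNum*max(captureNum-1, 3), regionNum/spanRegionLimit),
// capped at maxSpanNumber, and 1 when the table has a single region.
func getSpansNumber(regionNum, captureNum int) int {
	coefficient := captureNum - 1
	if coefficient < 3 {
		coefficient = 3
	}
	spanNum := 1
	if regionNum > 1 {
		spanNum = captureNum * coefficient
		if regionNum/spanRegionLimit > spanNum {
			spanNum = regionNum / spanRegionLimit
		}
	}
	if spanNum > maxSpanNumber {
		spanNum = maxSpanNumber
	}
	return spanNum
}

func main() {
	// Matches the expectations in TestGetSpansNumber:
	fmt.Println(getSpansNumber(1, 10))     // 1   (single region, never split)
	fmt.Println(getSpansNumber(100, 2))    // 6   (2 captures * coefficient 3)
	fmt.Println(getSpansNumber(100, 3))    // 9   (3 * 3)
	fmt.Println(getSpansNumber(100, 5))    // 20  (5 * 4)
	fmt.Println(getSpansNumber(10000, 11)) // 100 (11 * 10 = 110, capped at 100)
}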
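The written-keys splitter walks the regions once, adds a baseline weight of regionWrittenKeyBase to every region, and closes the current span when its accumulated weight exceeds totalWriteNormalized/spansNum or it reaches spanRegionLimit regions; the last span absorbs the tail once the remaining weight is small enough. Below is a simplified sketch of that greedy policy, assuming plain uint64 weights instead of pdutil.RegionInfo and a hypothetical splitByWeight helper; it is not a line-for-line copy of splitRegionsByWrittenKeysV1.

package main

import "fmt"

// splitByWeight sketches the greedy policy of splitRegionsByWrittenKeysV1:
// every region gets a +1 baseline weight (regionWrittenKeyBase), the target
// weight per span is total/spansNum, and the current span is closed once it
// exceeds that target or reaches regionLimit regions. spansNum must be >= 1.
func splitByWeight(writtenKeys []uint64, spansNum, regionLimit int) (counts []int, weights []uint64) {
	normalized := make([]uint64, len(writtenKeys))
	total := uint64(0)
	for i, w := range writtenKeys {
		normalized[i] = w + 1 // baseline cost of a region
		total += normalized[i]
	}
	limit := total / uint64(spansNum)

	restSpans, restWeight := spansNum, total
	count, weight := 0, uint64(0)
	for i, w := range normalized {
		count++
		weight += w
		restRegions := len(normalized) - i - 1 // regions after i
		switch {
		case restSpans == 1 && restWeight <= limit:
			// Everything that is left fits into the final span.
			counts = append(counts, count+restRegions)
			weights = append(weights, restWeight)
			return counts, weights
		case weight > limit || count >= regionLimit || restRegions < restSpans:
			// Close the current span and start a new one.
			counts = append(counts, count)
			weights = append(weights, weight)
			restWeight -= weight
			if restSpans > 1 {
				restSpans--
			}
			count, weight = 0, 0
		}
	}
	return counts, weights
}

func main() {
	// Seven uniform regions into three spans: counts 3/3/1 and weights
	// 303/303/101, the same grouping asserted in TestSplitRegionsByWrittenKeysUniform.
	counts, weights := splitByWeight([]uint64{100, 100, 100, 100, 100, 100, 100}, 3, 50000)
	fmt.Println(counts, weights)
}

The baseline weight is what keeps a completely cold table splitting into evenly sized spans rather than collapsing into one oversized span, which is what TestSplitRegionsByWrittenKeysCold exercises.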
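The region-count splitter keeps its pre-existing newEvenlySplitStepper and now simply asks getSpansNumber how many spans to produce. The stepper itself is outside this patch; the sketch below shows the general even-split idea under the assumption that span sizes differ by at most one region (the real stepper may spread the remainder slightly differently, which is why TestRegionCountEvenlySplitSpan tolerates 111 to 113 regions per span for 9 spans over 1000 regions). The evenSplit helper is hypothetical.

package main

import "fmt"

// evenSplit divides regionCount regions into spanCount spans whose sizes
// differ by at most one region: the first remainder spans take one extra.
func evenSplit(regionCount, spanCount int) []int {
	if regionCount <= 0 || spanCount <= 0 {
		return nil
	}
	if spanCount > regionCount {
		spanCount = regionCount
	}
	step := regionCount / spanCount
	remainder := regionCount % spanCount
	sizes := make([]int, spanCount)
	for i := range sizes {
		sizes[i] = step
		if i < remainder {
			sizes[i]++
		}
	}
	return sizes
}

func main() {
	fmt.Println(evenSplit(1000, 3)) // [334 333 333]
	fmt.Println(evenSplit(1000, 9)) // [112 111 111 111 111 111 111 111 111]
}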