scheduler(ticdc): fix uneven write keys distribution (#9664) (#9672)
close #9665
ti-chi-bot authored Sep 5, 2023
1 parent 82ff5df commit d2bd195
Showing 7 changed files with 428 additions and 223 deletions.
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/keyspan/mock.go
@@ -81,6 +81,6 @@ func NewReconcilerForTests(
return &Reconciler{
tableSpans: make(map[int64]splittedSpans),
config: config,
splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache)},
splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache, config.RegionPerSpan)},
}
}
36 changes: 31 additions & 5 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
@@ -30,12 +30,17 @@ import (
"go.uber.org/zap"
)

const spanRegionLimit = 50000
const (
// spanRegionLimit is the maximum number of regions a span can cover.
spanRegionLimit = 50000
// baseSpanNumberCoefficient is the base coefficient used to
// multiply the number of captures to get the number of spans.
baseSpanNumberCoefficient = 3
)

type splitter interface {
split(
ctx context.Context, span tablepb.Span, totalCaptures int,
config *config.ChangefeedSchedulerConfig,
) []tablepb.Span
}

@@ -72,8 +77,8 @@ func NewReconciler(
config: config,
splitter: []splitter{
// write splitter has the highest priority.
newWriteSplitter(changefeedID, pdapi),
newRegionCountSplitter(changefeedID, up.RegionCache),
newWriteSplitter(changefeedID, pdapi, config.WriteKeyThreshold),
newRegionCountSplitter(changefeedID, up.RegionCache, config.RegionThreshold),
},
}, nil
}
@@ -121,7 +126,7 @@ func (m *Reconciler) Reconcile(
spans := []tablepb.Span{tableSpan}
if compat.CheckSpanReplicationEnabled() {
for _, splitter := range m.splitter {
spans = splitter.split(ctx, tableSpan, len(aliveCaptures), m.config)
spans = splitter.split(ctx, tableSpan, len(aliveCaptures))
if len(spans) > 1 {
break
}
@@ -208,3 +213,24 @@ func (m *Reconciler) Reconcile(
}
return m.spanCache
}

const maxSpanNumber = 100

func getSpansNumber(regionNum, captureNum int) int {
coefficient := captureNum - 1
if baseSpanNumberCoefficient > coefficient {
coefficient = baseSpanNumberCoefficient
}
spanNum := 1
if regionNum > 1 {
// spanNumber = max(captureNum * coefficient, totalRegions / spanRegionLimit)
spanNum = captureNum * coefficient
if regionNum/spanRegionLimit > spanNum {
spanNum = regionNum / spanRegionLimit
}
}
if spanNum > maxSpanNumber {
spanNum = maxSpanNumber
}
return spanNum
}
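
Note on the sizing change: the old region-count splitter produced one span per capture (`pages := totalCaptures`), while the new rule above scales the span count with both the capture count and the total region count, capped at maxSpanNumber, giving the scheduler more and smaller spans to balance. The snippet below is a self-contained restatement of that rule for illustration only; the spansNumber helper and the main wrapper are not part of the change, and the constants and sample values are taken from this diff.

```go
package main

import "fmt"

// Constants as introduced in reconciler.go above.
const (
	spanRegionLimit           = 50000
	baseSpanNumberCoefficient = 3
	maxSpanNumber             = 100
)

// spansNumber restates the new sizing rule: roughly
// max(captureNum * max(captureNum-1, baseSpanNumberCoefficient),
//     regionNum / spanRegionLimit), capped at maxSpanNumber.
func spansNumber(regionNum, captureNum int) int {
	coefficient := captureNum - 1
	if baseSpanNumberCoefficient > coefficient {
		coefficient = baseSpanNumberCoefficient
	}
	spanNum := 1
	if regionNum > 1 {
		spanNum = captureNum * coefficient
		if regionNum/spanRegionLimit > spanNum {
			spanNum = regionNum / spanRegionLimit
		}
	}
	if spanNum > maxSpanNumber {
		spanNum = maxSpanNumber
	}
	return spanNum
}

func main() {
	fmt.Println(spansNumber(1, 10))     // 1: a single-region table is never split
	fmt.Println(spansNumber(100, 2))    // 6: 2 captures * coefficient 3
	fmt.Println(spansNumber(100, 5))    // 20: 5 captures * coefficient 4
	fmt.Println(spansNumber(10000, 11)) // 100: 11*10 = 110, capped at maxSpanNumber
}
```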
17 changes: 17 additions & 0 deletions cdc/scheduler/internal/v3/keyspan/reconciler_test.go
@@ -259,3 +259,20 @@ func TestBatchAddRateLimit(t *testing.T) {
require.Equal(t, allSpan, reconciler.tableSpans[2].spans)
require.Equal(t, 1, len(reconciler.tableSpans))
}

func TestGetSpansNumber(t *testing.T) {
tc := []struct {
regionCount int
captureNum int
expected int
}{
{1, 10, 1},
{100, 2, 6},
{100, 3, 9},
{100, 5, 20},
{10000, 11, 100},
}
for _, c := range tc {
require.Equal(t, c.expected, getSpansNumber(c.regionCount, c.captureNum))
}
}
40 changes: 16 additions & 24 deletions cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
@@ -21,28 +21,28 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

type regionCountSplitter struct {
changefeedID model.ChangeFeedID
regionCache RegionCache
changefeedID model.ChangeFeedID
regionCache RegionCache
regionThreshold int
}

func newRegionCountSplitter(
changefeedID model.ChangeFeedID, regionCache RegionCache,
changefeedID model.ChangeFeedID, regionCache RegionCache, regionThreshold int,
) *regionCountSplitter {
return &regionCountSplitter{
changefeedID: changefeedID,
regionCache: regionCache,
changefeedID: changefeedID,
regionCache: regionCache,
regionThreshold: regionThreshold,
}
}

func (m *regionCountSplitter) split(
ctx context.Context, span tablepb.Span, totalCaptures int,
config *config.ChangefeedSchedulerConfig,
ctx context.Context, span tablepb.Span, captureNum int,
) []tablepb.Span {
bo := tikv.NewBackoffer(ctx, 500)
regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
@@ -54,29 +54,21 @@ func (m *regionCountSplitter) split(
zap.Error(err))
return []tablepb.Span{span}
}
if len(regions) <= config.RegionThreshold || totalCaptures == 0 {
if len(regions) <= m.regionThreshold || captureNum == 0 {
log.Info("schedulerv3: skip split span by region count",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("span", span.String()),
zap.Int("totalCaptures", totalCaptures),
zap.Int("totalCaptures", captureNum),
zap.Int("regionCount", len(regions)),
zap.Int("regionThreshold", config.RegionThreshold))
zap.Int("regionThreshold", m.regionThreshold))
return []tablepb.Span{span}
}

pages := totalCaptures
stepper := newEvenlySplitStepper(
getSpansNumber(len(regions), captureNum),
len(regions))

totalRegions := len(regions)
if totalRegions == 0 {
pages = 1
}

if totalRegions/spanRegionLimit > pages {
pages = totalRegions / spanRegionLimit
}

stepper := newEvenlySplitStepper(pages, totalRegions)
spans := make([]tablepb.Span, 0, stepper.SpanCount())
start, end := 0, stepper.Step()
for {
@@ -133,9 +125,9 @@ func (m *regionCountSplitter) split(
zap.String("changefeed", m.changefeedID.ID),
zap.String("span", span.String()),
zap.Int("spans", len(spans)),
zap.Int("totalCaptures", totalCaptures),
zap.Int("totalCaptures", captureNum),
zap.Int("regionCount", len(regions)),
zap.Int("regionThreshold", config.RegionThreshold),
zap.Int("regionThreshold", m.regionThreshold),
zap.Int("spanRegionLimit", spanRegionLimit))
return spans
}
66 changes: 38 additions & 28 deletions cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go
@@ -77,7 +78,8 @@ func TestRegionCountSplitSpan(t *testing.T) {
totalCaptures: 4,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region
{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
@@ -87,35 +88,42 @@
totalCaptures: 3,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region
{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
},
},
{
totalCaptures: 2,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_3")}, // 3 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t2")}, // 2 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region
{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
},
},
{
totalCaptures: 1,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, // 5 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
},
},
}

for i, cs := range cases {
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache)
cfg := &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true,
RegionThreshold: 1,
}
spans := splitter.split(context.Background(), cs.span, cs.totalCaptures, cfg)
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold)
spans := splitter.split(context.Background(), cs.span, cs.totalCaptures)
require.Equalf(t, cs.expectSpans, spans, "%d %s", i, &cs.span)
}
}
@@ -134,64 +142,66 @@ func TestRegionCountEvenlySplitSpan(t *testing.T) {

cases := []struct {
totalCaptures int
expectedSpans int
expectSpansMin int
expectSpansMax int
}{
{
totalCaptures: 0,
expectedSpans: 1,
expectSpansMin: 1000,
expectSpansMax: 1000,
},
{
totalCaptures: 1,
expectSpansMin: 1000,
expectSpansMax: 1000,
expectedSpans: 3,
expectSpansMin: 333,
expectSpansMax: 334,
},
{
totalCaptures: 3,
expectSpansMin: 333,
expectSpansMax: 334,
expectedSpans: 9,
expectSpansMin: 111,
expectSpansMax: 113,
},
{
totalCaptures: 7,
expectSpansMin: 142,
expectSpansMax: 143,
expectedSpans: 42,
expectSpansMin: 23,
expectSpansMax: 24,
},
{
totalCaptures: 999,
expectedSpans: 100,
expectSpansMin: 1,
expectSpansMax: 2,
expectSpansMax: 10,
},
{
totalCaptures: 1000,
expectedSpans: 100,
expectSpansMin: 1,
expectSpansMax: 1,
expectSpansMax: 10,
},
{
totalCaptures: 2000,
expectedSpans: 100,
expectSpansMin: 1,
expectSpansMax: 1,
expectSpansMax: 10,
},
}
for i, cs := range cases {
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache)
cfg := &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true,
RegionThreshold: 1,
}
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold)
spans := splitter.split(
context.Background(),
tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
cs.totalCaptures,
cfg,
)
if cs.totalCaptures == 0 {
require.Equalf(t, 1, len(spans), "%d %v", i, cs)
} else if cs.totalCaptures <= 1000 {
require.Equalf(t, cs.totalCaptures, len(spans), "%d %v", i, cs)
} else {
require.Equalf(t, 1000, len(spans), "%d %v", i, cs)
}

require.Equalf(t, cs.expectedSpans, len(spans), "%d %v", i, cs)

for _, span := range spans {
start, end := 0, 1000
@@ -217,13 +227,13 @@ func TestSplitSpanRegionOutOfOrder(t *testing.T) {
cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_4")}, 2)
cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3)

splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache)
cfg := &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true,
RegionThreshold: 1,
}
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold)
span := tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}
spans := splitter.split(context.Background(), span, 1, cfg)
spans := splitter.split(context.Background(), span, 1)
require.Equal(
t, []tablepb.Span{{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}}, spans)
}