Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduller(ticc): fix uneven write keys distribution (#9664) #9672

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/keyspan/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ func NewReconcilerForTests(
return &Reconciler{
tableSpans: make(map[int64]splittedSpans),
config: config,
splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache)},
splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache, config.RegionPerSpan)},
}
}
36 changes: 31 additions & 5 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ import (
"go.uber.org/zap"
)

const spanRegionLimit = 50000
const (
// spanRegionLimit is the maximum number of regions a span can cover.
spanRegionLimit = 50000
// baseSpanNumberCoefficient is the base coefficient that use to
// multiply the number of captures to get the number of spans.
baseSpanNumberCoefficient = 3
)

type splitter interface {
split(
ctx context.Context, span tablepb.Span, totalCaptures int,
config *config.ChangefeedSchedulerConfig,
) []tablepb.Span
}

Expand Down Expand Up @@ -72,8 +77,8 @@ func NewReconciler(
config: config,
splitter: []splitter{
// write splitter has the highest priority.
newWriteSplitter(changefeedID, pdapi),
newRegionCountSplitter(changefeedID, up.RegionCache),
newWriteSplitter(changefeedID, pdapi, config.WriteKeyThreshold),
newRegionCountSplitter(changefeedID, up.RegionCache, config.RegionThreshold),
},
}, nil
}
Expand Down Expand Up @@ -121,7 +126,7 @@ func (m *Reconciler) Reconcile(
spans := []tablepb.Span{tableSpan}
if compat.CheckSpanReplicationEnabled() {
for _, splitter := range m.splitter {
spans = splitter.split(ctx, tableSpan, len(aliveCaptures), m.config)
spans = splitter.split(ctx, tableSpan, len(aliveCaptures))
if len(spans) > 1 {
break
}
Expand Down Expand Up @@ -208,3 +213,24 @@ func (m *Reconciler) Reconcile(
}
return m.spanCache
}

const maxSpanNumber = 100

func getSpansNumber(regionNum, captureNum int) int {
coefficient := captureNum - 1
if baseSpanNumberCoefficient > coefficient {
coefficient = baseSpanNumberCoefficient
}
spanNum := 1
if regionNum > 1 {
// spanNumber = max(captureNum * coefficient, totalRegions / spanRegionLimit)
spanNum = captureNum * coefficient
if regionNum/spanRegionLimit > spanNum {
spanNum = regionNum / spanRegionLimit
}
}
if spanNum > maxSpanNumber {
spanNum = maxSpanNumber
}
return spanNum
}
17 changes: 17 additions & 0 deletions cdc/scheduler/internal/v3/keyspan/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,20 @@ func TestBatchAddRateLimit(t *testing.T) {
require.Equal(t, allSpan, reconciler.tableSpans[2].spans)
require.Equal(t, 1, len(reconciler.tableSpans))
}

func TestGetSpansNumber(t *testing.T) {
tc := []struct {
regionCount int
captureNum int
expected int
}{
{1, 10, 1},
{100, 2, 6},
{100, 3, 9},
{100, 5, 20},
{10000, 11, 100},
}
for _, c := range tc {
require.Equal(t, c.expected, getSpansNumber(c.regionCount, c.captureNum))
}
}
40 changes: 16 additions & 24 deletions cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

type regionCountSplitter struct {
changefeedID model.ChangeFeedID
regionCache RegionCache
changefeedID model.ChangeFeedID
regionCache RegionCache
regionThreshold int
}

func newRegionCountSplitter(
changefeedID model.ChangeFeedID, regionCache RegionCache,
changefeedID model.ChangeFeedID, regionCache RegionCache, regionThreshold int,
) *regionCountSplitter {
return &regionCountSplitter{
changefeedID: changefeedID,
regionCache: regionCache,
changefeedID: changefeedID,
regionCache: regionCache,
regionThreshold: regionThreshold,
}
}

func (m *regionCountSplitter) split(
ctx context.Context, span tablepb.Span, totalCaptures int,
config *config.ChangefeedSchedulerConfig,
ctx context.Context, span tablepb.Span, captureNum int,
) []tablepb.Span {
bo := tikv.NewBackoffer(ctx, 500)
regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
Expand All @@ -54,29 +54,21 @@ func (m *regionCountSplitter) split(
zap.Error(err))
return []tablepb.Span{span}
}
if len(regions) <= config.RegionThreshold || totalCaptures == 0 {
if len(regions) <= m.regionThreshold || captureNum == 0 {
log.Info("schedulerv3: skip split span by region count",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("span", span.String()),
zap.Int("totalCaptures", totalCaptures),
zap.Int("totalCaptures", captureNum),
zap.Int("regionCount", len(regions)),
zap.Int("regionThreshold", config.RegionThreshold))
zap.Int("regionThreshold", m.regionThreshold))
return []tablepb.Span{span}
}

pages := totalCaptures
stepper := newEvenlySplitStepper(
getSpansNumber(len(regions), captureNum),
len(regions))

totalRegions := len(regions)
if totalRegions == 0 {
pages = 1
}

if totalRegions/spanRegionLimit > pages {
pages = totalRegions / spanRegionLimit
}

stepper := newEvenlySplitStepper(pages, totalRegions)
spans := make([]tablepb.Span, 0, stepper.SpanCount())
start, end := 0, stepper.Step()
for {
Expand Down Expand Up @@ -133,9 +125,9 @@ func (m *regionCountSplitter) split(
zap.String("changefeed", m.changefeedID.ID),
zap.String("span", span.String()),
zap.Int("spans", len(spans)),
zap.Int("totalCaptures", totalCaptures),
zap.Int("totalCaptures", captureNum),
zap.Int("regionCount", len(regions)),
zap.Int("regionThreshold", config.RegionThreshold),
zap.Int("regionThreshold", m.regionThreshold),
zap.Int("spanRegionLimit", spanRegionLimit))
return spans
}
Expand Down
66 changes: 38 additions & 28 deletions cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func TestRegionCountSplitSpan(t *testing.T) {
totalCaptures: 4,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region
{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
Expand All @@ -87,35 +88,42 @@ func TestRegionCountSplitSpan(t *testing.T) {
totalCaptures: 3,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region
{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
},
},
{
totalCaptures: 2,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_3")}, // 3 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t2")}, // 2 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")}, // 1 region
{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
},
},
{
totalCaptures: 1,
span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
expectSpans: []tablepb.Span{
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, // 5 region
{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region
{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region
{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region
},
},
}

for i, cs := range cases {
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache)
cfg := &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true,
RegionThreshold: 1,
}
spans := splitter.split(context.Background(), cs.span, cs.totalCaptures, cfg)
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold)
spans := splitter.split(context.Background(), cs.span, cs.totalCaptures)
require.Equalf(t, cs.expectSpans, spans, "%d %s", i, &cs.span)
}
}
Expand All @@ -134,64 +142,66 @@ func TestRegionCountEvenlySplitSpan(t *testing.T) {

cases := []struct {
totalCaptures int
expectedSpans int
expectSpansMin int
expectSpansMax int
}{
{
totalCaptures: 0,
expectedSpans: 1,
expectSpansMin: 1000,
expectSpansMax: 1000,
},
{
totalCaptures: 1,
expectSpansMin: 1000,
expectSpansMax: 1000,
expectedSpans: 3,
expectSpansMin: 333,
expectSpansMax: 334,
},
{
totalCaptures: 3,
expectSpansMin: 333,
expectSpansMax: 334,
expectedSpans: 9,
expectSpansMin: 111,
expectSpansMax: 113,
},
{
totalCaptures: 7,
expectSpansMin: 142,
expectSpansMax: 143,
expectedSpans: 42,
expectSpansMin: 23,
expectSpansMax: 24,
},
{
totalCaptures: 999,
expectedSpans: 100,
expectSpansMin: 1,
expectSpansMax: 2,
expectSpansMax: 10,
},
{
totalCaptures: 1000,
expectedSpans: 100,
expectSpansMin: 1,
expectSpansMax: 1,
expectSpansMax: 10,
},
{
totalCaptures: 2000,
expectedSpans: 100,
expectSpansMin: 1,
expectSpansMax: 1,
expectSpansMax: 10,
},
}
for i, cs := range cases {
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache)
cfg := &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true,
RegionThreshold: 1,
}
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold)
spans := splitter.split(
context.Background(),
tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
cs.totalCaptures,
cfg,
)
if cs.totalCaptures == 0 {
require.Equalf(t, 1, len(spans), "%d %v", i, cs)
} else if cs.totalCaptures <= 1000 {
require.Equalf(t, cs.totalCaptures, len(spans), "%d %v", i, cs)
} else {
require.Equalf(t, 1000, len(spans), "%d %v", i, cs)
}

require.Equalf(t, cs.expectedSpans, len(spans), "%d %v", i, cs)

for _, span := range spans {
start, end := 0, 1000
Expand All @@ -217,13 +227,13 @@ func TestSplitSpanRegionOutOfOrder(t *testing.T) {
cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_4")}, 2)
cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3)

splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache)
cfg := &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true,
RegionThreshold: 1,
}
splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache, cfg.RegionThreshold)
span := tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}
spans := splitter.split(context.Background(), span, 1, cfg)
spans := splitter.split(context.Background(), span, 1)
require.Equal(
t, []tablepb.Span{{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}}, spans)
}
Loading
Loading