disttask, ddl: refine disttask_flow #46472

Merged: 4 commits (Aug 29, 2023)
23 changes: 14 additions & 9 deletions ddl/ddl.go
@@ -690,15 +690,20 @@
 		return NewBackfillSchedulerHandle(ctx, taskMeta, d, step == proto.StepTwo)
 	})
 
-	dispatcher.RegisterTaskFlowHandle(BackfillTaskType, NewLitBackfillFlowHandle(d))
-	scheduler.RegisterSubtaskExectorConstructor(BackfillTaskType, proto.StepOne,
-		func(proto.MinimalTask, int64) (scheduler.SubtaskExecutor, error) {
-			return &scheduler.EmptyExecutor{}, nil
-		})
-	scheduler.RegisterSubtaskExectorConstructor(BackfillTaskType, proto.StepTwo,
-		func(proto.MinimalTask, int64) (scheduler.SubtaskExecutor, error) {
-			return &scheduler.EmptyExecutor{}, nil
-		})
+	backfillHandle, err := NewLitBackfillFlowHandle(d)
+	if err != nil {
+		logutil.BgLogger().Warn("NewLitBackfillFlowHandle failed", zap.String("category", "ddl"), zap.Error(err))
+	} else {
+		dispatcher.RegisterTaskFlowHandle(BackfillTaskType, backfillHandle)
+		scheduler.RegisterSubtaskExectorConstructor(BackfillTaskType, proto.StepOne,
+			func(proto.MinimalTask, int64) (scheduler.SubtaskExecutor, error) {
+				return &scheduler.EmptyExecutor{}, nil
+			})
+		scheduler.RegisterSubtaskExectorConstructor(BackfillTaskType, proto.StepTwo,
+			func(proto.MinimalTask, int64) (scheduler.SubtaskExecutor, error) {
+				return &scheduler.EmptyExecutor{}, nil
+			})
+	}
 
 	// Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
 	variable.EnableDDL = d.EnableDDL
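For readers following the control flow, here is a minimal, self-contained sketch of the new registration pattern, assuming simplified stand-ins (`flowHandle`, `newFlowHandle`) for the real dispatcher and scheduler APIs: the handle constructor is now fallible, and a failure is logged rather than propagated, so only the distributed backfill path stays unregistered while DDL startup continues.

```go
package main

import (
	"errors"
	"fmt"
)

// flowHandle and newFlowHandle are illustrative stand-ins for
// dispatcher.TaskFlowHandle and NewLitBackfillFlowHandle; only the
// guard-and-log control flow mirrors the change in ddl.go.
type flowHandle struct{}

func newFlowHandle(d any) (*flowHandle, error) {
	if d == nil {
		return nil, errors.New("the DDL implementation should be of type *ddl")
	}
	return &flowHandle{}, nil
}

func main() {
	if h, err := newFlowHandle(struct{}{}); err != nil {
		// A constructor failure is logged and the registrations are skipped;
		// DDL itself keeps starting up.
		fmt.Println("NewLitBackfillFlowHandle failed:", err)
	} else {
		fmt.Printf("registering %T with dispatcher and scheduler\n", h)
	}
}
```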
225 changes: 125 additions & 100 deletions ddl/disttask_flow.go
@@ -33,142 +33,82 @@
 )
 
 type litBackfillFlowHandle struct {
-	d DDL
+	d *ddl
 }
 
 var _ dispatcher.TaskFlowHandle = (*litBackfillFlowHandle)(nil)
 
 // NewLitBackfillFlowHandle creates a new litBackfillFlowHandle.
-func NewLitBackfillFlowHandle(d DDL) dispatcher.TaskFlowHandle {
-	return &litBackfillFlowHandle{
-		d: d,
-	}
-}
+func NewLitBackfillFlowHandle(d DDL) (dispatcher.TaskFlowHandle, error) {
+	ddl, ok := d.(*ddl)
+	if !ok {
+		return nil, errors.New("The getDDL result should be the type of *ddl")
+	}
+	return &litBackfillFlowHandle{
+		d: ddl,
+	}, nil
+}
Review thread on the `errors.New` line above:

  Reviewer (Contributor): Can we log here instead of return the error but ignore?

  Author (Contributor): I think the current implementation is ok; we can't assume the `d DDL` is `*ddl`.
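The guard under discussion, as a standalone sketch: the type assertion moves out of `ProcessNormalFlow` and into the constructor, so a mismatch surfaces once at registration time instead of on every dispatch. The `DDL` interface and `ddl` struct below are simplified stand-ins for the real types.

```go
package main

import (
	"errors"
	"fmt"
)

// DDL and ddl stand in for the real interface and its concrete implementation.
type DDL interface{ Start() }

type ddl struct{}

func (*ddl) Start() {}

// asConcreteDDL mirrors the guard in NewLitBackfillFlowHandle: assert the
// concrete type once, and report a mismatch as an error rather than failing
// later inside the flow handler.
func asConcreteDDL(d DDL) (*ddl, error) {
	impl, ok := d.(*ddl)
	if !ok {
		return nil, errors.New("the DDL implementation should be of type *ddl")
	}
	return impl, nil
}

func main() {
	impl, err := asConcreteDDL(&ddl{})
	fmt.Println(impl != nil, err) // true <nil>
}
```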

 func (*litBackfillFlowHandle) OnTicker(_ context.Context, _ *proto.Task) {
 }
 
 // ProcessNormalFlow processes the normal flow.
-func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
+func (h *litBackfillFlowHandle) ProcessNormalFlow(ctx context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
 	var globalTaskMeta BackfillGlobalMeta
-	if err = json.Unmarshal(gTask.Meta, &globalTaskMeta); err != nil {
+	if err := json.Unmarshal(gTask.Meta, &globalTaskMeta); err != nil {
 		return nil, err
 	}
 
-	d, ok := h.d.(*ddl)
-	if !ok {
-		return nil, errors.New("The getDDL result should be the type of *ddl")
-	}
-
 	job := &globalTaskMeta.Job
-	var tblInfo *model.TableInfo
-	err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
-		tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
-		return err
-	})
+	tblInfo, err := getTblInfo(h.d, job)
 	if err != nil {
 		return nil, err
 	}
 
-	var subTaskMetas [][]byte
-	if tblInfo.Partition == nil {
-		switch gTask.Step {
-		case proto.StepOne:
-			serverNodes, err := dispatcher.GenerateSchedulerNodes(d.ctx)
-			if err != nil {
-				return nil, err
-			}
-			subTaskMetas = make([][]byte, 0, len(serverNodes))
-			dummyMeta := &BackfillSubTaskMeta{}
-			metaBytes, err := json.Marshal(dummyMeta)
-			if err != nil {
-				return nil, err
-			}
-			for range serverNodes {
-				subTaskMetas = append(subTaskMetas, metaBytes)
-			}
-			gTask.Step = proto.StepTwo
-			return subTaskMetas, nil
-		case proto.StepTwo:
-			return nil, nil
-		default:
-		}
-		tbl, err := getTable(d.store, job.SchemaID, tblInfo)
-		if err != nil {
-			return nil, err
-		}
-		ver, err := getValidCurrentVersion(d.store)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority)
-		if startKey == nil && endKey == nil {
-			// Empty table.
-			gTask.Step = proto.StepOne
-			return nil, nil
-		}
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		regionCache := d.store.(helper.Storage).GetRegionCache()
-		recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
-		if err != nil {
-			return nil, err
-		}
-
-		subTaskMetas = make([][]byte, 0, 100)
-		regionBatch := 20
-		sort.Slice(recordRegionMetas, func(i, j int) bool {
-			return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
-		})
-		for i := 0; i < len(recordRegionMetas); i += regionBatch {
-			end := i + regionBatch
-			if end > len(recordRegionMetas) {
-				end = len(recordRegionMetas)
-			}
-			batch := recordRegionMetas[i:end]
-			subTaskMeta := &BackfillSubTaskMeta{StartKey: batch[0].StartKey(), EndKey: batch[len(batch)-1].EndKey()}
-			if i == 0 {
-				subTaskMeta.StartKey = startKey
-			}
-			if end == len(recordRegionMetas) {
-				subTaskMeta.EndKey = endKey
-			}
-			metaBytes, err := json.Marshal(subTaskMeta)
-			if err != nil {
-				return nil, err
-			}
-			subTaskMetas = append(subTaskMetas, metaBytes)
-		}
-	} else {
-		if gTask.State != proto.TaskStatePending {
-			// This flow for partition table has only one step, finish task when it is not pending
-			return nil, nil
-		}
-
-		defs := tblInfo.Partition.Definitions
-		physicalIDs := make([]int64, len(defs))
-		for i := range defs {
-			physicalIDs[i] = defs[i].ID
-		}
-
-		subTaskMetas = make([][]byte, 0, len(physicalIDs))
-		for _, physicalID := range physicalIDs {
-			subTaskMeta := &BackfillSubTaskMeta{
-				PhysicalTableID: physicalID,
-			}
-
-			metaBytes, err := json.Marshal(subTaskMeta)
-			if err != nil {
-				return nil, err
-			}
-
-			subTaskMetas = append(subTaskMetas, metaBytes)
-		}
+	// generate partition table's plan.
+	if tblInfo.Partition != nil {
+		if gTask.Step != proto.StepInit {
+			// This flow for partition table has only one step
+			return nil, nil
+		}
+		subTaskMetas, err := generatePartitionPlan(tblInfo)
+		if err != nil {
+			return nil, err
+		}
+		gTask.Step = proto.StepOne
+		return subTaskMetas, nil
 	}
 
-	gTask.Step = proto.StepOne
-	return subTaskMetas, nil
+	// generate non-partition table's plan.
+	switch gTask.Step {
+	case proto.StepInit:
+		subtaskMeta, err := generateNonPartitionPlan(h.d, tblInfo, job)
+		if err != nil {
+			return nil, err
+		}
+		gTask.Step = proto.StepOne
+		return subtaskMeta, nil
+	case proto.StepOne:
+		serverNodes, err := dispatcher.GenerateSchedulerNodes(ctx)
+		if err != nil {
+			return nil, err
+		}
+		subTaskMetas := make([][]byte, 0, len(serverNodes))
+		dummyMeta := &BackfillSubTaskMeta{}
+		metaBytes, err := json.Marshal(dummyMeta)
+		if err != nil {
+			return nil, err
+		}
+		for range serverNodes {
+			subTaskMetas = append(subTaskMetas, metaBytes)
+		}
+		gTask.Step = proto.StepTwo
+		return subTaskMetas, nil
+	case proto.StepTwo:
+		return nil, nil
+	default:
+		return nil, nil
+	}
 }
 
 func (*litBackfillFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErr []error) (meta []byte, err error) {
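For orientation, a minimal, self-contained sketch of the step machine that the refactored ProcessNormalFlow implements: partition tables plan everything in a single step, while non-partition tables go StepInit (range subtasks), then StepOne (one dummy subtask per scheduler node), then StepTwo (done). Every name below is an illustrative stand-in, not the real dist-task API.

```go
package main

import "fmt"

// step mirrors the role of proto.StepInit/StepOne/StepTwo.
type step int

const (
	stepInit step = iota
	stepOne
	stepTwo
)

// nextSubtasks sketches the dispatch logic of the refactored ProcessNormalFlow.
func nextSubtasks(partitioned bool, cur step) (subtasks []string, next step) {
	if partitioned {
		// Partition tables produce their whole plan in one step.
		if cur != stepInit {
			return nil, cur
		}
		return []string{"one subtask per physical partition"}, stepOne
	}
	switch cur {
	case stepInit:
		return []string{"one subtask per batch of ~20 regions"}, stepOne
	case stepOne:
		return []string{"one dummy subtask per scheduler node"}, stepTwo
	default:
		return nil, cur // StepTwo and beyond: nothing left to plan
	}
}

func main() {
	tasks, next := nextSubtasks(false, stepInit)
	fmt.Println(tasks, next) // [one subtask per batch of ~20 regions] 1
}
```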
@@ -187,3 +127,88 @@
 func (*litBackfillFlowHandle) IsRetryableErr(error) bool {
 	return true
 }
+
+func getTblInfo(d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
+	err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
+		tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return tblInfo, nil
+}
+
+func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) {
+	defs := tblInfo.Partition.Definitions
+	physicalIDs := make([]int64, len(defs))
+	for i := range defs {
+		physicalIDs[i] = defs[i].ID
+	}
+
+	subTaskMetas := make([][]byte, 0, len(physicalIDs))
+	for _, physicalID := range physicalIDs {
+		subTaskMeta := &BackfillSubTaskMeta{
+			PhysicalTableID: physicalID,
+		}
+
+		metaBytes, err := json.Marshal(subTaskMeta)
+		if err != nil {
+			return nil, err
+		}
+
+		subTaskMetas = append(subTaskMetas, metaBytes)
+	}
+	return subTaskMetas, nil
+}
+
+func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job) (metas [][]byte, err error) {
+	tbl, err := getTable(d.store, job.SchemaID, tblInfo)
+	if err != nil {
+		return nil, err
+	}
+	ver, err := getValidCurrentVersion(d.store)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority)
+	if startKey == nil && endKey == nil {
+		// Empty table.
+		return nil, nil
+	}
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	regionCache := d.store.(helper.Storage).GetRegionCache()
+	recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
+	if err != nil {
+		return nil, err
+	}
+
+	subTaskMetas := make([][]byte, 0, 100)
+	regionBatch := 20
+	sort.Slice(recordRegionMetas, func(i, j int) bool {
+		return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
+	})
+	for i := 0; i < len(recordRegionMetas); i += regionBatch {
+		end := i + regionBatch
+		if end > len(recordRegionMetas) {
+			end = len(recordRegionMetas)
+		}
+		batch := recordRegionMetas[i:end]
+		subTaskMeta := &BackfillSubTaskMeta{StartKey: batch[0].StartKey(), EndKey: batch[len(batch)-1].EndKey()}
+		if i == 0 {
+			subTaskMeta.StartKey = startKey
+		}
+		if end == len(recordRegionMetas) {
+			subTaskMeta.EndKey = endKey
+		}
+		metaBytes, err := json.Marshal(subTaskMeta)
+		if err != nil {
+			return nil, err
+		}
+		subTaskMetas = append(subTaskMetas, metaBytes)
+	}
+	return subTaskMetas, nil
+}
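A self-contained sketch of the range-splitting scheme in generateNonPartitionPlan, with simplified stand-ins (`region`, `keyRange`) for the TiKV region-cache types and the subtask meta: regions are sorted by start key, grouped into batches of `regionBatch` (20 in the real code), and the first and last subtask ranges are clamped to the table's actual start and end keys.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// region stands in for the region metadata returned by the region cache.
type region struct{ startKey, endKey []byte }

// keyRange stands in for BackfillSubTaskMeta's StartKey/EndKey pair.
type keyRange struct{ start, end []byte }

func splitIntoSubtasks(regions []region, startKey, endKey []byte, regionBatch int) []keyRange {
	// Sort regions so consecutive batches cover contiguous key ranges.
	sort.Slice(regions, func(i, j int) bool {
		return bytes.Compare(regions[i].startKey, regions[j].startKey) < 0
	})
	var ranges []keyRange
	for i := 0; i < len(regions); i += regionBatch {
		end := i + regionBatch
		if end > len(regions) {
			end = len(regions)
		}
		batch := regions[i:end]
		r := keyRange{start: batch[0].startKey, end: batch[len(batch)-1].endKey}
		if i == 0 {
			r.start = startKey // clamp the first subtask to the table range start
		}
		if end == len(regions) {
			r.end = endKey // clamp the last subtask to the table range end
		}
		ranges = append(ranges, r)
	}
	return ranges
}

func main() {
	regions := []region{
		{[]byte("c"), []byte("d")},
		{[]byte("a"), []byte("b")},
		{[]byte("b"), []byte("c")},
	}
	for _, r := range splitIntoSubtasks(regions, []byte("a1"), []byte("z"), 2) {
		fmt.Printf("[%s, %s)\n", r.start, r.end)
	}
	// Output:
	// [a1, c)
	// [c, z)
}
```

One subtask per region batch keeps subtask counts bounded on large tables, while the clamping ensures the union of all subtask ranges exactly covers the scanned table range.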
4 changes: 2 additions & 2 deletions ddl/disttask_flow_test.go
@@ -33,8 +33,8 @@ import (
 
 func TestBackfillFlowHandle(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
-	handler := ddl.NewLitBackfillFlowHandle(dom.DDL())
-
+	handler, err := ddl.NewLitBackfillFlowHandle(dom.DDL())
+	require.NoError(t, err)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
9 changes: 9 additions & 0 deletions tests/realtikvtest/addindextest/add_index_test.go
@@ -150,6 +150,15 @@ func TestAddIndexDistBasic(t *testing.T) {
 	tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;")
 	tk.MustExec("alter table t add index idx(a);")
 	tk.MustExec("admin check index t idx;")
+
+	tk.MustExec("create table t1(a bigint auto_random primary key);")
+	tk.MustExec("insert into t1 values (), (), (), (), (), ()")
+	tk.MustExec("insert into t1 values (), (), (), (), (), ()")
+	tk.MustExec("insert into t1 values (), (), (), (), (), ()")
+	tk.MustExec("insert into t1 values (), (), (), (), (), ()")
+	tk.MustExec("split table t1 between (3) and (8646911284551352360) regions 50;")
+	tk.MustExec("alter table t1 add index idx(a);")
+	tk.MustExec("admin check index t1 idx;")
 	tk.MustExec(`set global tidb_enable_dist_task=0;`)
 }