Skip to content

Commit

Permalink
disttask: cleanup framework meta when nodes shutdown (pingcap#47536)
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored and wuhuizuo committed Apr 2, 2024
1 parent 8879fc3 commit 3514d7f
Show file tree
Hide file tree
Showing 15 changed files with 613 additions and 93 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ mock_lightning: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher,CleanUpRoutine > disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/util/sqlexec RestrictedSQLExecutor > util/sqlexec/mock/restricted_sql_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Dispatcher,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute SubtaskExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
framework_store "github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
Expand Down Expand Up @@ -224,7 +223,7 @@ type litBackfillDispatcher struct {
*dispatcher.BaseDispatcher
}

func newLitBackfillDispatcher(ctx context.Context, taskMgr *framework_store.TaskManager,
func newLitBackfillDispatcher(ctx context.Context, taskMgr dispatcher.TaskManager,
serverID string, task *proto.Task, handle dispatcher.Extension) dispatcher.Dispatcher {
dis := litBackfillDispatcher{
BaseDispatcher: dispatcher.NewBaseDispatcher(ctx, taskMgr, serverID, task),
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -703,7 +702,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
logutil.BgLogger().Warn("NewBackfillingDispatcherExt failed", zap.String("category", "ddl"), zap.Error(err))
} else {
dispatcher.RegisterDispatcherFactory(proto.Backfill,
func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
func(ctx context.Context, taskMgr dispatcher.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
return newLitBackfillDispatcher(ctx, taskMgr, serverID, task, backFillDsp)
})
dispatcher.RegisterDispatcherCleanUpFactory(proto.Backfill, newBackfillCleanUpS3)
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ go_test(
name = "dispatcher_test",
timeout = "short",
srcs = [
"dispatcher_manager_test.go",
"dispatcher_test.go",
"main_test.go",
],
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 14,
shard_count = 15,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/proto",
Expand Down
9 changes: 7 additions & 2 deletions pkg/disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Dispatcher interface {
// each task type embed this struct and implement the Extension interface.
type BaseDispatcher struct {
ctx context.Context
taskMgr *storage.TaskManager
taskMgr TaskManager
Task *proto.Task
logCtx context.Context
// serverID, it's value is ip:port now.
Expand All @@ -103,7 +103,7 @@ type BaseDispatcher struct {
var MockOwnerChange func()

// NewBaseDispatcher creates a new BaseDispatcher.
func NewBaseDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) *BaseDispatcher {
func NewBaseDispatcher(ctx context.Context, taskMgr TaskManager, serverID string, task *proto.Task) *BaseDispatcher {
logCtx := logutil.WithFields(context.Background(), zap.Int64("task-id", task.ID),
zap.Stringer("task-type", task.Type))
return &BaseDispatcher{
Expand Down Expand Up @@ -394,17 +394,22 @@ func (d *BaseDispatcher) replaceDeadNodesIfAny() error {
}
if len(d.liveNodes) > 0 {
replaceNodes := make(map[string]string)
cleanNodes := make([]string, 0)
for _, nodeID := range d.taskNodes {
if ok := disttaskutil.MatchServerInfo(d.liveNodes, nodeID); !ok {
n := d.liveNodes[d.rand.Int()%len(d.liveNodes)] //nolint:gosec
replaceNodes[nodeID] = disttaskutil.GenerateExecID(n.IP, n.Port)
cleanNodes = append(cleanNodes, nodeID)
}
}
if len(replaceNodes) > 0 {
logutil.Logger(d.logCtx).Info("reschedule subtasks to other nodes", zap.Int("node-cnt", len(replaceNodes)))
if err := d.taskMgr.UpdateFailedSchedulerIDs(d.Task.ID, replaceNodes); err != nil {
return err
}
if err := d.taskMgr.CleanUpMeta(cleanNodes); err != nil {
return err
}
// replace local cache.
for k, v := range replaceNodes {
for m, n := range d.taskNodes {
Expand Down
47 changes: 42 additions & 5 deletions pkg/disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,7 +90,7 @@ func (dm *Manager) clearRunningTasks() {
type Manager struct {
ctx context.Context
cancel context.CancelFunc
taskMgr *storage.TaskManager
taskMgr TaskManager
wg tidbutil.WaitGroupWrapper
gPool *spool.Pool
inited bool
Expand All @@ -107,9 +107,9 @@ type Manager struct {
}

// NewManager creates a dispatcher struct.
func NewManager(ctx context.Context, taskTable *storage.TaskManager, serverID string) (*Manager, error) {
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) (*Manager, error) {
dispatcherManager := &Manager{
taskMgr: taskTable,
taskMgr: taskMgr,
serverID: serverID,
}
gPool, err := spool.NewPool("dispatch_pool", int32(DefaultDispatchConcurrency), util.DistTask, spool.WithBlocking(true))
Expand Down Expand Up @@ -305,11 +305,15 @@ func (dm *Manager) cleanUpLoop() {
// WaitCleanUpFinished is used to sync the test.
var WaitCleanUpFinished = make(chan struct{})

// doCleanUpRoutine processes clean up routine defined by each type of tasks.
// doCleanUpRoutine processes clean up routine defined by each type of tasks and cleanUpMeta.
// For example:
//
// tasks with global sort should clean up tmp files stored on S3.
func (dm *Manager) doCleanUpRoutine() {
cnt := dm.CleanUpMeta()
if cnt != 0 {
logutil.BgLogger().Info("clean up nodes in framework meta since nodes shutdown", zap.Int("cnt", cnt))
}
tasks, err := dm.taskMgr.GetGlobalTasksInStates(
proto.TaskStateFailed,
proto.TaskStateReverted,
Expand All @@ -334,6 +338,39 @@ func (dm *Manager) doCleanUpRoutine() {
logutil.Logger(dm.ctx).Info("cleanUp routine success")
}

// CleanUpMeta clean up old node info in dist_framework_meta table.
func (dm *Manager) CleanUpMeta() int {
// Safe to discard errors since this function can be called at regular intervals.
serverInfos, err := GenerateSchedulerNodes(dm.ctx)
if err != nil {
logutil.BgLogger().Warn("generate scheduler nodes met error")
return 0
}

oldNodes, err := dm.taskMgr.GetAllNodes()
if err != nil {
logutil.BgLogger().Warn("get all nodes met error")
return 0
}

cleanNodes := make([]string, 0)
for _, nodeID := range oldNodes {
if ok := disttaskutil.MatchServerInfo(serverInfos, nodeID); !ok {
cleanNodes = append(cleanNodes, nodeID)
}
}
if len(cleanNodes) == 0 {
return 0
}
logutil.BgLogger().Info("start to clean up dist_framework_meta")
err = dm.taskMgr.CleanUpMeta(cleanNodes)
if err != nil {
logutil.BgLogger().Warn("clean up dist_framework_meta met error")
return 0
}
return len(cleanNodes)
}

func (dm *Manager) cleanUpFinishedTasks(tasks []*proto.Task) error {
cleanedTasks := make([]*proto.Task, 0)
var firstErr error
Expand Down
130 changes: 130 additions & 0 deletions pkg/disttask/framework/dispatcher/dispatcher_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher_test

import (
"context"
"testing"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestCleanUpRoutine(t *testing.T) {
store := testkit.CreateMockStore(t)
gtk := testkit.NewTestKit(t, store)
pool := pools.NewResourcePool(func() (pools.Resource, error) {
return gtk.Session(), nil
}, 1, 1, time.Second)
defer pool.Close()
ctrl := gomock.NewController(t)
defer ctrl.Finish()

dsp, mgr := MockDispatcherManager(t, pool)
mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl)
mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
dispatcher.RegisterDispatcherFactory(proto.TaskTypeExample,
func(ctx context.Context, taskMgr dispatcher.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
mockDispatcher := dsp.MockDispatcher(task)
mockDispatcher.Extension = &numberExampleDispatcherExt{}
return mockDispatcher
})
dispatcher.RegisterDispatcherCleanUpFactory(proto.TaskTypeExample,
func() dispatcher.CleanUpRoutine {
return mockCleanupRountine
})
dsp.Start()
defer dsp.Stop()
require.NoError(t, mgr.StartManager(":4000", "background"))

taskID, err := mgr.AddNewGlobalTask("test", proto.TaskTypeExample, 1, nil)
require.NoError(t, err)

checkTaskRunningCnt := func() []*proto.Task {
var tasks []*proto.Task
require.Eventually(t, func() bool {
var err error
tasks, err = mgr.GetGlobalTasksInStates(proto.TaskStateRunning)
require.NoError(t, err)
return len(tasks) == 1
}, time.Second, 50*time.Millisecond)
return tasks
}

checkSubtaskCnt := func(tasks []*proto.Task, taskID int64) {
require.Eventually(t, func() bool {
cnt, err := mgr.GetSubtaskInStatesCnt(taskID, proto.TaskStatePending)
require.NoError(t, err)
return int64(subtaskCnt) == cnt
}, time.Second, 50*time.Millisecond)
}

tasks := checkTaskRunningCnt()
checkSubtaskCnt(tasks, taskID)
for i := 1; i <= subtaskCnt; i++ {
err = mgr.UpdateSubtaskStateAndError(int64(i), proto.TaskStateSucceed, nil)
require.NoError(t, err)
}
dsp.DoCleanUpRoutine()
require.Eventually(t, func() bool {
tasks, err := mgr.GetGlobalTasksFromHistoryInStates(proto.TaskStateSucceed)
require.NoError(t, err)
return len(tasks) != 0
}, time.Second*10, time.Millisecond*300)
}

func TestCleanUpMeta(t *testing.T) {
store := testkit.CreateMockStore(t)
gtk := testkit.NewTestKit(t, store)
pool := pools.NewResourcePool(func() (pools.Resource, error) {
return gtk.Session(), nil
}, 1, 1, time.Second)
defer pool.Close()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockTaskMgr := mock.NewMockTaskManager(ctrl)
mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl)
dspMgr := MockDispatcherManagerWithMockTaskMgr(t, pool, mockTaskMgr)
dispatcher.RegisterDispatcherFactory(proto.TaskTypeExample,
func(ctx context.Context, taskMgr dispatcher.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
mockDispatcher := dspMgr.MockDispatcher(task)
mockDispatcher.Extension = &numberExampleDispatcherExt{}
return mockDispatcher
})
dispatcher.RegisterDispatcherCleanUpFactory(proto.TaskTypeExample,
func() dispatcher.CleanUpRoutine {
return mockCleanupRountine
})

mockTaskMgr.EXPECT().GetAllNodes().Return([]string{":4000", ":4001"}, nil)
mockTaskMgr.EXPECT().CleanUpMeta(gomock.Any()).Return(nil)
mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
require.Equal(t, 1, dspMgr.CleanUpMeta())

mockTaskMgr.EXPECT().GetAllNodes().Return([]string{":4000"}, nil)
mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
require.Equal(t, 0, dspMgr.CleanUpMeta())

mockTaskMgr.EXPECT().GetAllNodes().Return([]string{":4000", ":4001", ":4003"}, nil)
mockTaskMgr.EXPECT().CleanUpMeta(gomock.Any()).Return(nil)
mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
require.Equal(t, 2, dspMgr.CleanUpMeta())
}
Loading

0 comments on commit 3514d7f

Please sign in to comment.