From 4d242eca45721ebeadb1e4cc3ea29bdb5664c595 Mon Sep 17 00:00:00 2001
From: Chunzhu Li
Date: Wed, 25 May 2022 16:30:25 +0800
Subject: [PATCH] support rebalancer

---
 dm/dm/master/scheduler/balancer.go      | 161 +++++++++
 dm/dm/master/scheduler/balancer_test.go | 453 ++++++++++++++++++++++++
 dm/dm/master/scheduler/scheduler.go     | 141 ++++++--
 3 files changed, 731 insertions(+), 24 deletions(-)
 create mode 100644 dm/dm/master/scheduler/balancer.go
 create mode 100644 dm/dm/master/scheduler/balancer_test.go

diff --git a/dm/dm/master/scheduler/balancer.go b/dm/dm/master/scheduler/balancer.go
new file mode 100644
index 00000000000..81700653412
--- /dev/null
+++ b/dm/dm/master/scheduler/balancer.go
@@ -0,0 +1,161 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scheduler
+
+import (
+	"math"
+	"math/rand"
+	"sort"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/pingcap/tiflow/dm/pkg/log"
+)
+
+const (
+	rebalanceInterval = 5 * time.Minute
+	hasLoadTaskWeight = 1e6
+)
+
+type balancer interface {
+	// FindVictims returns a set of candidate victim sources.
+	// Rebinding these sources to other workers will make the workload more balanced.
+	FindVictims(
+		// if we want to support workload-based weights later, we can extend totalWeight
+		totalWeight int,
+		workers map[string]*Worker,
+		relayWorkers map[string]map[string]struct{},
+		hasLoadTaskByWorkerAndSource func(string, string) bool,
+	) (sourcesToBalance []string)
+	// CanBalance returns true if a worker holding workerWeight bounds is still within the balanced upper limit.
+	CanBalance(totalWeight int, workers map[string]*Worker, workerWeight int) bool
+	// GetWorkerBoundsByWeight returns the worker's bound sources sorted by weight in ascending order.
+ GetWorkerBoundsByWeight(w *Worker, relayWorkers map[string]map[string]struct{}, hasLoadTaskByWorkerAndSource func(string, string) bool) sourceHelper +} + +func newTableNumberBalancer(pLogger *log.Logger) *tableNumberBalancer { + return &tableNumberBalancer{ + logger: pLogger.WithFields(zap.String("component", "balancer")), + } +} + +type tableNumberBalancer struct { + logger log.Logger +} + +func (r *tableNumberBalancer) FindVictims( + sourceNumber int, + workers map[string]*Worker, + relayWorkers map[string]map[string]struct{}, + hasLoadTaskByWorkerAndSource func(string, string) bool, +) []string { + workerNum := 0 + for _, w := range workers { + if w.Stage() != WorkerOffline { + workerNum++ + } + } + + if workerNum == 0 { + return nil + } + upperLimitPerCapture := int(math.Ceil(float64(sourceNumber) / float64(workerNum))) + r.logger.Info("start rebalancing", + zap.Int("sourceNumber", sourceNumber), + zap.Int("workerNum", workerNum), + zap.Int("targetLimit", upperLimitPerCapture)) + + victims := make(sourceHelper, 0, len(workers)) + for _, w := range workers { + bounds := w.Bounds() + sourceNum2Remove := len(bounds) - upperLimitPerCapture + if sourceNum2Remove <= 0 || w.Stage() == WorkerOffline { + continue + } + + sourceList := r.GetWorkerBoundsByWeight(w, relayWorkers, hasLoadTaskByWorkerAndSource) + + // here we pick `sourceNum2Remove` tables to delete, + for _, record := range sourceList { + if sourceNum2Remove <= 0 || record.score >= hasLoadTaskWeight { + break + } + + r.logger.Info("find victim source", zap.String("source", record.source), zap.Float32("score", record.score)) + victims = append(victims, record) + sourceNum2Remove-- + } + } + + sort.Sort(victims) + victimSources := make([]string, 0, len(victims)) + for _, record := range victims { + victimSources = append(victimSources, record.source) + } + return victimSources +} + +func (r *tableNumberBalancer) GetWorkerBoundsByWeight(w *Worker, relayWorkers map[string]map[string]struct{}, hasLoadTaskByWorkerAndSource func(string, string) bool) sourceHelper { + relaySources := w.RelaySources() + bounds := w.Bounds() + + sourceList := make(sourceHelper, 0, len(bounds)) + for source := range bounds { + var score float32 + _, hasRelay := relaySources[source] + switch { + // don't rebalance the source that has load task + case hasLoadTaskByWorkerAndSource(w.BaseInfo().Name, source): + score = hasLoadTaskWeight + case hasRelay: + score = 100 - float32(len(relayWorkers[source])) + rand.Float32() + default: + score = rand.Float32() + } + sourceList = append(sourceList, sourceScore{score: score, source: source}) + } + sort.Sort(sourceList) + return sourceList +} + +func (r *tableNumberBalancer) CanBalance(sourceNumber int, workers map[string]*Worker, workerWeight int) bool { + workerNum := 0 + for _, w := range workers { + if w.Stage() != WorkerOffline { + workerNum++ + } + } + upperLimitPerCapture := int(math.Ceil(float64(sourceNumber) / float64(workerNum))) + return workerWeight <= upperLimitPerCapture +} + +type sourceScore struct { + source string + score float32 +} + +type sourceHelper []sourceScore + +func (s sourceHelper) Len() int { + return len(s) +} + +func (s sourceHelper) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s sourceHelper) Less(i, j int) bool { + return s[i].score < s[j].score +} diff --git a/dm/dm/master/scheduler/balancer_test.go b/dm/dm/master/scheduler/balancer_test.go new file mode 100644 index 00000000000..d4359d0f27c --- /dev/null +++ b/dm/dm/master/scheduler/balancer_test.go @@ -0,0 +1,453 @@ +// 
Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/integration" + "go.uber.org/zap" + + "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/pkg/ha" + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/utils" +) + +func genSourceID(i int) string { + return fmt.Sprintf("mysql-replica-%d", i) +} + +func genSourceCfg(t *testing.T, i int) *config.SourceConfig { + sourceCfg, err := config.ParseYamlAndVerify(config.SampleSourceConfig) + require.NoError(t, err) + sourceCfg.SourceID = genSourceID(i) + return sourceCfg +} + +func genWorkerName(i int) string { + return fmt.Sprintf("dm-worker-%d", i) +} + +func genWorkerAddr(i int) string { + return fmt.Sprintf("127.0.0.1:%d", 31360+i) +} + +func genTaskName(i int) string { + return "task" + strconv.Itoa(i) +} + +func genSourceBounds(lSource, rSource, worker int) []ha.SourceBound { + bounds := make([]ha.SourceBound, rSource-lSource+1) + for i := lSource; i <= rSource; i++ { + bounds = append(bounds, ha.SourceBound{ + Source: genSourceID(i), + Worker: genWorkerName(worker), + }) + } + return bounds +} + +func TestBalancerSuite(t *testing.T) { + suite.Run(t, new(testBalancerSuite)) +} + +// clear keys in etcd test cluster. +func (t *testBalancerSuite) clearTestInfoOperation() { + t.T().Helper() + require.NoError(t.T(), ha.ClearTestInfoOperation(t.etcdTestCli)) +} + +type testBalancerSuite struct { + suite.Suite + mockCluster *integration.ClusterV3 + etcdTestCli *clientv3.Client +} + +func (t *testBalancerSuite) SetupSuite() { + require.NoError(t.T(), log.InitLogger(&log.Config{Level: "debug"})) + + integration.BeforeTestExternal(t.T()) + t.mockCluster = integration.NewClusterV3(t.T(), &integration.ClusterConfig{Size: 1}) + t.etcdTestCli = t.mockCluster.RandClient() + +} + +func (t *testBalancerSuite) TearDownSuite() { + t.mockCluster.Terminate(t.T()) +} +func (t *testBalancerSuite) TearDownTest() { + t.clearTestInfoOperation() +} + +func (t *testBalancerSuite) TestTableNumberBalancer() { + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + ctx, cancel = context.WithCancel(context.Background()) + wg = sync.WaitGroup{} + keepAliveTTL = int64(5) // NOTE: this should be >= minLeaseTTL, in second. + ) + cancels := make([]context.CancelFunc, 0, 4) + defer func() { + cancel() + for _, cancel1 := range cancels { + cancel1() + } + wg.Wait() + }() + + // 1. 
start scheduler and rebalancer + require.NoError(t.T(), s.Start(ctx, t.etcdTestCli)) + + sourceID := 1 + boundWorkers := func(wNumStart int, wNumEndBound int) { + for wNum := wNumStart; wNum <= 4; wNum++ { + ctx1, cancel1 := context.WithCancel(ctx) + require.NoError(t.T(), s.AddWorker(genWorkerName(wNum), genWorkerAddr(wNum))) + cancels = append(cancels, cancel1) + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t.T(), ha.KeepAlive(ctx1, t.etcdTestCli, genWorkerName(wNum), keepAliveTTL)) + }() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(t.etcdTestCli) + return err == nil && len(kam) == wNum + })) + if wNum >= wNumEndBound { + continue + } + // bound 5,4,3 sources to dm-worker 1,2,3, and 0 sources to worker4 + for i := sourceID; i < sourceID+6-wNum; i++ { + err := s.AddSourceCfg(genSourceCfg(t.T(), i)) + require.NoError(t.T(), err) + } + sourceID += 6 - wNum + } + } + // 2. start dm-worker1 and bound 5,4,3 sources to dm-worker1,2,3 + boundWorkers(1, 4) + // 3. trigger a rebalance, and sources should be balanced like 3,3,3,3 + require.True(t.T(), s.TriggerRebalance()) + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(4) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 3 + })) + expectRange := [][]int{{1, 5}, {6, 9}, {10, 12}} + for wNum := 1; wNum <= 4; wNum++ { + // when using s.GetWorkerByName we will apply a read lock on scheduler. If we can access it successfully, + // it means the rebalance work is finished. + w := s.GetWorkerByName(genWorkerName(wNum)) + require.Len(t.T(), w.Bounds(), 3) + + rangeSourceNum := make([]int, 2) + for _, bound := range w.Bounds() { + sourceNumStr := strings.TrimLeft(bound.Source, "mysql-replica-") + sourceNum, err := strconv.Atoi(sourceNumStr) + require.NoError(t.T(), err) + if wNum <= 3 { + l, r := expectRange[wNum-1][0], expectRange[wNum-1][1] + require.GreaterOrEqual(t.T(), sourceNum, l) + require.LessOrEqual(t.T(), sourceNum, r) + } else { + for i := 0; i <= 1; i++ { + if sourceNum >= expectRange[i][0] && sourceNum <= expectRange[i][1] { + rangeSourceNum[i]++ + } + } + } + } + if wNum >= 4 { + require.Equal(t.T(), 2, rangeSourceNum[0]) + require.Equal(t.T(), 1, rangeSourceNum[1]) + } + } + + // 4. stop worker 4 and trigger rebalance, should become 4,4,4,0 + cancels[3]() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(t.etcdTestCli) + return err == nil && len(kam) == 3 + })) + + require.True(t.T(), s.TriggerRebalance()) + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(3) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 4 + })) + for wNum := 1; wNum <= 3; wNum++ { + w := s.GetWorkerByName(genWorkerName(wNum)) + require.Len(t.T(), w.Bounds(), 4) + } + require.Len(t.T(), s.GetWorkerByName(genWorkerName(4)).Bounds(), 0) + + // 5. 
stop worker 3 and trigger rebalance, should become 6,6,0,0 + cancels[2]() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(t.etcdTestCli) + return err == nil && len(kam) == 2 + })) + require.True(t.T(), s.TriggerRebalance()) + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(2) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 6 + })) + for wNum := 1; wNum <= 2; wNum++ { + w := s.GetWorkerByName(genWorkerName(wNum)) + require.Len(t.T(), w.Bounds(), 6) + } + require.Len(t.T(), s.GetWorkerByName(genWorkerName(3)).Bounds(), 0) + require.Len(t.T(), s.GetWorkerByName(genWorkerName(4)).Bounds(), 0) + + // 6. start these two workers again, should become 3,3,3,3 after rebalance + sourceID = 3 + cancels = cancels[:2] + boundWorkers(3, 3) + require.True(t.T(), s.TriggerRebalance()) + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(4) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 3 + })) + for wNum := 1; wNum <= 4; wNum++ { + w := s.GetWorkerByName(genWorkerName(wNum)) + require.Len(t.T(), w.Bounds(), 3) + } + s.Close() +} + +func (t *testBalancerSuite) TestTableNumberBalancerWithPrivileges() { + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + ctx, cancel = context.WithCancel(context.Background()) + wg = sync.WaitGroup{} + keepAliveTTL = int64(10) // NOTE: this should be >= minLeaseTTL, in second. + ) + cancels := make([]context.CancelFunc, 0, 4) + defer func() { + cancel() + for _, cancel1 := range cancels { + cancel1() + } + wg.Wait() + }() + + checkSources := func(expectBounds [][]int) { + for wNum := 1; wNum <= len(expectBounds); wNum++ { + // when using s.GetWorkerByName we will apply a read lock on scheduler. If we can access it successfully, + // it means the rebalance work is finished. + w := s.GetWorkerByName(genWorkerName(wNum)) + sources := expectBounds[wNum-1] + require.Len(t.T(), w.Bounds(), len(sources)) + if len(sources) == 0 { + continue + } + sourceIDs := make([]int, 0, len(sources)) + for _, bound := range w.Bounds() { + sourceNumStr := strings.TrimLeft(bound.Source, "mysql-replica-") + sourceID, err := strconv.Atoi(sourceNumStr) + require.NoError(t.T(), err) + sourceIDs = append(sourceIDs, sourceID) + } + sort.Ints(sourceIDs) + require.Equal(t.T(), sources, sourceIDs) + } + } + + // 1. start scheduler and rebalancer + require.NoError(t.T(), s.Start(ctx, t.etcdTestCli)) + + // 2. 
start 4 workers, bound sources, start load tasks and relay on these workers + sourceBoundInfo := [][]int{ + // sources to bound, sources with load tasks, sources with relay + {5, 2, 2}, + {5, 1, 3}, + {2, 0, 0}, + } + + sourceID := 1 + for wNum := 1; wNum <= 4; wNum++ { + ctx1, cancel1 := context.WithCancel(ctx) + require.NoError(t.T(), s.AddWorker(genWorkerName(wNum), genWorkerAddr(wNum))) + cancels = append(cancels, cancel1) + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t.T(), ha.KeepAlive(ctx1, t.etcdTestCli, genWorkerName(wNum), keepAliveTTL)) + }() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(t.etcdTestCli) + return err == nil && len(kam) == wNum + })) + if wNum >= 4 { + continue + } + boundInfo := sourceBoundInfo[wNum-1] + sourcesToBound, sourcesLoadTask, sourcesRelay := boundInfo[0], boundInfo[1], boundInfo[2] + + // bound sources to dm-worker + for i := sourceID; i < sourceID+sourcesToBound; i++ { + err := s.AddSourceCfg(genSourceCfg(t.T(), i)) + require.NoError(t.T(), err) + } + + if sourcesLoadTask > 0 { + subtaskConfigs := make(map[string]config.SubTaskConfig) + sourceWorkerMap := make(map[string]string) + for i := sourceID; i < sourceID+sourcesLoadTask; i++ { + subtaskConfigs[genSourceID(i)] = config.SubTaskConfig{} + sourceWorkerMap[genSourceID(i)] = genWorkerName(wNum) + } + s.subTaskCfgs.Store(genTaskName(wNum), subtaskConfigs) + s.loadTasks[genTaskName(wNum)] = sourceWorkerMap + } + if sourcesRelay > 0 { + for i := sourceID; i < sourceID+sourcesLoadTask+sourcesRelay; i++ { + require.NoError(t.T(), s.StartRelay(genSourceID(i), []string{genWorkerName(wNum)})) + } + } + sourceID += sourcesToBound + } + + // 3. trigger a rebalance, and sources should be balanced like 4(2load 2relay),4(1load 3relay),2,2 + require.True(t.T(), s.TriggerRebalance()) + utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(4) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 2 + }) + require.Len(t.T(), s.GetWorkerByName(genWorkerName(4)).Bounds(), 2) + checkSources([][]int{ + {1, 2, 3, 4}, + {6, 7, 8, 9}, + {11, 12}, + {5, 10}, + }) + + // 4. trigger a rebalance, and sources should be balanced like 4(2load 2relay),4(1load 3relay),2,2 + require.NoError(t.T(), s.StartRelay(genSourceID(4), []string{genWorkerName(3)})) + require.NoError(t.T(), s.StartRelay(genSourceID(9), []string{genWorkerName(4)})) // 3. trigger a rebalance, and sources should be balanced like 4(2load 2relay),4(1load 3relay),2,2 + require.True(t.T(), s.TriggerRebalance()) + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(4) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 3 + })) + + checkSources([][]int{ + {1, 2, 3}, + {6, 7, 8}, + {4, 11, 12}, + {5, 9, 10}, + }) + + // 4. 
stop worker 4 and trigger rebalance, should become 4,4,4,0 + // make sure source 5 can be bound to worker 1 + require.NoError(t.T(), s.StartRelay(genSourceID(5), []string{genWorkerName(1)})) + cancels[3]() + cancels = cancels[:len(cancels)-1] + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(t.etcdTestCli) + return err == nil && len(kam) == 3 + })) + s.TriggerRebalance() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(1) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + log.L().Info("get sbm", zap.Any("sbm", sbm)) + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 4 + })) + checkSources([][]int{ + {1, 2, 3, 5}, + {6, 7, 8, 9}, + {4, 10, 11, 12}, + {}, + }) + + ctx1, cancel1 := context.WithCancel(ctx) + cancels = append(cancels, cancel1) + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t.T(), ha.KeepAlive(ctx1, t.etcdTestCli, genWorkerName(4), keepAliveTTL)) + }() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(t.etcdTestCli) + return err == nil && len(kam) == 4 + })) + require.NoError(t.T(), s.StartRelay(genSourceID(10), []string{genWorkerName(3)})) + require.NoError(t.T(), s.StartRelay(genSourceID(11), []string{genWorkerName(3)})) + for wNum := 1; wNum <= 2; wNum++ { + subtaskConfigs := make(map[string]config.SubTaskConfig) + sourceWorkerMap := make(map[string]string) + for _, bound := range s.workers[genWorkerName(wNum)].Bounds() { + subtaskConfigs[bound.Source] = config.SubTaskConfig{} + sourceWorkerMap[bound.Source] = genWorkerName(wNum) + } + s.subTaskCfgs.Store(genTaskName(wNum), subtaskConfigs) + s.loadTasks[genTaskName(wNum)] = sourceWorkerMap + } + s.TriggerRebalance() + require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + workerName := genWorkerName(3) + sbm, _, err := ha.GetSourceBound(t.etcdTestCli, workerName, "") + log.L().Info("get sbm", zap.Any("sbm", sbm)) + if err != nil { + log.L().Error("fail to get source bounds from etcd", zap.Error(err)) + } + return len(sbm[workerName]) == 3 + })) + checkSources([][]int{ + {1, 2, 3, 5}, + {6, 7, 8, 9}, + {4, 10, 11}, + {12}, + }) +} diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go index 31855cb5ecf..cf63c1c7acf 100644 --- a/dm/dm/master/scheduler/scheduler.go +++ b/dm/dm/master/scheduler/scheduler.go @@ -183,6 +183,10 @@ type Scheduler struct { loadTasks map[string]map[string]string securityCfg config.Security + + balance balancer + + triggerRebalance chan struct{} } // NewScheduler creates a new scheduler instance. 
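The new balance and triggerRebalance fields above wire the balancer into the Scheduler. For reference, the eviction order produced by the balancer's scoring can be shown with a small standalone sketch; the score, scoredSource, and loadTaskScore names below are illustrative stand-ins that mirror, rather than reuse, the rules in GetWorkerBoundsByWeight:

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// loadTaskScore mirrors hasLoadTaskWeight in balancer.go: such sources are never evicted.
const loadTaskScore = 1e6

type scoredSource struct {
	source string
	score  float32
}

// score mirrors the scoring rules in GetWorkerBoundsByWeight (illustrative copy, not the patched code).
func score(hasLoadTask bool, relayWorkerCount int) float32 {
	switch {
	case hasLoadTask:
		return loadTaskScore // FindVictims stops before reaching these
	case relayWorkerCount > 0:
		// more relay workers -> lower score -> evicted earlier, since the
		// source has more candidate workers it can move to
		return 100 - float32(relayWorkerCount) + rand.Float32()
	default:
		return rand.Float32() // plain sources are the cheapest to move
	}
}

func main() {
	sources := []scoredSource{
		{source: "mysql-replica-1", score: score(true, 0)},
		{source: "mysql-replica-2", score: score(false, 3)},
		{source: "mysql-replica-3", score: score(false, 0)},
	}
	// ascending sort, same ordering as sourceHelper.Less
	sort.Slice(sources, func(i, j int) bool { return sources[i].score < sources[j].score })
	for _, s := range sources {
		fmt.Printf("%s %.2f\n", s.source, s.score)
	}
	// prints the plain source first, the relay source second, the load-task source last
}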
@@ -198,6 +202,8 @@ func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler { relayWorkers: make(map[string]map[string]struct{}), loadTasks: make(map[string]map[string]string), securityCfg: securityCfg, + balance: newTableNumberBalancer(pLogger), + triggerRebalance: make(chan struct{}, 0), } } @@ -278,6 +284,12 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e s.observeLoadTask(ctx, rev1) }(loadTaskRev) + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.periodicallyRebalance(ctx, rebalanceInterval) + }() + s.started.Store(true) // started now s.cancel = cancel s.logger.Info("the scheduler has started") @@ -551,7 +563,7 @@ func (s *Scheduler) transferWorkerAndSource(lworker, lsource, rworker, rsource s if inputWorkers[i] != "" { bounds := workers[i].Bounds() expect := inputSources[i] - if expect == "" && len(bounds) == 0 { + if expect == "" { continue } if _, ok := bounds[expect]; !ok { @@ -2149,11 +2161,12 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error { } // 5. unbound for the source. - unbounds := make([]string, 0, len(bounds)) - for _, bound := range bounds { - s.logger.Debug("unbound the worker for source", zap.Stringer("bound", bound), zap.Stringer("event", ev)) - s.updateStatusToUnbound(bound.Source) - unbounds = append(unbounds, bound.Source) + boundSourcesByWeight := s.balance.GetWorkerBoundsByWeight(w, s.relayWorkers, s.hasLoadTaskByWorkerAndSource) + unbounds := make([]string, 0, len(boundSourcesByWeight)) + for _, bound := range boundSourcesByWeight { + s.logger.Debug("unbound the worker for source", zap.String("source", bound.source), zap.Stringer("event", ev)) + s.updateStatusToUnbound(bound.source) + unbounds = append(unbounds, bound.source) } defer func() { // renew last bounds after we finish this round of operation @@ -2165,8 +2178,9 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error { // 6. change the stage (from Bound) to Offline. w.ToOffline() - // 7. try to bound the source to an online worker again. - for _, source := range unbounds { + // 7. try to bound the source to a Free worker again. + for i := len(unbounds) - 1; i >= 0; i-- { + source := unbounds[i] _, err = s.tryBoundForSource(source) if err != nil { s.logger.Warn("fail to bound new worker for source", zap.Error(err), zap.String("source", source), zap.Stringer("event", ev)) @@ -2219,8 +2233,6 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // caller should update the s.unbounds. // caller should make sure this source has source config. func (s *Scheduler) tryBoundForSource(source string) (bool, error) { - var worker *Worker - // TODO: change this to pick a worker which has the least load. // pick a worker which has subtask in load stage. workerName, sourceID := s.getNextLoadTaskTransfer("", source) @@ -2230,11 +2242,32 @@ func (s *Scheduler) tryBoundForSource(source string) (bool, error) { return err == nil, err } + worker := s.pickBestWorkerForSource(source) + + if worker == nil { + s.logger.Info("no online worker exists for bound", zap.String("source", source)) + return false, nil + } + + // 2. try to bound them. + err := s.boundSourceToWorker(source, worker) + if err != nil { + return false, err + } + return true, nil +} + +// pickBestWorkerForSource will be used in two situations: +// 1. when a source is newly added, we need to pick a worker to bound this source. +// 2. when trying to rebalance, we need to pick another worker to bound this source. 
In this case we don't need to delete worker before, +// because it will help this source to find another worker. +func (s *Scheduler) pickBestWorkerForSource(source string) *Worker { + var worker *Worker + sourceNum := len(s.sourceCfgs) + 1 // try to find a history worker in relay workers... relayWorkers := s.relayWorkers[source] // TODO: use source number to pick a worker now. We may use task numbers/source workload as a factor to pick a worker in the future. - boundSourcesNum := sourceNum if len(relayWorkers) > 0 { if bound, ok := s.lastBound[source]; ok { boundWorker := bound.Worker @@ -2251,6 +2284,7 @@ func (s *Scheduler) tryBoundForSource(source string) (bool, error) { } } if worker == nil { + boundSourcesNum := sourceNum // then a relay worker for this source... for workerName := range relayWorkers { w, ok := s.workers[workerName] @@ -2276,19 +2310,25 @@ func (s *Scheduler) tryBoundForSource(source string) (bool, error) { if !ok { // a not found worker, should not happen s.logger.DPanic("worker instance not found for history worker", zap.String("worker", bound.Worker)) - return false, nil + return nil } if w.Stage() != WorkerOffline { worker = w s.logger.Info("found history worker when source bound", - zap.String("worker", workerName), + zap.String("worker", w.BaseInfo().Name), zap.String("source", source)) } } + + // if choose last bound will make this worker not balanced, we should pick another worker. + if worker != nil && !s.balance.CanBalance(len(s.sourceCfgs), s.workers, len(worker.Bounds())+1) { + worker = nil + } } if worker == nil { + boundSourcesNum := sourceNum // and then an online worker with lease sources. for _, w := range s.workers { if w.Stage() != WorkerOffline && len(w.Bounds()) < boundSourcesNum { @@ -2301,17 +2341,7 @@ func (s *Scheduler) tryBoundForSource(source string) (bool, error) { } } - if worker == nil { - s.logger.Info("no online worker exists for bound", zap.String("source", source)) - return false, nil - } - - // 2. try to bound them. - err := s.boundSourceToWorker(source, worker) - if err != nil { - return false, err - } - return true, nil + return worker } // boundSourceToWorker bounds the source and worker together. 
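With this change pickBestWorkerForSource also rejects the last-bound candidate when binding one more source would exceed the balanced upper limit. A minimal sketch of that criterion, assuming the same ceil(totalSources/onlineWorkers) limit as tableNumberBalancer.CanBalance (the canBalance helper below is illustrative, not the patched method):

package main

import (
	"fmt"
	"math"
)

// canBalance mirrors tableNumberBalancer.CanBalance: a worker may hold at most
// ceil(totalSources/onlineWorkers) bounds, so binding one more source is allowed
// only if the resulting bound count stays within that limit.
func canBalance(totalSources, onlineWorkers, resultingBounds int) bool {
	if onlineWorkers == 0 {
		return false // illustrative guard; the patched method assumes at least one online worker
	}
	upperLimit := int(math.Ceil(float64(totalSources) / float64(onlineWorkers)))
	return resultingBounds <= upperLimit
}

func main() {
	// 12 sources over 4 online workers -> at most 3 bounds per worker
	fmt.Println(canBalance(12, 4, 3)) // true: the history/relay worker may be reused
	fmt.Println(canBalance(12, 4, 4)) // false: pickBestWorkerForSource falls through to another candidate
}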
@@ -2708,3 +2738,66 @@ func (s *Scheduler) OperateValidationTask(expectStage pb.Stage, stCfgs map[strin
 	}
 	return nil
 }
+
+func (s *Scheduler) doRebalanceJob() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	victims := s.balance.FindVictims(len(s.sourceCfgs), s.workers, s.relayWorkers, s.hasLoadTaskByWorkerAndSource)
+	for _, source := range victims {
+		w := s.pickBestWorkerForSource(source)
+		if w != nil {
+			lworker := ""
+			boundWorker, ok := s.bounds[source]
+			if ok {
+				lworker = boundWorker.BaseInfo().Name
+			}
+			if lworker == w.BaseInfo().Name {
+				continue
+			}
+			err := s.transferWorkerAndSource(lworker, source, w.BaseInfo().Name, "")
+			if err != nil {
+				s.logger.Warn("fail to transfer source during rebalance", zap.String("lworker", lworker),
+					zap.String("lsource", source), zap.String("rworker", w.BaseInfo().Name), zap.Error(err))
+			}
+		}
+	}
+
+	unbounds := s.getUnboundSources()
+	for _, source := range unbounds {
+		bound, err := s.tryBoundForSource(source)
+		if err != nil {
+			s.logger.Warn("fail to bound new worker for source during rebalance", zap.Error(err), zap.String("source", source))
+			break
+		}
+		// no alive worker is available for now, stop trying the remaining sources
+		if !bound {
+			return
+		}
+	}
+}
+
+func (s *Scheduler) periodicallyRebalance(ctx context.Context, interval time.Duration) {
+	tick := time.NewTicker(interval)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-tick.C:
+		case <-s.triggerRebalance:
+			tick.Reset(interval)
+		}
+		s.doRebalanceJob()
+	}
+}
+
+func (s *Scheduler) TriggerRebalance() bool {
+	select {
+	case s.triggerRebalance <- struct{}{}:
+		s.logger.Info("manual rebalance trigger succeeded")
+		return true
+	default:
+		s.logger.Info("a rebalance round is already running")
+		return false
+	}
+}
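TriggerRebalance is a non-blocking send on the unbuffered triggerRebalance channel, which periodicallyRebalance drains alongside its ticker. A standalone sketch of that pattern, with runLoop and job as illustrative stand-ins for periodicallyRebalance and doRebalanceJob:

package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop is an illustrative stand-in for periodicallyRebalance: it runs job on a
// ticker, and an unbuffered trigger channel lets callers request an immediate run,
// which also postpones the next periodic run via tick.Reset.
func runLoop(ctx context.Context, trigger <-chan struct{}, interval time.Duration, job func()) {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		case <-trigger:
			tick.Reset(interval)
		}
		job()
	}
}

func main() {
	trigger := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go runLoop(ctx, trigger, 500*time.Millisecond, func() { fmt.Println("rebalance round") })

	// mirrors TriggerRebalance: a non-blocking send that reports whether the
	// request was picked up or a round is already in progress
	select {
	case trigger <- struct{}{}:
		fmt.Println("manual trigger accepted")
	default:
		fmt.Println("a rebalance round is already running")
	}
	<-ctx.Done()
}

The sketch also stops its ticker with a defer on exit, which the patched loop could adopt as well since the scheduler can be closed and restarted.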