diff --git a/cdc/scheduler/internal/info_provider.go b/cdc/scheduler/internal/info_provider.go
index 1edf2bcb3a0..0d8bd0520e1 100644
--- a/cdc/scheduler/internal/info_provider.go
+++ b/cdc/scheduler/internal/info_provider.go
@@ -24,13 +24,22 @@ type InfoProvider interface {
 	GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error)
 
 	// GetTaskPositions returns the task positions.
+	//
+	// TODO: Remove this API, as captures no longer have a local checkpoint ts
+	// or local resolved ts in tpscheduler.
 	GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error)
 
 	// GetTotalTableCounts returns the number of tables associated
 	// with each capture.
+	//
+	// TODO: Remove this API, as its only usage is metrics, which
+	// are already exported in the scheduler package.
 	GetTotalTableCounts() map[model.CaptureID]int
 
 	// GetPendingTableCounts returns the number of tables in a non-ready
 	// status (Adding & Removing) associated with each capture.
+	//
+	// TODO: Remove this API, as its only usage is metrics, which
+	// are already exported in the scheduler package.
 	GetPendingTableCounts() map[model.CaptureID]int
 }
diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go
index 71a7ed72bde..911f3fc3465 100644
--- a/cdc/scheduler/internal/tp/coordinator.go
+++ b/cdc/scheduler/internal/tp/coordinator.go
@@ -15,6 +15,7 @@ package tp
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -37,6 +38,10 @@ const (
 var _ internal.Scheduler = (*coordinator)(nil)
 
 type coordinator struct {
+	// mu serializes concurrent access to the coordinator through the
+	// internal.Scheduler and internal.InfoProvider APIs.
+	mu sync.Mutex
+
 	version   string
 	revision  schedulepb.OwnerRevision
 	captureID model.CaptureID
@@ -105,10 +110,16 @@ func (c *coordinator) Tick(
 	// All captures that are alive according to the latest Etcd states.
 	aliveCaptures map[model.CaptureID]*model.CaptureInfo,
 ) (newCheckpointTs, newResolvedTs model.Ts, err error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	return c.poll(ctx, checkpointTs, currentTables, aliveCaptures)
 }
 
 func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	if !c.captureM.CheckAllCaptureInitialized() {
 		log.Info("tpscheduler: manual move table task ignored, "+
 			"since not all captures initialized",
@@ -133,6 +144,9 @@ func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
 }
 
 func (c *coordinator) Rebalance() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	if !c.captureM.CheckAllCaptureInitialized() {
 		log.Info("tpscheduler: manual rebalance task ignored, " +
 			"since not all captures initialized")
@@ -150,6 +164,9 @@ func (c *coordinator) Rebalance() {
 }
 
 func (c *coordinator) Close(ctx context.Context) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	_ = c.trans.Close()
 }
 
diff --git a/cdc/scheduler/internal/tp/info_provider.go b/cdc/scheduler/internal/tp/info_provider.go
new file mode 100644
index 00000000000..f8be8934045
--- /dev/null
+++ b/cdc/scheduler/internal/tp/info_provider.go
@@ -0,0 +1,73 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/scheduler/internal"
+)
+
+var _ internal.InfoProvider = (*coordinator)(nil)
+
+// GetTaskStatuses returns the task statuses.
+func (c *coordinator) GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	tasks := make(map[model.CaptureID]*model.TaskStatus, len(c.captureM.Captures))
+	for captureID, status := range c.captureM.Captures {
+		// Each table is reported with its checkpoint ts as StartTs; a capture
+		// without tables still gets an empty, non-nil Tables map.
+		tables := make(map[model.TableID]*model.TableReplicaInfo, len(status.Tables))
+		for _, s := range status.Tables {
+			tables[s.TableID] = &model.TableReplicaInfo{
+				StartTs: s.Checkpoint.CheckpointTs,
+			}
+		}
+		tasks[captureID] = &model.TaskStatus{Tables: tables}
+	}
+	return tasks, nil
+}
+
+// GetTaskPositions returns an empty task position for each known capture.
+func (c *coordinator) GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	pos := make(map[model.CaptureID]*model.TaskPosition, len(c.captureM.Captures))
+	for captureID := range c.captureM.Captures {
+		// Use a distinct value per capture so entries never alias each other.
+		pos[captureID] = &model.TaskPosition{}
+	}
+	return pos, nil
+}
+
+// GetTotalTableCounts returns the number of tables associated
+// with each capture.
+func (c *coordinator) GetTotalTableCounts() map[model.CaptureID]int {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	tables := make(map[model.CaptureID]int, len(c.captureM.Captures))
+	for captureID, status := range c.captureM.Captures {
+		tables[captureID] = len(status.Tables)
+	}
+	return tables
+}
+
+// GetPendingTableCounts returns the number of tables in a non-ready status
+// (Adding & Removing) per capture; this implementation always returns an empty map.
+func (c *coordinator) GetPendingTableCounts() map[model.CaptureID]int {
+	return make(map[model.CaptureID]int)
+}
diff --git a/cdc/scheduler/internal/tp/info_provider_test.go b/cdc/scheduler/internal/tp/info_provider_test.go
new file mode 100644
index 00000000000..fb3553d509b
--- /dev/null
+++ b/cdc/scheduler/internal/tp/info_provider_test.go
@@ -0,0 +1,62 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"math"
+	"testing"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/scheduler/internal"
+	"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/stretchr/testify/require"
+)
+
+func TestInfoProvider(t *testing.T) {
+	t.Parallel()
+
+	coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+		HeartbeatTick:      math.MaxInt,
+		MaxTaskConcurrency: 1,
+	})
+	coord.captureM.Captures = map[model.CaptureID]*CaptureStatus{
+		"a": {Tables: []schedulepb.TableStatus{{
+			TableID:    1,
+			Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1},
+		}, {
+			TableID:    2,
+			Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1},
+		}}},
+		"b": {},
+	}
+
+	// Smoke testing
+	var ip internal.InfoProvider = coord
+	tasks, err := ip.GetTaskStatuses()
+	require.NoError(t, err)
+	require.EqualValues(t, map[model.CaptureID]*model.TaskStatus{
+		"a": {Tables: map[model.TableID]*model.TableReplicaInfo{
+			1: {StartTs: 1},
+			2: {StartTs: 1},
+		}},
+		"b": {Tables: map[model.TableID]*model.TableReplicaInfo{}},
+	}, tasks)
+	pos, err := ip.GetTaskPositions()
+	require.NoError(t, err)
+	require.Len(t, pos, 2)
+	require.NotSame(t, pos["a"], pos["b"])
+	require.Len(t, ip.GetTotalTableCounts(), 2)
+	require.Empty(t, ip.GetPendingTableCounts())
+}