From da705896076e946530fe50d736f6b0e87a0825c7 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 28 Oct 2021 14:55:50 +0800 Subject: [PATCH] scheduler_v2: implement the basic logic for the new scheduler --- cdc/scheduler/util/table_set.go | 137 +++++++++++++++++ cdc/scheduler/util/table_set_test.go | 217 +++++++++++++++++++++++++++ errors.toml | 5 + pkg/errors/errors.go | 3 +- 4 files changed, 361 insertions(+), 1 deletion(-) create mode 100644 cdc/scheduler/util/table_set.go create mode 100644 cdc/scheduler/util/table_set_test.go diff --git a/cdc/scheduler/util/table_set.go b/cdc/scheduler/util/table_set.go new file mode 100644 index 00000000000..6485b0da4b4 --- /dev/null +++ b/cdc/scheduler/util/table_set.go @@ -0,0 +1,137 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"github.com/pingcap/ticdc/cdc/model"
+)
+
+// TableSet stores the tables' states for the scheduler. Every record is
+// reachable by its TableID and, through a second index, by its CaptureID.
+type TableSet struct {
+	tableIDMap   map[model.TableID]*TableRecord                     // primary index: TableID -> record
+	captureIndex map[model.CaptureID]map[model.TableID]*TableRecord // secondary index: CaptureID -> TableID -> record
+}
+
+// TableRecord is the unit of data stored in a TableSet, one per table.
+type TableRecord struct {
+	TableID   model.TableID   // key of the record in the primary index
+	CaptureID model.CaptureID // key of the record's bucket in the per-capture index
+	Status    TableStatus     // replication status, see TableStatus
+}
+
+// TableStatus is a type representing the table's replication status.
+// TableSet only compares it for equality; users define what each value means.
+type TableStatus int32
+
+// NewTableSet creates a new TableSet.
+func NewTableSet() *TableSet {
+	set := new(TableSet)
+	set.tableIDMap = make(map[model.TableID]*TableRecord)
+	set.captureIndex = make(map[model.CaptureID]map[model.TableID]*TableRecord)
+	return set
+}
+
+// AddTableRecord registers record under its TableID and indexes it under
+// its CaptureID. It reports false, leaving the set untouched, when a record
+// with the same TableID is already stored, and true otherwise.
+func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) {
+	if _, exists := s.tableIDMap[record.TableID]; exists {
+		return false
+	}
+
+	// Create the capture's bucket the first time the capture is seen.
+	byTable := s.captureIndex[record.CaptureID]
+	if byTable == nil {
+		byTable = map[model.TableID]*TableRecord{}
+		s.captureIndex[record.CaptureID] = byTable
+	}
+
+	byTable[record.TableID] = record
+	s.tableIDMap[record.TableID] = record
+	return true
+}
+
+// GetTableRecord looks up the record stored for tableID. The boolean is
+// false when the set holds no such record.
+func (s *TableSet) GetTableRecord(tableID model.TableID) (*TableRecord, bool) {
+	record, found := s.tableIDMap[tableID]
+	return record, found
+}
+
+// RemoveTableRecord deletes the record stored for tableID from both
+// indexes and reports whether such a record existed.
+func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool {
+	victim, found := s.tableIDMap[tableID]
+	if !found {
+		return false
+	}
+	delete(s.tableIDMap, victim.TableID)
+
+	byTable, indexed := s.captureIndex[victim.CaptureID]
+	if !indexed {
+		panic("unreachable")
+	}
+	delete(byTable, victim.TableID)
+	if len(byTable) == 0 {
+		// The capture lost its last table, so drop its bucket too.
+		delete(s.captureIndex, victim.CaptureID)
+	}
+	return true
+}
+
+// RemoveTableRecordByCaptureID drops every record indexed under captureID
+// along with the capture's bucket. It does nothing for an unknown capture.
+func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) {
+	if byTable, found := s.captureIndex[captureID]; found {
+		for tableID := range byTable {
+			delete(s.tableIDMap, tableID)
+		}
+		delete(s.captureIndex, captureID)
+	}
+}
+
+// CountTableByCaptureID counts the number of tables associated with the captureID.
+func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int {
+	return len(s.captureIndex[captureID])
+}
+
+// GetDistinctCaptures lists, in no particular order, every capture that has tables.
+func (s *TableSet) GetDistinctCaptures() []model.CaptureID {
+	var captures []model.CaptureID
+	for id := range s.captureIndex {
+		captures = append(captures, id)
+	}
+	return captures
+}
+
+// GetAllTables exposes the internal TableID -> record map itself, not a copy.
+func (s *TableSet) GetAllTables() map[model.TableID]*TableRecord {
+	return s.tableIDMap
+}
+
+// GetAllTablesGroupedByCaptures exposes the internal per-capture index itself, not a copy.
+func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model.TableID]*TableRecord {
+	return s.captureIndex
+}
+
+// CountTableByStatus reports how many stored records carry the given status.
+func (s *TableSet) CountTableByStatus(status TableStatus) (count int) {
+	for _, rec := range s.tableIDMap {
+		if rec.Status == status {
+			count++
+		}
+	}
+	return count
+}
diff --git a/cdc/scheduler/util/table_set_test.go b/cdc/scheduler/util/table_set_test.go new file mode 100644 index 00000000000..d05d231e25d --- /dev/null +++ b/cdc/scheduler/util/table_set_test.go @@ -0,0 +1,217 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"testing"
+
+	"github.com/pingcap/ticdc/cdc/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTableSetBasics(t *testing.T) {
+	ts := NewTableSet()
+	ok := ts.AddTableRecord(&TableRecord{
+		TableID:   1,
+		CaptureID: "capture-1",
+		Status:    0,
+	})
+	require.True(t, ok)
+	// Reuse TableID 1 under a different capture.
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   1,
+		CaptureID: "capture-2",
+		Status:    0,
+	})
+	// Adding a duplicate table record should fail
+	require.False(t, ok)
+	// The stored record must still be the one that was added first.
+	record, ok := ts.GetTableRecord(1)
+	require.True(t, ok)
+	require.Equal(t, &TableRecord{
+		TableID:   1,
+		CaptureID: "capture-1",
+		Status:    0,
+	}, record)
+
+	ok = ts.RemoveTableRecord(1)
+	require.True(t, ok)
+	// Removing a TableID that was never added reports false.
+	ok = ts.RemoveTableRecord(2)
+	require.False(t, ok)
+}
+
+func TestTableSetCaptures(t *testing.T) {
+	ts := NewTableSet()
+	ok := ts.AddTableRecord(&TableRecord{
+		TableID:   1,
+		CaptureID: "capture-1",
+		Status:    0,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   2,
+		CaptureID: "capture-1",
+		Status:    0,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   3,
+		CaptureID: "capture-2",
+		Status:    0,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   4,
+		CaptureID: "capture-2",
+		Status:    0,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   5,
+		CaptureID: "capture-3",
+		Status:    0,
+	})
+	require.True(t, ok)
+
+	require.Equal(t, 2, ts.CountTableByCaptureID("capture-1"))
+	require.Equal(t, 2, ts.CountTableByCaptureID("capture-2"))
+	require.Equal(t, 1, ts.CountTableByCaptureID("capture-3"))
+	// A second table on capture-3 raises its count to 2.
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   6,
+		CaptureID: "capture-3",
+		Status:    0,
+	})
+	require.True(t, ok)
+	require.Equal(t, 2, ts.CountTableByCaptureID("capture-3"))
+
+	captures := ts.GetDistinctCaptures()
+	require.Len(t, captures, 3)
+	require.Contains(t, captures, "capture-1")
+	require.Contains(t, captures, "capture-2")
+	require.Contains(t, captures, "capture-3")
+	// Removing both tables of capture-2 must drop it from the distinct captures.
+	ok = ts.RemoveTableRecord(3)
+	require.True(t, ok)
+	ok = ts.RemoveTableRecord(4)
+	require.True(t, ok)
+
+	captures = ts.GetDistinctCaptures()
+	require.Len(t, captures, 2)
+	require.Contains(t, captures, "capture-1")
+	require.Contains(t, captures, "capture-3")
+	// The grouped view must agree: only capture-1 and capture-3 remain.
+	captureToTableMap := ts.GetAllTablesGroupedByCaptures()
+	require.Equal(t, map[model.CaptureID]map[model.TableID]*TableRecord{
+		"capture-1": {
+			1: &TableRecord{
+				TableID:   1,
+				CaptureID: "capture-1",
+				Status:    0,
+			},
+			2: &TableRecord{
+				TableID:   2,
+				CaptureID: "capture-1",
+				Status:    0,
+			},
+		},
+		"capture-3": {
+			5: &TableRecord{
+				TableID:   5,
+				CaptureID: "capture-3",
+				Status:    0,
+			},
+			6: &TableRecord{
+				TableID:   6,
+				CaptureID: "capture-3",
+				Status:    0,
+			},
+		},
+	}, captureToTableMap)
+	// Bulk removal by capture must also delete tables 5 and 6 from the TableID index.
+	ts.RemoveTableRecordByCaptureID("capture-3")
+	_, ok = ts.GetTableRecord(5)
+	require.False(t, ok)
+	_, ok = ts.GetTableRecord(6)
+	require.False(t, ok)
+
+	allTables := ts.GetAllTables()
+	require.Equal(t, map[model.TableID]*TableRecord{
+		1: {
+			TableID:   1,
+			CaptureID: "capture-1",
+			Status:    0,
+		},
+		2: {
+			TableID:   2,
+			CaptureID: "capture-1",
+			Status:    0,
+		},
+	}, allTables)
+	// Removing the last two tables must leave the per-capture index empty.
+	ok = ts.RemoveTableRecord(1)
+	require.True(t, ok)
+	ok = ts.RemoveTableRecord(2)
+	require.True(t, ok)
+
+	captureToTableMap = ts.GetAllTablesGroupedByCaptures()
+	require.Len(t, captureToTableMap, 0)
+}
+
+func TestCountTableByStatus(t *testing.T) {
+	ts := NewTableSet()
+	ok := ts.AddTableRecord(&TableRecord{
+		TableID:   1,
+		CaptureID: "capture-1",
+		Status:    1,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   2,
+		CaptureID: "capture-1",
+		Status:    2,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   3,
+		CaptureID: "capture-2",
+		Status:    3,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   4,
+		CaptureID: "capture-2",
+		Status:    1,
+	})
+	require.True(t, ok)
+
+	ok = ts.AddTableRecord(&TableRecord{
+		TableID:   5,
+		CaptureID: "capture-3",
+		Status:    2,
+	})
+	require.True(t, ok)
+	// Statuses 1 and 2 were each used twice, status 3 once; captures do not matter.
+	require.Equal(t, 2, ts.CountTableByStatus(1))
+	require.Equal(t, 2, ts.CountTableByStatus(2))
+	require.Equal(t, 1, ts.CountTableByStatus(3))
+}
diff --git a/errors.toml b/errors.toml index d35160a82b3..20ad544212d 100755 --- a/errors.toml +++ b/errors.toml @@ -646,6 +646,11 @@ error = ''' prewrite not match, key: %s, start-ts: %d, commit-ts: %d, type: %s, optype: %s ''' +["CDC:ErrProcessorDuplicateOperations"] +error = ''' +table processor duplicate operation, table-id: %d +''' + ["CDC:ErrProcessorEtcdWatch"] error = ''' etcd watch returns error diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ef42086d356..d14c2dedfbe 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -245,7 +245,8 @@ var ( ErrSortDirLockError = errors.Normalize("error encountered when locking sort-dir", errors.RFCCodeText("ErrSortDirLockError")) // processor errors - ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely")) + ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely")) + ErrProcessorDuplicateOperations = errors.Normalize("table processor duplicate operation, table-id: %d", errors.RFCCodeText("CDC:ErrProcessorDuplicateOperations")) // owner errors ErrOwnerChangedUnexpectedly = errors.Normalize("owner changed unexpectedly", errors.RFCCodeText("CDC:ErrOwnerChangedUnexpectedly"))