Skip to content

Commit

Permalink
scheduler_v2: implement the basic logic for the new scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix committed Nov 1, 2021
1 parent dc32dd6 commit da70589
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 1 deletion.
137 changes: 137 additions & 0 deletions cdc/scheduler/util/table_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"github.com/pingcap/ticdc/cdc/model"
)

// TableSet provides a data structure to store the tables' states for the
// scheduler.
type TableSet struct {
tableIDMap map[model.TableID]*TableRecord
captureIndex map[model.CaptureID]map[model.TableID]*TableRecord
}

// TableRecord is a record to be inserted into TableSet.
type TableRecord struct {
TableID model.TableID
CaptureID model.CaptureID
Status TableStatus
}

// TableStatus is a type representing the table's replication status.
// The meaning of its value can be defined by the user of TableSet.
type TableStatus int32

// NewTableSet creates a new TableSet.
func NewTableSet() *TableSet {
return &TableSet{
tableIDMap: map[model.TableID]*TableRecord{},
captureIndex: map[model.CaptureID]map[model.TableID]*TableRecord{},
}
}

// AddTableRecord inserts a new TableRecord.
// It returns true if it succeeds. Returns false if there is a duplicate.
func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) {
if _, ok := s.tableIDMap[record.TableID]; ok {
// duplicate tableID
return false
}
s.tableIDMap[record.TableID] = record

captureIndexEntry := s.captureIndex[record.CaptureID]
if captureIndexEntry == nil {
captureIndexEntry = make(map[model.TableID]*TableRecord)
s.captureIndex[record.CaptureID] = captureIndexEntry
}

captureIndexEntry[record.TableID] = record
return true
}

// GetTableRecord tries to obtain a record with the specified tableID.
func (s *TableSet) GetTableRecord(tableID model.TableID) (*TableRecord, bool) {
ret, ok := s.tableIDMap[tableID]
return ret, ok
}

// RemoveTableRecord removes the record with tableID. Returns false
// if none exists.
func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool {
record, ok := s.tableIDMap[tableID]
if !ok {
return false
}
delete(s.tableIDMap, record.TableID)

captureIndexEntry, ok := s.captureIndex[record.CaptureID]
if !ok {
panic("unreachable")
}
delete(captureIndexEntry, record.TableID)
if len(captureIndexEntry) == 0 {
delete(s.captureIndex, record.CaptureID)
}
return true
}

// RemoveTableRecordByCaptureID removes all table records associated with
// captureID.
func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) {
captureIndexEntry, ok := s.captureIndex[captureID]
if !ok {
return
}

for tableID := range captureIndexEntry {
delete(s.tableIDMap, tableID)
}
delete(s.captureIndex, captureID)
}

// CountTableByCaptureID counts the number of tables associated with the captureID.
func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int {
return len(s.captureIndex[captureID])
}

// GetDistinctCaptures counts distinct captures with tables.
func (s *TableSet) GetDistinctCaptures() []model.CaptureID {
var ret []model.CaptureID
for captureID := range s.captureIndex {
ret = append(ret, captureID)
}
return ret
}

// GetAllTables returns all stored information on all tables.
func (s *TableSet) GetAllTables() map[model.TableID]*TableRecord {
return s.tableIDMap
}

// GetAllTablesGroupedByCaptures returns all stored information grouped by associated CaptureID.
func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model.TableID]*TableRecord {
return s.captureIndex
}

// CountTableByStatus counts the number of tables with the given status.
func (s *TableSet) CountTableByStatus(status TableStatus) (count int) {
for _, record := range s.tableIDMap {
if record.Status == status {
count++
}
}
return
}
217 changes: 217 additions & 0 deletions cdc/scheduler/util/table_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"testing"

"github.com/pingcap/ticdc/cdc/model"
"github.com/stretchr/testify/require"
)

func TestTableSetBasics(t *testing.T) {
ts := NewTableSet()
ok := ts.AddTableRecord(&TableRecord{
TableID: 1,
CaptureID: "capture-1",
Status: 0,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 1,
CaptureID: "capture-2",
Status: 0,
})
// Adding a duplicate table record should fail
require.False(t, ok)

record, ok := ts.GetTableRecord(1)
require.True(t, ok)
require.Equal(t, &TableRecord{
TableID: 1,
CaptureID: "capture-1",
Status: 0,
}, record)

ok = ts.RemoveTableRecord(1)
require.True(t, ok)

ok = ts.RemoveTableRecord(2)
require.False(t, ok)
}

func TestTableSetCaptures(t *testing.T) {
ts := NewTableSet()
ok := ts.AddTableRecord(&TableRecord{
TableID: 1,
CaptureID: "capture-1",
Status: 0,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 2,
CaptureID: "capture-1",
Status: 0,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 3,
CaptureID: "capture-2",
Status: 0,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 4,
CaptureID: "capture-2",
Status: 0,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 5,
CaptureID: "capture-3",
Status: 0,
})
require.True(t, ok)

require.Equal(t, 2, ts.CountTableByCaptureID("capture-1"))
require.Equal(t, 2, ts.CountTableByCaptureID("capture-2"))
require.Equal(t, 1, ts.CountTableByCaptureID("capture-3"))

ok = ts.AddTableRecord(&TableRecord{
TableID: 6,
CaptureID: "capture-3",
Status: 0,
})
require.True(t, ok)
require.Equal(t, 2, ts.CountTableByCaptureID("capture-3"))

captures := ts.GetDistinctCaptures()
require.Len(t, captures, 3)
require.Contains(t, captures, "capture-1")
require.Contains(t, captures, "capture-2")
require.Contains(t, captures, "capture-3")

ok = ts.RemoveTableRecord(3)
require.True(t, ok)
ok = ts.RemoveTableRecord(4)
require.True(t, ok)

captures = ts.GetDistinctCaptures()
require.Len(t, captures, 2)
require.Contains(t, captures, "capture-1")
require.Contains(t, captures, "capture-3")

captureToTableMap := ts.GetAllTablesGroupedByCaptures()
require.Equal(t, map[model.CaptureID]map[model.TableID]*TableRecord{
"capture-1": {
1: &TableRecord{
TableID: 1,
CaptureID: "capture-1",
Status: 0,
},
2: &TableRecord{
TableID: 2,
CaptureID: "capture-1",
Status: 0,
},
},
"capture-3": {
5: &TableRecord{
TableID: 5,
CaptureID: "capture-3",
Status: 0,
},
6: &TableRecord{
TableID: 6,
CaptureID: "capture-3",
Status: 0,
},
},
}, captureToTableMap)

ts.RemoveTableRecordByCaptureID("capture-3")
_, ok = ts.GetTableRecord(5)
require.False(t, ok)
_, ok = ts.GetTableRecord(6)
require.False(t, ok)

allTables := ts.GetAllTables()
require.Equal(t, map[model.TableID]*TableRecord{
1: {
TableID: 1,
CaptureID: "capture-1",
Status: 0,
},
2: {
TableID: 2,
CaptureID: "capture-1",
Status: 0,
},
}, allTables)

ok = ts.RemoveTableRecord(1)
require.True(t, ok)
ok = ts.RemoveTableRecord(2)
require.True(t, ok)

captureToTableMap = ts.GetAllTablesGroupedByCaptures()
require.Len(t, captureToTableMap, 0)
}

func TestCountTableByStatus(t *testing.T) {
ts := NewTableSet()
ok := ts.AddTableRecord(&TableRecord{
TableID: 1,
CaptureID: "capture-1",
Status: 1,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 2,
CaptureID: "capture-1",
Status: 2,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 3,
CaptureID: "capture-2",
Status: 3,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 4,
CaptureID: "capture-2",
Status: 1,
})
require.True(t, ok)

ok = ts.AddTableRecord(&TableRecord{
TableID: 5,
CaptureID: "capture-3",
Status: 2,
})
require.True(t, ok)

require.Equal(t, 2, ts.CountTableByStatus(1))
require.Equal(t, 2, ts.CountTableByStatus(2))
require.Equal(t, 1, ts.CountTableByStatus(3))
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ error = '''
prewrite not match, key: %s, start-ts: %d, commit-ts: %d, type: %s, optype: %s
'''

["CDC:ErrProcessorDuplicateOperations"]
error = '''
table processor duplicate operation, table-id: %d
'''

["CDC:ErrProcessorEtcdWatch"]
error = '''
etcd watch returns error
Expand Down
3 changes: 2 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ var (
ErrSortDirLockError = errors.Normalize("error encountered when locking sort-dir", errors.RFCCodeText("ErrSortDirLockError"))

// processor errors
ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely"))
ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely"))
ErrProcessorDuplicateOperations = errors.Normalize("table processor duplicate operation, table-id: %d", errors.RFCCodeText("CDC:ErrProcessorDuplicateOperations"))

// owner errors
ErrOwnerChangedUnexpectedly = errors.Normalize("owner changed unexpectedly", errors.RFCCodeText("CDC:ErrOwnerChangedUnexpectedly"))
Expand Down

0 comments on commit da70589

Please sign in to comment.