Skip to content

Commit

Permalink
schemaStorage: reduce memory usage and add unit tests (#1115) (#1127)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Nov 26, 2020
1 parent b819812 commit 2c00498
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 27 deletions.
58 changes: 36 additions & 22 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type schemaSnapshot struct {
tables map[int64]*model.TableInfo
partitionTable map[int64]*model.TableInfo

// key is schemaID and value is tableIDs
tableInSchema map[int64][]int64

truncateTableID map[int64]struct{}
ineligibleTableID map[int64]struct{}

Expand Down Expand Up @@ -109,6 +112,7 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
tables: make(map[int64]*model.TableInfo),
partitionTable: make(map[int64]*model.TableInfo),

tableInSchema: make(map[int64][]int64),
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

Expand All @@ -131,9 +135,9 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
}
dbinfo.Tables = make([]*timodel.TableInfo, 0, len(tableInfos))
snap.tableInSchema[schemaID] = make([]int64, 0, len(tableInfos))
for _, tableInfo := range tableInfos {
dbinfo.Tables = append(dbinfo.Tables, tableInfo)
snap.tableInSchema[schemaID] = append(snap.tableInSchema[schemaID], tableInfo.ID)
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
Expand Down Expand Up @@ -223,13 +227,21 @@ func (s *schemaSnapshot) Clone() *schemaSnapshot {

tables := make(map[int64]*model.TableInfo, len(s.tables))
for k, v := range s.tables {
tables[k] = v.Clone()
tables[k] = v
}
clone.tables = tables

tableInSchema := make(map[int64][]int64, len(s.tableInSchema))
for k, v := range s.tableInSchema {
cloneV := make([]int64, len(v))
copy(cloneV, v)
tableInSchema[k] = cloneV
}
clone.tableInSchema = tableInSchema

partitionTable := make(map[int64]*model.TableInfo, len(s.partitionTable))
for k, v := range s.partitionTable {
partitionTable[k] = v.Clone()
partitionTable[k] = v
}
clone.partitionTable = partitionTable

Expand Down Expand Up @@ -349,23 +361,19 @@ func (s *schemaSnapshot) dropSchema(id int64) error {
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(id)
}

for _, table := range schema.Tables {
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
delete(s.partitionTable, partition.ID)
}
}
tableName := s.tables[table.ID].TableName
if pi := table.GetPartitionInfo(); pi != nil {
for _, tableID := range s.tableInSchema[id] {
tableName := s.tables[tableID].TableName
if pi := s.tables[tableID].GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
delete(s.partitionTable, partition.ID)
}
}
delete(s.tables, table.ID)
delete(s.tables, tableID)
delete(s.tableNameToID, tableName)
}

delete(s.schemas, id)
delete(s.tableInSchema, id)
delete(s.schemaNameToID, schema.Name.O)

return nil
Expand All @@ -378,13 +386,15 @@ func (s *schemaSnapshot) createSchema(db *timodel.DBInfo) error {

s.schemas[db.ID] = db.Clone()
s.schemaNameToID[db.Name.O] = db.ID
s.tableInSchema[db.ID] = []int64{}

log.Debug("create schema success, schema id", zap.String("name", db.Name.O), zap.Int64("id", db.ID))
return nil
}

func (s *schemaSnapshot) replaceSchema(db *timodel.DBInfo) error {
if _, ok := s.schemas[db.ID]; !ok {
_, ok := s.schemas[db.ID]
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStack("schema %s(%d) not found", db.Name, db.ID)
}
s.schemas[db.ID] = db.Clone()
Expand All @@ -397,15 +407,15 @@ func (s *schemaSnapshot) dropTable(id int64) error {
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id)
}
schema, ok := s.SchemaByTableID(id)
tableInSchema, ok := s.tableInSchema[table.SchemaID]
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table(%d)'s schema", id)
}

for i := range schema.Tables {
if schema.Tables[i].ID == id {
copy(schema.Tables[i:], schema.Tables[i+1:])
schema.Tables = schema.Tables[:len(schema.Tables)-1]
for i, tableID := range tableInSchema {
if tableID == id {
copy(tableInSchema[i:], tableInSchema[i+1:])
s.tableInSchema[table.SchemaID] = tableInSchema[:len(tableInSchema)-1]
break
}
}
Expand Down Expand Up @@ -473,12 +483,16 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", table.SchemaID)
}
tableInSchema, ok := s.tableInSchema[table.SchemaID]
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", table.SchemaID)
}
_, ok = s.tables[table.ID]
if ok {
return cerror.ErrSnapshotTableExists.GenWithStackByArgs(schema.Name, table.Name)
}

schema.Tables = append(schema.Tables, table.TableInfo)
tableInSchema = append(tableInSchema, table.ID)
s.tableInSchema[table.SchemaID] = tableInSchema

s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
Expand Down Expand Up @@ -530,7 +544,7 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
getWrapTableInfo := func(job *timodel.Job) *model.TableInfo {
return model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS,
job.BinlogInfo.TableInfo.Clone())
job.BinlogInfo.TableInfo)
}
switch job.Type {
case timodel.ActionCreateSchema:
Expand Down
222 changes: 221 additions & 1 deletion cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@ package entry
import (
"context"
"fmt"
"sort"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/errors"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
ticonfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
tidbkv "github.com/pingcap/tidb/kv"
timeta "github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
Expand Down Expand Up @@ -684,7 +691,7 @@ func (t *schemaSuite) TestCreateSnapFromMeta(c *check.C) {
dbInfo, ok := snap.SchemaByTableID(tableID)
c.Assert(ok, check.IsTrue)
c.Assert(dbInfo.Name.O, check.Equals, "test2")
c.Assert(len(dbInfo.Tables), check.Equals, 3)
c.Assert(len(snap.tableInSchema), check.Equals, 3)
}

func (t *schemaSuite) TestSnapshotClone(c *check.C) {
Expand Down Expand Up @@ -770,3 +777,216 @@ func (t *schemaSuite) TestExplicitTables(c *check.C) {
c.Assert(len(snap3.tables)-len(snap1.tables), check.Equals, 5)
c.Assert(snap3.ineligibleTableID, check.HasLen, 0)
}

/*
TODO: Untested Actions:
ActionAddForeignKey ActionType = 9
ActionDropForeignKey ActionType = 10
ActionRebaseAutoID ActionType = 13
ActionShardRowID ActionType = 16
ActionLockTable ActionType = 27
ActionUnlockTable ActionType = 28
ActionRepairTable ActionType = 29
ActionSetTiFlashReplica ActionType = 30
ActionUpdateTiFlashReplicaStatus ActionType = 31
ActionCreateSequence ActionType = 34
ActionAlterSequence ActionType = 35
ActionDropSequence ActionType = 36
ActionModifyTableAutoIdCache ActionType = 39
ActionRebaseAutoRandomBase ActionType = 40
ActionExchangeTablePartition ActionType = 42
ActionAddCheckConstraint ActionType = 43
ActionDropCheckConstraint ActionType = 44
ActionAlterCheckConstraint ActionType = 45
ActionAlterTableAlterPartition ActionType = 46
... Any Action whose value is greater than 46 ...
*/
// TestSchemaStorage replays every history DDL job produced by a group of DDL
// statements into a schema storage, and checks that the snapshot the storage
// reports at each job's FinishedTS equals the snapshot built directly from
// the meta at that same timestamp.
func (t *schemaSuite) TestSchemaStorage(c *check.C) {
	defer testleak.AfterTest(c)()
	ctx := context.Background()

	// Each inner slice is one group of DDL statements that is executed against
	// its own freshly bootstrapped mock store.
	ddlGroups := [][]string{{
		"create database test_ddl1",                                                               // ActionCreateSchema
		"create table test_ddl1.simple_test1 (id bigint primary key)",                             // ActionCreateTable
		"create table test_ddl1.simple_test2 (id bigint)",                                         // ActionCreateTable
		"create table test_ddl1.simple_test3 (id bigint primary key)",                             // ActionCreateTable
		"create table test_ddl1.simple_test4 (id bigint primary key)",                             // ActionCreateTable
		"DROP TABLE test_ddl1.simple_test3",                                                       // ActionDropTable
		"ALTER TABLE test_ddl1.simple_test1 ADD COLUMN c1 INT NOT NULL",                           // ActionAddColumn
		"ALTER TABLE test_ddl1.simple_test1 ADD c2 INT NOT NULL AFTER id",                         // ActionAddColumn
		"ALTER TABLE test_ddl1.simple_test1 ADD c3 INT NOT NULL",                                  // ActionAddColumns
		"ALTER TABLE test_ddl1.simple_test1 ADD c4 INT NOT NULL",                                  // ActionAddColumns
		"ALTER TABLE test_ddl1.simple_test1 DROP c1",                                              // ActionDropColumn
		"ALTER TABLE test_ddl1.simple_test1 DROP c2",                                              // ActionDropColumns
		"ALTER TABLE test_ddl1.simple_test1 DROP c3",                                              // ActionDropColumns
		"ALTER TABLE test_ddl1.simple_test1 ADD INDEX (c4)",                                       // ActionAddIndex
		"ALTER TABLE test_ddl1.simple_test1 DROP INDEX c4",                                        // ActionDropIndex
		"TRUNCATE test_ddl1.simple_test1",                                                         // ActionTruncateTable
		"ALTER DATABASE test_ddl1 CHARACTER SET = binary COLLATE binary",                          // ActionModifySchemaCharsetAndCollate
		"ALTER TABLE test_ddl1.simple_test2 ADD c1 INT NOT NULL",                                  // ActionAddColumns
		"ALTER TABLE test_ddl1.simple_test2 ADD c2 INT NOT NULL",                                  // ActionAddColumns
		"ALTER TABLE test_ddl1.simple_test2 ADD INDEX (c1)",                                       // ActionAddIndex
		"ALTER TABLE test_ddl1.simple_test2 ALTER INDEX c1 INVISIBLE",                             // ActionAlterIndexVisibility
		"ALTER TABLE test_ddl1.simple_test2 RENAME INDEX c1 TO idx_c1",                            // ActionRenameIndex
		"ALTER TABLE test_ddl1.simple_test2 MODIFY c2 BIGINT",                                     // ActionModifyColumn
		"CREATE VIEW test_ddl1.view_test2 AS SELECT * FROM test_ddl1.simple_test2 WHERE id > 2",   // ActionCreateView
		"DROP VIEW test_ddl1.view_test2",                                                          // ActionDropView
		"RENAME TABLE test_ddl1.simple_test2 TO test_ddl1.simple_test5",                           // ActionRenameTable
		"DROP DATABASE test_ddl1",                                                                 // ActionDropSchema
		"create database test_ddl2",                                                               // ActionCreateSchema
		"create table test_ddl2.simple_test1 (id bigint primary key, c1 int not null unique key)", // ActionCreateTable
		`CREATE TABLE test_ddl2.employees (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
fname VARCHAR(25) NOT NULL,
lname VARCHAR(25) NOT NULL,
store_id INT NOT NULL,
department_id INT NOT NULL
)
PARTITION BY RANGE(id) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (15),
PARTITION p3 VALUES LESS THAN (20)
)`, // ActionCreateTable
		"ALTER TABLE test_ddl2.employees DROP PARTITION p2",                                  // ActionDropTablePartition
		"ALTER TABLE test_ddl2.employees ADD PARTITION (PARTITION p4 VALUES LESS THAN (25))", // ActionAddTablePartition
		"ALTER TABLE test_ddl2.employees TRUNCATE PARTITION p3",                              // ActionTruncateTablePartition
		"alter table test_ddl2.employees comment='modify comment'",                           // ActionModifyTableComment
		"alter table test_ddl2.simple_test1 drop primary key",                                // ActionDropPrimaryKey
		"alter table test_ddl2.simple_test1 add primary key pk(id)",                          // ActionAddPrimaryKey
		"ALTER TABLE test_ddl2.simple_test1 ALTER id SET DEFAULT 18",                         // ActionSetDefaultValue
		"ALTER TABLE test_ddl2.simple_test1 CHARACTER SET = utf8mb4",                         // ActionModifyTableCharsetAndCollate
		// "recover table test_ddl2.employees", // ActionRecoverTable: this DDL can't work on mock tikv

		"DROP TABLE test_ddl2.employees",
		`CREATE TABLE test_ddl2.employees2 (
id INT NOT NULL,
fname VARCHAR(25) NOT NULL,
lname VARCHAR(25) NOT NULL,
store_id INT NOT NULL,
department_id INT NOT NULL
)
PARTITION BY RANGE(id) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (15),
PARTITION p3 VALUES LESS THAN (20)
)`,
		"ALTER TABLE test_ddl2.employees2 CHARACTER SET = utf8mb4",
		"DROP DATABASE test_ddl2",
	}}

	// assertSameSnapshot fails the test, attaching a field-level diff as the
	// failure comment, when the two snapshots are not deeply equal.
	assertSameSnapshot := func(left *schemaSnapshot, right *schemaSnapshot) {
		unexported := cmp.AllowUnexported(schemaSnapshot{}, model.TableInfo{})
		diff := cmp.Diff(left, right, unexported)
		c.Assert(left, check.DeepEquals, right, check.Commentf("%s", diff))
	}

	// runGroup executes one group of DDL statements on a new mock store and
	// verifies the schema storage against the meta for every history job.
	runGroup := func(ddls []string) {
		store, err := mockstore.NewMockTikvStore()
		c.Assert(err, check.IsNil)
		defer store.Close() //nolint:errcheck
		ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
			conf.AlterPrimaryKey = true
		})
		session.SetSchemaLease(0)
		session.DisableStats4Test()
		dom, err := session.BootstrapSession(store)
		c.Assert(err, check.IsNil)
		defer dom.Close()
		dom.SetStatsUpdating(true)
		tk := testkit.NewTestKit(c, store)

		for i := range ddls {
			tk.MustExec(ddls[i])
		}

		jobs, err := getAllHistoryDDLJob(store)
		c.Assert(err, check.IsNil)
		schemaStorage, err := NewSchemaStorage(nil, 0, nil, false)
		c.Assert(err, check.IsNil)

		// Feed every job into the storage before any snapshot is requested.
		for i := range jobs {
			c.Assert(schemaStorage.HandleDDLJob(jobs[i]), check.IsNil)
		}

		for i := range jobs {
			finishedTs := jobs[i].BinlogInfo.FinishedTS

			meta, err := kv.GetSnapshotMeta(store, finishedTs)
			c.Assert(err, check.IsNil)
			fromMeta, err := newSchemaSnapshotFromMeta(meta, finishedTs, false)
			c.Assert(err, check.IsNil)
			fromStorage, err := schemaStorage.GetSnapshot(ctx, finishedTs)
			c.Assert(err, check.IsNil)

			// Normalize both sides, then require them to be equal.
			tidySchemaSnapshot(fromMeta)
			tidySchemaSnapshot(fromStorage)
			assertSameSnapshot(fromMeta, fromStorage)
		}
	}

	for i := range ddlGroups {
		runGroup(ddlGroups[i])
	}
}

// tidySchemaSnapshot normalizes snap in place so that two snapshots can be
// compared for deep equality: empty slices become nil, TableInfoVersion is
// zeroed, bookkeeping the meta cannot reproduce is dropped, and the table ID
// lists of every schema are sorted ascending.
func tidySchemaSnapshot(snap *schemaSnapshot) {
	for _, db := range snap.schemas {
		if len(db.Tables) == 0 {
			db.Tables = nil
		}
	}

	for _, tbl := range snap.tables {
		tbl.TableInfoVersion = 0
		if len(tbl.Columns) == 0 {
			tbl.Columns = nil
		}
		if len(tbl.Indices) == 0 {
			tbl.Indices = nil
		}
		if len(tbl.ForeignKeys) == 0 {
			tbl.ForeignKeys = nil
		}
	}

	// A snapshot built from meta has no record of ineligible tables that only
	// existed in the past, so ineligible IDs whose table is gone are removed.
	for id := range snap.ineligibleTableID {
		if _, exists := snap.tables[id]; exists {
			continue
		}
		delete(snap.ineligibleTableID, id)
	}

	// A snapshot built from meta cannot know which tables were truncated, so
	// this field is ignored in comparisons.
	snap.truncateTableID = nil

	for _, ids := range snap.tableInSchema {
		tableIDs := ids
		sort.Slice(tableIDs, func(a, b int) bool {
			return tableIDs[a] < tableIDs[b]
		})
	}
}

// getAllHistoryDDLJob opens a session on storage and returns the history DDL
// jobs reported by the meta's GetAllHistoryDDLJobs. Every failure is returned
// wrapped with errors.Trace.
func getAllHistoryDDLJob(storage tidbkv.Storage) ([]*timodel.Job, error) {
	s, err := session.CreateSession(storage)
	if err != nil {
		return nil, errors.Trace(err)
	}

	if s != nil {
		defer s.Close()
	}

	store := domain.GetDomain(s.(sessionctx.Context)).Store()
	txn, err := store.Begin()
	if err != nil {
		return nil, errors.Trace(err)
	}
	// The transaction is only read from. Roll it back on every return path so
	// it is not left open after this helper returns.
	defer txn.Rollback() //nolint:errcheck
	txnMeta := timeta.NewMeta(txn)

	jobs, err := txnMeta.GetAllHistoryDDLJobs()
	if err != nil {
		return nil, errors.Trace(err)
	}
	return jobs, nil
}
Loading

0 comments on commit 2c00498

Please sign in to comment.