diff --git a/infoschema/info_store.go b/infoschema/info_store.go new file mode 100644 index 0000000000000..9ffd78e64fa2c --- /dev/null +++ b/infoschema/info_store.go @@ -0,0 +1,160 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema + +import ( + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" +) + +// InfoStore is a simple structure that stores DBInfo and TableInfo. It's modifiable and not thread-safe. +type InfoStore struct { + lowerCaseTableNames int // same as variable lower_case_table_names + + dbs map[string]*model.DBInfo + tables map[string]map[string]*model.TableInfo +} + +// NewInfoStore creates a InfoStore. +func NewInfoStore(lowerCaseTableNames int) *InfoStore { + return &InfoStore{ + lowerCaseTableNames: lowerCaseTableNames, + dbs: map[string]*model.DBInfo{}, + tables: map[string]map[string]*model.TableInfo{}, + } +} + +func (i *InfoStore) ciStr2Key(name model.CIStr) string { + if i.lowerCaseTableNames == 0 { + return name.O + } + return name.L +} + +// SchemaByName returns the DBInfo of given name. nil if not found. +func (i *InfoStore) SchemaByName(name model.CIStr) *model.DBInfo { + key := i.ciStr2Key(name) + return i.dbs[key] +} + +// PutSchema puts a DBInfo, it will overwrite the old one. +func (i *InfoStore) PutSchema(dbInfo *model.DBInfo) { + key := i.ciStr2Key(dbInfo.Name) + i.dbs[key] = dbInfo + if i.tables[key] == nil { + i.tables[key] = map[string]*model.TableInfo{} + } +} + +// DeleteSchema deletes the schema from InfoSchema. Returns true when the schema exists, false otherwise. +func (i *InfoStore) DeleteSchema(name model.CIStr) bool { + key := i.ciStr2Key(name) + _, ok := i.dbs[key] + if !ok { + return false + } + delete(i.dbs, key) + delete(i.tables, key) + return true +} + +// TableByName returns the TableInfo. It will also return the error like an infoschema. +func (i *InfoStore) TableByName(schema, table model.CIStr) (*model.TableInfo, error) { + schemaKey := i.ciStr2Key(schema) + tables, ok := i.tables[schemaKey] + if !ok { + return nil, ErrDatabaseNotExists.GenWithStackByArgs(schema) + } + + tableKey := i.ciStr2Key(table) + tbl, ok := tables[tableKey] + if !ok { + return nil, ErrTableNotExists.GenWithStackByArgs(schema, table) + } + return tbl, nil +} + +// PutTable puts a TableInfo, it will overwrite the old one. If the schema doesn't exist, it will return ErrDatabaseNotExists. +func (i *InfoStore) PutTable(schemaName model.CIStr, tblInfo *model.TableInfo) error { + schemaKey := i.ciStr2Key(schemaName) + tables, ok := i.tables[schemaKey] + if !ok { + return ErrDatabaseNotExists.GenWithStackByArgs(schemaName) + } + tableKey := i.ciStr2Key(tblInfo.Name) + tables[tableKey] = tblInfo + return nil +} + +// DeleteTable deletes the TableInfo, it will return ErrDatabaseNotExists or ErrTableNotExists when schema or table does +// not exist. +func (i *InfoStore) DeleteTable(schema, table model.CIStr) error { + schemaKey := i.ciStr2Key(schema) + tables, ok := i.tables[schemaKey] + if !ok { + return ErrDatabaseNotExists.GenWithStackByArgs(schema) + } + + tableKey := i.ciStr2Key(table) + _, ok = tables[tableKey] + if !ok { + return ErrTableNotExists.GenWithStackByArgs(schema, table) + } + delete(tables, tableKey) + return nil +} + +// InfoStoreAdaptor convert InfoStore to InfoSchema, it only implements a part of InfoSchema interface to be +// used by DDL interface. +// nolint:unused +type InfoStoreAdaptor struct { + InfoSchema + inner *InfoStore +} + +// SchemaByName implements the InfoSchema interface. +// nolint:unused +func (i InfoStoreAdaptor) SchemaByName(schema model.CIStr) (*model.DBInfo, bool) { + dbInfo := i.inner.SchemaByName(schema) + return dbInfo, dbInfo != nil +} + +// TableExists implements the InfoSchema interface. +// nolint:unused +func (i InfoStoreAdaptor) TableExists(schema, table model.CIStr) bool { + tableInfo, _ := i.inner.TableByName(schema, table) + return tableInfo != nil +} + +// TableIsView implements the InfoSchema interface. +// nolint:unused +func (i InfoStoreAdaptor) TableIsView(schema, table model.CIStr) bool { + tableInfo, _ := i.inner.TableByName(schema, table) + if tableInfo == nil { + return false + } + return tableInfo.IsView() +} + +// TableByName implements the InfoSchema interface. +// nolint:unused +func (i InfoStoreAdaptor) TableByName(schema, table model.CIStr) (t table.Table, err error) { + tableInfo, err := i.inner.TableByName(schema, table) + if err != nil { + return nil, err + } + return tables.MockTableFromMeta(tableInfo), nil +} diff --git a/infoschema/info_store_test.go b/infoschema/info_store_test.go new file mode 100644 index 0000000000000..07ee9d87d9de5 --- /dev/null +++ b/infoschema/info_store_test.go @@ -0,0 +1,115 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema + +import ( + "testing" + + "github.com/pingcap/tidb/parser/model" + "github.com/stretchr/testify/require" +) + +func TestInfoStoreLowerCaseTableNames(t *testing.T) { + dbName := model.NewCIStr("DBName") + lowerDBName := model.NewCIStr("dbname") + tableName := model.NewCIStr("TableName") + lowerTableName := model.NewCIStr("tablename") + dbInfo := &model.DBInfo{Name: dbName} + tableInfo := &model.TableInfo{Name: tableName} + + // case-sensitive + + is := NewInfoStore(0) + is.PutSchema(dbInfo) + got := is.SchemaByName(dbName) + require.NotNil(t, got) + got = is.SchemaByName(lowerDBName) + require.Nil(t, got) + + err := is.PutTable(lowerDBName, tableInfo) + require.True(t, ErrDatabaseNotExists.Equal(err)) + err = is.PutTable(dbName, tableInfo) + require.NoError(t, err) + got2, err := is.TableByName(dbName, tableName) + require.NoError(t, err) + require.NotNil(t, got2) + got2, err = is.TableByName(lowerTableName, tableName) + require.True(t, ErrDatabaseNotExists.Equal(err)) + require.Nil(t, got2) + got2, err = is.TableByName(dbName, lowerTableName) + require.True(t, ErrTableNotExists.Equal(err)) + require.Nil(t, got2) + + // compare-insensitive + + is = NewInfoStore(2) + is.PutSchema(dbInfo) + got = is.SchemaByName(dbName) + require.NotNil(t, got) + got = is.SchemaByName(lowerDBName) + require.NotNil(t, got) + require.Equal(t, dbName, got.Name) + + err = is.PutTable(lowerDBName, tableInfo) + require.NoError(t, err) + got2, err = is.TableByName(dbName, tableName) + require.NoError(t, err) + require.NotNil(t, got2) + got2, err = is.TableByName(dbName, lowerTableName) + require.NoError(t, err) + require.NotNil(t, got2) + require.Equal(t, tableName, got2.Name) +} + +func TestInfoStoreDeleteTables(t *testing.T) { + is := NewInfoStore(0) + dbName1 := model.NewCIStr("DBName1") + dbName2 := model.NewCIStr("DBName2") + tableName1 := model.NewCIStr("TableName1") + tableName2 := model.NewCIStr("TableName2") + dbInfo1 := &model.DBInfo{Name: dbName1} + dbInfo2 := &model.DBInfo{Name: dbName2} + tableInfo1 := &model.TableInfo{Name: tableName1} + tableInfo2 := &model.TableInfo{Name: tableName2} + + is.PutSchema(dbInfo1) + err := is.PutTable(dbName1, tableInfo1) + require.NoError(t, err) + err = is.PutTable(dbName1, tableInfo2) + require.NoError(t, err) + + // db2 not created + ok := is.DeleteSchema(dbName2) + require.False(t, ok) + err = is.PutTable(dbName2, tableInfo1) + require.True(t, ErrDatabaseNotExists.Equal(err)) + err = is.DeleteTable(dbName2, tableName1) + require.True(t, ErrDatabaseNotExists.Equal(err)) + + is.PutSchema(dbInfo2) + err = is.PutTable(dbName2, tableInfo1) + require.NoError(t, err) + + err = is.DeleteTable(dbName2, tableName2) + require.True(t, ErrTableNotExists.Equal(err)) + err = is.DeleteTable(dbName2, tableName1) + require.NoError(t, err) + + // delete db will remove its tables + ok = is.DeleteSchema(dbName1) + require.True(t, ok) + _, err = is.TableByName(dbName1, tableName1) + require.True(t, ErrDatabaseNotExists.Equal(err)) +}