Skip to content

Commit

Permalink
*: store db ID in model.TableInfo and move SchemaByTable method out f…
Browse files Browse the repository at this point in the history
…rom InfoSchema (#50917)

ref #50959
  • Loading branch information
tiancaiamao authored Feb 7, 2024
1 parent 489edc7 commit f5e0799
Show file tree
Hide file tree
Showing 27 changed files with 88 additions and 76 deletions.
4 changes: 2 additions & 2 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
}
schema, ok := info.SchemaByTable(table.Meta())
schema, ok := infoschema.SchemaByTable(info, table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
Expand Down Expand Up @@ -135,7 +135,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
}
schema, ok := info.SchemaByTable(table.Meta())
schema, ok := infoschema.SchemaByTable(info, table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
Expand Down
7 changes: 7 additions & 0 deletions pkg/ddl/db_rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ func renameTableTest(t *testing.T, sql string, isAlterTable bool) {
oldTblInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
oldTblID := oldTblInfo.Meta().ID
oldDBID := oldTblInfo.Meta().DBID
require.NotEqual(t, oldDBID, 0)

tk.MustExec("create database test1")
tk.MustExec("use test1")
tk.MustExec(fmt.Sprintf(sql, "test.t", "test1.t1"))
is = domain.GetDomain(ctx).InfoSchema()
newTblInfo, err := is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t1"))
require.NoError(t, err)
require.Equal(t, oldTblID, newTblInfo.Meta().ID)
require.NotEqual(t, newTblInfo.Meta().DBID, oldDBID)
require.NotEqual(t, newTblInfo.Meta().DBID, 0)
oldDBID = newTblInfo.Meta().DBID
tk.MustQuery("select * from t1").Check(testkit.Rows("1 1", "2 2"))
tk.MustExec("use test")

Expand All @@ -107,6 +113,7 @@ func renameTableTest(t *testing.T, sql string, isAlterTable bool) {
newTblInfo, err = is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t2"))
require.NoError(t, err)
require.Equal(t, oldTblID, newTblInfo.Meta().ID)
require.Equal(t, oldDBID, newTblInfo.Meta().DBID)
tk.MustQuery("select * from t2").Check(testkit.Rows("1 1", "2 2"))
isExist := is.TableExists(model.NewCIStr("test1"), model.NewCIStr("t1"))
require.False(t, isExist)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6568,7 +6568,7 @@ func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, a
return nil
}

db, ok := is.SchemaByTable(tbInfo)
db, ok := infoschema.SchemaByTable(is, tbInfo)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStack("Database of table `%s` does not exist.", tb.Meta().Name)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/domain/historical_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
domain_metrics "github.com/pingcap/tidb/pkg/domain/metrics"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *
} else {
tblInfo = tbl.Meta()
}
dbInfo, existed := is.SchemaByTable(tblInfo)
dbInfo, existed := infoschema.SchemaByTable(is, tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, results
}

// filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, is infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
var (
filteredTasks []*analyzeTask
skippedTables []string
Expand Down Expand Up @@ -237,19 +237,19 @@ func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, inf
if _, ok := tidAndPidsMap[physicalTableID]; !ok {
if isLocked {
if tableID.IsPartitionTable() {
tbl, _, def := infoSchema.FindTableByPartitionID(tableID.PartitionID)
tbl, _, def := is.FindTableByPartitionID(tableID.PartitionID)
if def == nil {
logutil.BgLogger().Warn("Unknown partition ID in analyze task", zap.Int64("pid", tableID.PartitionID))
} else {
schema, _ := infoSchema.SchemaByTable(tbl.Meta())
schema, _ := infoschema.SchemaByTable(is, tbl.Meta())
skippedTables = append(skippedTables, fmt.Sprintf("%s.%s partition (%s)", schema.Name, tbl.Meta().Name.O, def.Name.O))
}
} else {
tbl, ok := infoSchema.TableByID(physicalTableID)
tbl, ok := is.TableByID(physicalTableID)
if !ok {
logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", physicalTableID))
} else {
schema, _ := infoSchema.SchemaByTable(tbl.Meta())
schema, _ := infoschema.SchemaByTable(is, tbl.Meta())
skippedTables = append(skippedTables, fmt.Sprintf("%s.%s", schema.Name, tbl.Meta().Name.O))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (e *AnalyzeExec) newAnalyzeHandleGlobalStatsJob(key globalStatsKey) *statis
if !ok {
return nil
}
db, ok := is.SchemaByTable(table.Meta())
db, ok := infoschema.SchemaByTable(is, table.Meta())
if !ok {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"//pkg/util/deadlockhistory",
"//pkg/util/domainutil",
"//pkg/util/execdetails",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mock",
"//pkg/util/sem",
Expand Down
52 changes: 20 additions & 32 deletions pkg/infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/mock"
)

Expand All @@ -41,7 +42,6 @@ type InfoSchema interface {
TableByName(schema, table model.CIStr) (table.Table, error)
TableExists(schema, table model.CIStr) bool
SchemaByID(id int64) (*model.DBInfo, bool)
SchemaByTable(tableInfo *model.TableInfo) (*model.DBInfo, bool)
PolicyByName(name model.CIStr) (*model.PolicyInfo, bool)
ResourceGroupByName(name model.CIStr) (*model.ResourceGroupInfo, bool)
TableByID(id int64) (table.Table, bool)
Expand Down Expand Up @@ -126,13 +126,14 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
result.resourceGroupMap = make(map[string]*model.ResourceGroupInfo)
result.ruleBundleMap = make(map[int64]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
dbInfo := &model.DBInfo{ID: 1, Name: model.NewCIStr("test"), Tables: tbList}
tableNames := &schemaTables{
dbInfo: dbInfo,
tables: make(map[string]table.Table),
}
result.schemaMap["test"] = tableNames
for _, tb := range tbList {
tb.DBID = dbInfo.ID
tbl := table.MockTableFromMeta(tb)
tableNames.tables[tb.Name.L] = tbl
bucketIdx := tableBucketIdx(tb.ID)
Expand All @@ -154,13 +155,14 @@ func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) Inf
result.resourceGroupMap = make(map[string]*model.ResourceGroupInfo)
result.ruleBundleMap = make(map[int64]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
dbInfo := &model.DBInfo{ID: 1, Name: model.NewCIStr("test"), Tables: tbList}
tableNames := &schemaTables{
dbInfo: dbInfo,
tables: make(map[string]table.Table),
}
result.schemaMap["test"] = tableNames
for _, tb := range tbList {
tb.DBID = dbInfo.ID
tbl := table.MockTableFromMeta(tb)
tableNames.tables[tb.Name.L] = tbl
bucketIdx := tableBucketIdx(tb.ID)
Expand Down Expand Up @@ -260,18 +262,12 @@ func (is *infoSchema) SchemaByID(id int64) (val *model.DBInfo, ok bool) {
return nil, false
}

func (is *infoSchema) SchemaByTable(tableInfo *model.TableInfo) (val *model.DBInfo, ok bool) {
// SchemaByTable get a table's schema name
func SchemaByTable(is InfoSchema, tableInfo *model.TableInfo) (val *model.DBInfo, ok bool) {
if tableInfo == nil {
return nil, false
}
for _, v := range is.schemaMap {
if tbl, ok := v.tables[tableInfo.Name.L]; ok {
if tbl.Meta().ID == tableInfo.ID {
return v.dbInfo, true
}
}
}
return nil, false
return is.SchemaByID(tableInfo.DBID)
}

func (is *infoSchema) TableByID(id int64) (val table.Table, ok bool) {
Expand Down Expand Up @@ -367,6 +363,7 @@ func init() {
infoSchemaTables := make([]*model.TableInfo, 0, len(tableNameToColumns))
for name, cols := range tableNameToColumns {
tableInfo := buildTableMeta(name, cols)
tableInfo.DBID = dbID
infoSchemaTables = append(infoSchemaTables, tableInfo)
var ok bool
tableInfo.ID, ok = tableIDMap[tableInfo.Name.O]
Expand Down Expand Up @@ -589,7 +586,6 @@ func (is *SessionTables) TableByID(id int64) (tbl table.Table, ok bool) {
// AddTable add a table
func (is *SessionTables) AddTable(db *model.DBInfo, tbl table.Table) error {
schemaTables := is.ensureSchema(db)

tblMeta := tbl.Meta()
if _, ok := schemaTables.tables[tblMeta.Name.L]; ok {
return ErrTableExists.GenWithStackByArgs(tblMeta.Name)
Expand All @@ -598,6 +594,7 @@ func (is *SessionTables) AddTable(db *model.DBInfo, tbl table.Table) error {
if _, ok := is.idx2table[tblMeta.ID]; ok {
return ErrTableExists.GenWithStackByArgs(tblMeta.Name)
}
intest.Assert(db.ID == tbl.Meta().DBID)

schemaTables.tables[tblMeta.Name.L] = tbl
is.idx2table[tblMeta.ID] = tbl
Expand Down Expand Up @@ -630,17 +627,11 @@ func (is *SessionTables) Count() int {
return len(is.idx2table)
}

// SchemaByTable get a table's schema name
func (is *SessionTables) SchemaByTable(tableInfo *model.TableInfo) (*model.DBInfo, bool) {
if tableInfo == nil {
return nil, false
}

// SchemaByID get a table's schema from the schema ID.
func (is *SessionTables) SchemaByID(id int64) (*model.DBInfo, bool) {
for _, v := range is.schemaMap {
if tbl, ok := v.tables[tableInfo.Name.L]; ok {
if tbl.Meta().ID == tableInfo.ID {
return v.dbInfo, true
}
if v.dbInfo.ID == id {
return v.dbInfo, true
}
}

Expand Down Expand Up @@ -713,25 +704,22 @@ func (ts *SessionExtendedInfoSchema) TableByID(id int64) (table.Table, bool) {
return ts.InfoSchema.TableByID(id)
}

// SchemaByTable implements InfoSchema.SchemaByTable, it returns a stale DBInfo even if it's dropped.
func (ts *SessionExtendedInfoSchema) SchemaByTable(tableInfo *model.TableInfo) (*model.DBInfo, bool) {
if tableInfo == nil {
return nil, false
}

// SchemaByID implements InfoSchema.SchemaByID, it returns a stale DBInfo even if it's dropped.
func (ts *SessionExtendedInfoSchema) SchemaByID(id int64) (*model.DBInfo, bool) {
if ts.LocalTemporaryTables != nil {
if db, ok := ts.LocalTemporaryTables.SchemaByTable(tableInfo); ok {
if db, ok := ts.LocalTemporaryTables.SchemaByID(id); ok {
return db, true
}
}

if ts.MdlTables != nil {
if tbl, ok := ts.MdlTables.SchemaByTable(tableInfo); ok {
if tbl, ok := ts.MdlTables.SchemaByID(id); ok {
return tbl, true
}
}

return ts.InfoSchema.SchemaByTable(tableInfo)
ret, ok := ts.InfoSchema.SchemaByID(id)
return ret, ok
}

// UpdateTableInfo implements InfoSchema.SchemaByTable.
Expand Down
26 changes: 15 additions & 11 deletions pkg/infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestBasic(t *testing.T) {
Tables: []*model.TableInfo{tblInfo},
State: model.StatePublic,
}
tblInfo.DBID = dbInfo.ID

dbInfos := []*model.DBInfo{dbInfo}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
Expand Down Expand Up @@ -150,12 +151,12 @@ func TestBasic(t *testing.T) {
require.False(t, ok)
require.Nil(t, schema)

schema, ok = is.SchemaByTable(tblInfo)
schema, ok = infoschema.SchemaByTable(is, tblInfo)
require.True(t, ok)
require.NotNil(t, schema)

noexistTblInfo := &model.TableInfo{ID: 12345, Name: tblInfo.Name}
schema, ok = is.SchemaByTable(noexistTblInfo)
schema, ok = infoschema.SchemaByTable(is, noexistTblInfo)
require.False(t, ok)
require.Nil(t, schema)

Expand Down Expand Up @@ -582,6 +583,7 @@ func TestLocalTemporaryTables(t *testing.T) {
Columns: []*model.ColumnInfo{colInfo},
Indices: []*model.IndexInfo{},
State: model.StatePublic,
DBID: schemaID,
}

allocs := autoid.NewAllocatorsFromTblInfo(dom, schemaID, tblInfo)
Expand Down Expand Up @@ -623,7 +625,7 @@ func TestLocalTemporaryTables(t *testing.T) {
}

assertSchemaByTable := func(sc *infoschema.SessionTables, db *model.DBInfo, tb *model.TableInfo) {
got, ok := sc.SchemaByTable(tb)
got, ok := sc.SchemaByID(tb.DBID)
if db == nil {
require.Nil(t, got)
require.False(t, ok)
Expand All @@ -641,7 +643,7 @@ func TestLocalTemporaryTables(t *testing.T) {
tb13 := createNewTable(db1.ID, "tb3", model.TempTableLocal)

// db1b has the same name with db1
db1b := createNewSchemaInfo("db1")
db1b := createNewSchemaInfo("db1b")
tb15 := createNewTable(db1b.ID, "tb5", model.TempTableLocal)
tb16 := createNewTable(db1b.ID, "tb6", model.TempTableLocal)
tb17 := createNewTable(db1b.ID, "tb7", model.TempTableLocal)
Expand Down Expand Up @@ -704,13 +706,15 @@ func TestLocalTemporaryTables(t *testing.T) {
require.True(t, infoschema.ErrTableExists.Equal(err))
err = sc.AddTable(db1b, tb11)
require.True(t, infoschema.ErrTableExists.Equal(err))
tb11.Meta().DBID = 0 // SchemaByTable will get incorrect result if not reset here.

// failed add has no effect
assertTableByName(sc, db1.Name.L, tb11.Meta().Name.L, db1, tb11)

// delete some tables
require.True(t, sc.RemoveTable(model.NewCIStr("db1"), model.NewCIStr("tb1")))
require.True(t, sc.RemoveTable(model.NewCIStr("Db2"), model.NewCIStr("tB2")))
tb22.Meta().DBID = 0 // SchemaByTable will get incorrect result if not reset here.
require.False(t, sc.RemoveTable(model.NewCIStr("db1"), model.NewCIStr("tbx")))
require.False(t, sc.RemoveTable(model.NewCIStr("dbx"), model.NewCIStr("tbx")))

Expand All @@ -735,7 +739,6 @@ func TestLocalTemporaryTables(t *testing.T) {
// test non exist table schemaByTable
assertSchemaByTable(sc, nil, tb11.Meta())
assertSchemaByTable(sc, nil, tb22.Meta())
assertSchemaByTable(sc, nil, nil)

// test SessionExtendedInfoSchema
dbTest := createNewSchemaInfo("test")
Expand Down Expand Up @@ -781,25 +784,26 @@ func TestLocalTemporaryTables(t *testing.T) {
require.Equal(t, tb12, tbl)

// test SchemaByTable
info, ok := is.SchemaByTable(normalTbTestA.Meta())
info, ok := is.SchemaByID(normalTbTestA.Meta().DBID)
require.True(t, ok)
require.Equal(t, dbTest.Name.L, info.Name.L)
info, ok = is.SchemaByTable(normalTbTestB.Meta())
info, ok = is.SchemaByID(normalTbTestB.Meta().DBID)
require.True(t, ok)
require.Equal(t, dbTest.Name.L, info.Name.L)
info, ok = is.SchemaByTable(tmpTbTestA.Meta())
info, ok = is.SchemaByID(tmpTbTestA.Meta().DBID)
require.True(t, ok)
require.Equal(t, dbTest.Name.L, info.Name.L)
// SchemaByTable also returns DBInfo when the schema is not in the infoSchema but the table is an existing tmp table.
info, ok = is.SchemaByTable(tb12.Meta())
info, ok = is.SchemaByID(tb12.Meta().DBID)
require.True(t, ok)
require.Equal(t, db1.Name.L, info.Name.L)
// SchemaByTable returns nil when the schema is not in the infoSchema and the table is an non-existing normal table.
info, ok = is.SchemaByTable(normalTbTestC.Meta())
normalTbTestC.Meta().DBID = 0 // normalTbTestC is not added to any db, reset the DBID to avoid misuse
info, ok = is.SchemaByID(normalTbTestC.Meta().DBID)
require.False(t, ok)
require.Nil(t, info)
// SchemaByTable returns nil when the schema is not in the infoSchema and the table is an non-existing tmp table.
info, ok = is.SchemaByTable(tb22.Meta())
info, ok = is.SchemaByID(tb22.Meta().DBID)
require.False(t, ok)
require.Nil(t, info)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/metrics_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func init() {
tableInfo := buildTableMeta(name, cols)
tableInfo.ID = tableID
tableInfo.Comment = def.Comment
tableInfo.DBID = dbID
tableID++
metricTables = append(metricTables, tableInfo)
tableInfo.MaxColumnID = int64(len(tableInfo.Columns))
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/perfschema/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func Init() {
for i, c := range meta.Columns {
c.ID = int64(i) + 1
}
meta.DBID = dbID
}
dbInfo := &model.DBInfo{
ID: dbID,
Expand Down
Loading

0 comments on commit f5e0799

Please sign in to comment.