Skip to content

Commit

Permalink
infoschema: refine common part for builder (#51411)
Browse files Browse the repository at this point in the history
ref #50959
  • Loading branch information
ywqzzy authored Mar 1, 2024
1 parent 9b42b9f commit da7cd99
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 73 deletions.
183 changes: 111 additions & 72 deletions pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,10 @@ type bundleInfoBuilder struct {
updatePartitions map[int64]any
}

func (b *bundleInfoBuilder) ensureMap() {
if b.updateTables == nil {
b.updateTables = make(map[int64]any)
}
if b.updatePartitions == nil {
b.updatePartitions = make(map[int64]any)
}
if b.updatePolicies == nil {
b.updatePolicies = make(map[int64]any)
}
func (b *bundleInfoBuilder) initBundleInfoBuilder() {
b.updateTables = make(map[int64]any)
b.updatePartitions = make(map[int64]any)
b.updatePolicies = make(map[int64]any)
}

func (b *bundleInfoBuilder) SetDeltaUpdateBundles() {
Expand All @@ -83,17 +77,14 @@ func (b *bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) {
}

func (b *bundleInfoBuilder) markTableBundleShouldUpdate(tblID int64) {
b.ensureMap()
b.updateTables[tblID] = struct{}{}
}

func (b *bundleInfoBuilder) markPartitionBundleShouldUpdate(partID int64) {
b.ensureMap()
b.updatePartitions[partID] = struct{}{}
}

func (b *bundleInfoBuilder) markBundlesReferPolicyShouldUpdate(policyID int64) {
b.ensureMap()
b.updatePolicies[policyID] = struct{}{}
}

Expand Down Expand Up @@ -467,15 +458,7 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in
return tblIDs, nil
}

func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
roDBInfo, ok := b.infoSchema.SchemaByID(diff.SchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}
dbInfo := b.getSchemaAndCopyIfNecessary(roDBInfo.Name.L)
var oldTableID, newTableID int64
func (b *Builder) getTableIDs(diff *model.SchemaDiff) (oldTableID, newTableID int64) {
switch diff.Type {
case model.ActionCreateSequence, model.ActionRecoverTable:
newTableID = diff.TableID
Expand All @@ -502,6 +485,10 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
oldTableID = diff.TableID
newTableID = diff.TableID
}
return
}

func (b *Builder) updateBundleForTableUpdate(diff *model.SchemaDiff, newTableID, oldTableID int64) {
// handle placement rule cache
switch diff.Type {
case model.ActionCreateTable:
Expand All @@ -516,11 +503,12 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
case model.ActionAlterTablePlacement:
b.markTableBundleShouldUpdate(newTableID)
}
b.copySortedTables(oldTableID, newTableID)
}

func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model.DBInfo, diff *model.SchemaDiff) ([]int64, autoid.Allocators, error) {
tblIDs := make([]int64, 0, 2)
var newAllocs autoid.Allocators
// We try to reuse the old allocator, so the cached auto ID can be reused.
var allocs autoid.Allocators
if tableIDIsValid(oldTableID) {
if oldTableID == newTableID &&
// For rename table, keep the old alloc.
Expand All @@ -536,14 +524,14 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
// which may have AutoID not connected to tableID
// TODO: can there be _tidb_rowid AutoID per partition?
oldAllocs, _ := allocByID(b.infoSchema, oldTableID)
allocs = filterAllocators(diff, oldAllocs)
newAllocs = filterAllocators(diff, oldAllocs)
}

tmpIDs := tblIDs
if (diff.Type == model.ActionRenameTable || diff.Type == model.ActionRenameTables) && diff.OldSchemaID != diff.SchemaID {
oldRoDBInfo, ok := b.infoSchema.SchemaByID(diff.OldSchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
return nil, newAllocs, ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.OldSchemaID),
)
}
Expand All @@ -558,6 +546,26 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
tblIDs = tmpIDs
}
}
return tblIDs, newAllocs, nil
}

func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
roDBInfo, ok := b.infoSchema.SchemaByID(diff.SchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}
dbInfo := b.getSchemaAndCopyIfNecessary(roDBInfo.Name.L)
oldTableID, newTableID := b.getTableIDs(diff)
b.updateBundleForTableUpdate(diff, newTableID, oldTableID)
b.copySortedTables(oldTableID, newTableID)

tblIDs, allocs, err := b.dropTableForUpdate(newTableID, oldTableID, dbInfo, diff)
if err != nil {
return nil, err
}

if tableIDIsValid(newTableID) {
// All types except DropTableOrView.
var err error
Expand Down Expand Up @@ -783,20 +791,7 @@ func (b *Builder) copySortedTablesBucket(bucketIdx int) {
b.infoSchema.sortedTablesBuckets[bucketIdx] = newSortedTables
}

func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) {
tblInfo, err := m.GetTable(dbInfo.ID, tableID)
if err != nil {
return nil, errors.Trace(err)
}
if tblInfo == nil {
// When we apply an old schema diff, the table may has been dropped already, so we need to fall back to
// full load.
return nil, ErrTableNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", dbInfo.ID),
fmt.Sprintf("(Table ID %d)", tableID),
)
}

func (b *Builder) updateBundleForCreateTable(tblInfo *model.TableInfo, tp model.ActionType) {
switch tp {
case model.ActionDropTablePartition:
case model.ActionTruncateTablePartition:
Expand All @@ -811,27 +806,10 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
}
}
}
}

if tp != model.ActionTruncateTablePartition {
affected = appendAffectedIDs(affected, tblInfo)
}

// Failpoint check whether tableInfo should be added to repairInfo.
// Typically used in repair table test to load mock `bad` tableInfo into repairInfo.
failpoint.Inject("repairFetchCreateTable", func(val failpoint.Value) {
if val.(bool) {
if domainutil.RepairInfo.InRepairMode() && tp != model.ActionRepairTable && domainutil.RepairInfo.CheckAndFetchRepairedTable(dbInfo, tblInfo) {
failpoint.Return(nil, nil)
}
}
})

ConvertCharsetCollateToLowerCaseIfNeed(tblInfo)
ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo)

if len(allocs.Allocs) == 0 {
allocs = autoid.NewAllocatorsFromTblInfo(b.Requirement, dbInfo.ID, tblInfo)
} else {
func (b *Builder) buildAllocsForCreateTable(tp model.ActionType, dbInfo *model.DBInfo, tblInfo *model.TableInfo, allocs autoid.Allocators) autoid.Allocators {
if len(allocs.Allocs) != 0 {
tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version)
switch tp {
case model.ActionRebaseAutoID, model.ActionModifyTableAutoIdCache:
Expand All @@ -857,7 +835,46 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
allocs = allocs.Append(newAlloc)
}
}
return allocs
}
return autoid.NewAllocatorsFromTblInfo(b.Requirement, dbInfo.ID, tblInfo)
}

func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) {
tblInfo, err := m.GetTable(dbInfo.ID, tableID)
if err != nil {
return nil, errors.Trace(err)
}
if tblInfo == nil {
// When we apply an old schema diff, the table may has been dropped already, so we need to fall back to
// full load.
return nil, ErrTableNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", dbInfo.ID),
fmt.Sprintf("(Table ID %d)", tableID),
)
}

b.updateBundleForCreateTable(tblInfo, tp)

if tp != model.ActionTruncateTablePartition {
affected = appendAffectedIDs(affected, tblInfo)
}

// Failpoint check whether tableInfo should be added to repairInfo.
// Typically used in repair table test to load mock `bad` tableInfo into repairInfo.
failpoint.Inject("repairFetchCreateTable", func(val failpoint.Value) {
if val.(bool) {
if domainutil.RepairInfo.InRepairMode() && tp != model.ActionRepairTable && domainutil.RepairInfo.CheckAndFetchRepairedTable(dbInfo, tblInfo) {
failpoint.Return(nil, nil)
}
}
})

ConvertCharsetCollateToLowerCaseIfNeed(tblInfo)
ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo)

allocs = b.buildAllocsForCreateTable(tp, dbInfo, tblInfo, allocs)

tbl, err := b.tableFromMeta(allocs, tblInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -986,6 +1003,7 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) (*Builder, error)
} else {
oldIS = oldSchema.(*infoSchema)
}
b.initBundleInfoBuilder()
b.infoSchema.schemaMetaVersion = oldIS.schemaMetaVersion
b.copySchemasMap(oldIS)
b.copyBundlesMap(oldIS)
Expand Down Expand Up @@ -1065,10 +1083,8 @@ func (b *Builder) getSchemaAndCopyIfNecessary(dbName string) *model.DBInfo {
return b.infoSchema.schemaMap[dbName].dbInfo
}

// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo, all placement rules, and schema version.
func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo, schemaVersion int64) (*Builder, error) {
func (b *Builder) initMisc(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo) {
info := b.infoSchema
info.schemaMetaVersion = schemaVersion
// build the policies.
for _, policy := range policies {
info.setPolicy(policy)
Expand All @@ -1085,28 +1101,51 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, policies []*model.Pol
b.infoSchema.addReferredForeignKeys(di.Name, t)
}
}
}

for _, di := range dbInfos {
err := b.createSchemaTablesForDB(di, b.tableFromMeta, schemaVersion)
if err != nil {
return nil, errors.Trace(err)
}
}

func (b *Builder) initVirtualTables(schemaVersion int64) error {
// Initialize virtual tables.
for _, driver := range drivers {
err := b.createSchemaTablesForDB(driver.DBInfo, driver.TableFromMeta, schemaVersion)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
}
return nil
}

func (b *Builder) sortAllTablesByID() {
// Sort all tables by `ID`
for _, v := range info.sortedTablesBuckets {
for _, v := range b.infoSchema.sortedTablesBuckets {
slices.SortFunc(v, func(a, b table.Table) int {
return cmp.Compare(a.Meta().ID, b.Meta().ID)
})
}
}

// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo, all placement rules, and schema version.
func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo, schemaVersion int64) (*Builder, error) {
info := b.infoSchema
info.schemaMetaVersion = schemaVersion

b.initBundleInfoBuilder()

b.initMisc(dbInfos, policies, resourceGroups)

for _, di := range dbInfos {
err := b.createSchemaTablesForDB(di, b.tableFromMeta, schemaVersion)
if err != nil {
return nil, errors.Trace(err)
}
}

err := b.initVirtualTables(schemaVersion)
if err != nil {
return nil, err
}

b.sortAllTablesByID()

return b, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util"
"github.com/scalalang2/golang-fifo"
fifo "github.com/scalalang2/golang-fifo"
"github.com/scalalang2/golang-fifo/sieve"
"github.com/tidwall/btree"
"golang.org/x/sync/singleflight"
Expand Down

0 comments on commit da7cd99

Please sign in to comment.