diff --git a/pkg/infoschema/BUILD.bazel b/pkg/infoschema/BUILD.bazel index 609b90884f5b4..d703592636a5b 100644 --- a/pkg/infoschema/BUILD.bazel +++ b/pkg/infoschema/BUILD.bazel @@ -4,6 +4,8 @@ go_library( name = "infoschema", srcs = [ "builder.go", + "builder_misc.go", + "bundle_builder.go", "cache.go", "cluster.go", "error.go", diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 9df8319aeef66..badfc00139aec 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -36,137 +36,9 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/util/domainutil" - "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlexec" - "go.uber.org/zap" ) -type policyGetter struct { - is *infoSchema -} - -func (p *policyGetter) GetPolicy(policyID int64) (*model.PolicyInfo, error) { - if policy, ok := p.is.PolicyByID(policyID); ok { - return policy, nil - } - return nil, errors.Errorf("Cannot find placement policy with ID: %d", policyID) -} - -type bundleInfoBuilder struct { - deltaUpdate bool - // tables or partitions that need to update placement bundle - updateTables map[int64]any - // all tables or partitions referring these policies should update placement bundle - updatePolicies map[int64]any - // partitions that need to update placement bundle - updatePartitions map[int64]any -} - -func (b *bundleInfoBuilder) initBundleInfoBuilder() { - b.updateTables = make(map[int64]any) - b.updatePartitions = make(map[int64]any) - b.updatePolicies = make(map[int64]any) -} - -func (b *bundleInfoBuilder) SetDeltaUpdateBundles() { - b.deltaUpdate = true -} - -func (b *bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) { - delete(is.ruleBundleMap, tblID) -} - -func (b *bundleInfoBuilder) markTableBundleShouldUpdate(tblID int64) { - b.updateTables[tblID] = struct{}{} -} - -func (b *bundleInfoBuilder) markPartitionBundleShouldUpdate(partID int64) { - b.updatePartitions[partID] = struct{}{} -} - -func (b *bundleInfoBuilder) markBundlesReferPolicyShouldUpdate(policyID int64) { - b.updatePolicies[policyID] = struct{}{} -} - -func (b *bundleInfoBuilder) updateInfoSchemaBundles(is *infoSchema) { - if b.deltaUpdate { - b.completeUpdateTables(is) - for tblID := range b.updateTables { - b.updateTableBundles(is, tblID) - } - return - } - - // do full update bundles - is.ruleBundleMap = make(map[int64]*placement.Bundle) - for _, tbls := range is.schemaMap { - for _, tbl := range tbls.tables { - b.updateTableBundles(is, tbl.Meta().ID) - } - } -} - -func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) { - if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 { - return - } - - for _, tbls := range is.schemaMap { - for _, tbl := range tbls.tables { - tblInfo := tbl.Meta() - if tblInfo.PlacementPolicyRef != nil { - if _, ok := b.updatePolicies[tblInfo.PlacementPolicyRef.ID]; ok { - b.markTableBundleShouldUpdate(tblInfo.ID) - } - } - - if tblInfo.Partition != nil { - for _, par := range tblInfo.Partition.Definitions { - if _, ok := b.updatePartitions[par.ID]; ok { - b.markTableBundleShouldUpdate(tblInfo.ID) - } - } - } - } - } -} - -func (b *bundleInfoBuilder) updateTableBundles(is *infoSchema, tableID int64) { - tbl, ok := is.TableByID(tableID) - if !ok { - b.deleteBundle(is, tableID) - return - } - - getter := &policyGetter{is: is} - bundle, err := placement.NewTableBundle(getter, tbl.Meta()) - if err != nil { - logutil.BgLogger().Error("create table bundle failed", zap.Error(err)) - } else if bundle != nil { - is.ruleBundleMap[tableID] = bundle - } else { - b.deleteBundle(is, tableID) - } - - if tbl.Meta().Partition == nil { - return - } - - for _, par := range tbl.Meta().Partition.Definitions { - bundle, err = placement.NewPartitionBundle(getter, par) - if err != nil { - logutil.BgLogger().Error("create partition bundle failed", - zap.Error(err), - zap.Int64("partition id", par.ID), - ) - } else if bundle != nil { - is.ruleBundleMap[par.ID] = bundle - } else { - b.deleteBundle(is, par.ID) - } - } -} - // Builder builds a new InfoSchema. type Builder struct { enableV2 bool @@ -233,25 +105,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro } func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - tblIDs := make([]int64, 0, len(diff.AffectedOpts)) - if diff.AffectedOpts != nil { - for _, opt := range diff.AffectedOpts { - affectedDiff := &model.SchemaDiff{ - Version: diff.Version, - Type: model.ActionCreateTable, - SchemaID: opt.SchemaID, - TableID: opt.TableID, - OldSchemaID: opt.OldSchemaID, - OldTableID: opt.OldTableID, - } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) - if err != nil { - return nil, errors.Trace(err) - } - tblIDs = append(tblIDs, affectedIDs...) - } - } - return tblIDs, nil + return b.applyAffectedOpts(m, make([]int64, 0, len(diff.AffectedOpts)), diff, model.ActionCreateTable) } func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { @@ -260,6 +114,7 @@ func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.Schema return nil, errors.Trace(err) } + // bundle ops if diff.Type == model.ActionTruncateTable { b.deleteBundle(b.infoSchema, diff.OldTableID) b.markTableBundleShouldUpdate(diff.TableID) @@ -284,6 +139,7 @@ func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff return nil, errors.Trace(err) } + // bundle ops b.markTableBundleShouldUpdate(diff.TableID) for _, opt := range diff.AffectedOpts { b.deleteBundle(b.infoSchema, opt.OldTableID) @@ -296,6 +152,8 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) if err != nil { return nil, errors.Trace(err) } + + // bundle ops for _, opt := range diff.AffectedOpts { if opt.OldTableID != 0 { b.deleteBundle(b.infoSchema, opt.OldTableID) @@ -392,6 +250,7 @@ func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int return nil, errors.Trace(err) } + // bundle ops for _, opt := range diff.AffectedOpts { b.markTableBundleShouldUpdate(opt.TableID) } @@ -432,30 +291,34 @@ func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSche return err } +func (b *Builder) applyAffectedOpts(m *meta.Meta, tblIDs []int64, diff *model.SchemaDiff, tp model.ActionType) ([]int64, error) { + if diff.AffectedOpts != nil { + for _, opt := range diff.AffectedOpts { + affectedDiff := &model.SchemaDiff{ + Version: diff.Version, + Type: tp, + SchemaID: opt.SchemaID, + TableID: opt.TableID, + OldSchemaID: opt.OldSchemaID, + OldTableID: opt.OldTableID, + } + affectedIDs, err := b.ApplyDiff(m, affectedDiff) + if err != nil { + return nil, errors.Trace(err) + } + tblIDs = append(tblIDs, affectedIDs...) + } + } + return tblIDs, nil +} + func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { return nil, errors.Trace(err) } - for _, opt := range diff.AffectedOpts { - var err error - affectedDiff := &model.SchemaDiff{ - Version: diff.Version, - Type: diff.Type, - SchemaID: opt.SchemaID, - TableID: opt.TableID, - OldSchemaID: opt.OldSchemaID, - OldTableID: opt.OldTableID, - } - affectedIDs, err := b.ApplyDiff(m, affectedDiff) - if err != nil { - return nil, errors.Trace(err) - } - tblIDs = append(tblIDs, affectedIDs...) - } - - return tblIDs, nil + return b.applyAffectedOpts(m, tblIDs, diff, diff.Type) } func (b *Builder) getTableIDs(diff *model.SchemaDiff) (oldTableID, newTableID int64) { @@ -609,78 +472,6 @@ func appendAffectedIDs(affected []int64, tblInfo *model.TableInfo) []int64 { return affected } -// copySortedTables copies sortedTables for old table and new table for later modification. -func (b *Builder) copySortedTables(oldTableID, newTableID int64) { - if tableIDIsValid(oldTableID) { - b.copySortedTablesBucket(tableBucketIdx(oldTableID)) - } - if tableIDIsValid(newTableID) && newTableID != oldTableID { - b.copySortedTablesBucket(tableBucketIdx(newTableID)) - } -} - -func (b *Builder) applyCreateOrAlterResourceGroup(m *meta.Meta, diff *model.SchemaDiff) error { - group, err := m.GetResourceGroup(diff.SchemaID) - if err != nil { - return errors.Trace(err) - } - if group == nil { - return ErrResourceGroupNotExists.GenWithStackByArgs(fmt.Sprintf("(Group ID %d)", diff.SchemaID)) - } - // TODO: need mark updated? - b.infoSchema.setResourceGroup(group) - return nil -} - -func (b *Builder) applyDropResourceGroup(m *meta.Meta, diff *model.SchemaDiff) []int64 { - group, ok := b.infoSchema.ResourceGroupByID(diff.SchemaID) - if !ok { - return nil - } - b.infoSchema.deleteResourceGroup(group.Name.L) - // TODO: return the related information. - return []int64{} -} - -func (b *Builder) applyCreatePolicy(m *meta.Meta, diff *model.SchemaDiff) error { - po, err := m.GetPolicy(diff.SchemaID) - if err != nil { - return errors.Trace(err) - } - if po == nil { - return ErrPlacementPolicyNotExists.GenWithStackByArgs( - fmt.Sprintf("(Policy ID %d)", diff.SchemaID), - ) - } - - if _, ok := b.infoSchema.PolicyByID(po.ID); ok { - // if old policy with the same id exists, it means replace, - // so the tables referring this policy's bundle should be updated - b.markBundlesReferPolicyShouldUpdate(po.ID) - } - - b.infoSchema.setPolicy(po) - return nil -} - -func (b *Builder) applyAlterPolicy(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - po, err := m.GetPolicy(diff.SchemaID) - if err != nil { - return nil, errors.Trace(err) - } - - if po == nil { - return nil, ErrPlacementPolicyNotExists.GenWithStackByArgs( - fmt.Sprintf("(Policy ID %d)", diff.SchemaID), - ) - } - - b.infoSchema.setPolicy(po) - b.markBundlesReferPolicyShouldUpdate(po.ID) - // TODO: return the policy related table ids - return []int64{}, nil -} - func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error { di, err := m.GetDatabase(diff.SchemaID) if err != nil { @@ -730,16 +521,6 @@ func (b *Builder) applyModifySchemaDefaultPlacement(m *meta.Meta, diff *model.Sc return nil } -func (b *Builder) applyDropPolicy(PolicyID int64) []int64 { - po, ok := b.infoSchema.PolicyByID(PolicyID) - if !ok { - return nil - } - b.infoSchema.deletePolicy(po.Name.L) - // TODO: return the policy related table ids - return []int64{} -} - func (b *Builder) applyDropSchema(schemaID int64) []int64 { di, ok := b.infoSchema.SchemaByID(schemaID) if !ok { @@ -784,6 +565,16 @@ func (b *Builder) applyRecoverSchema(m *meta.Meta, diff *model.SchemaDiff) ([]in return b.applyCreateTables(m, diff) } +// copySortedTables copies sortedTables for old table and new table for later modification. +func (b *Builder) copySortedTables(oldTableID, newTableID int64) { + if tableIDIsValid(oldTableID) { + b.copySortedTablesBucket(tableBucketIdx(oldTableID)) + } + if tableIDIsValid(newTableID) && newTableID != oldTableID { + b.copySortedTablesBucket(tableBucketIdx(newTableID)) + } +} + func (b *Builder) copySortedTablesBucket(bucketIdx int) { oldSortedTables := b.infoSchema.sortedTablesBuckets[bucketIdx] newSortedTables := make(sortedTables, len(oldSortedTables)) @@ -970,12 +761,6 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ return affected } -// TODO: get rid of this and use infoschemaV2 directly. -type infoschemaProxy struct { - infoschemaV2 - v1 InfoSchema -} - // Build builds and returns the built infoschema. func (b *Builder) Build() InfoSchema { b.updateInfoSchemaBundles(b.infoSchema) @@ -1020,46 +805,6 @@ func (b *Builder) copySchemasMap(oldIS *infoSchema) { } } -func (b *Builder) copyBundlesMap(oldIS *infoSchema) { - b.infoSchema.ruleBundleMap = make(map[int64]*placement.Bundle) - for id, v := range oldIS.ruleBundleMap { - b.infoSchema.ruleBundleMap[id] = v - } -} - -func (b *Builder) copyPoliciesMap(oldIS *infoSchema) { - is := b.infoSchema - for _, v := range oldIS.AllPlacementPolicies() { - is.policyMap[v.Name.L] = v - } -} - -func (b *Builder) copyResourceGroupMap(oldIS *infoSchema) { - is := b.infoSchema - for _, v := range oldIS.AllResourceGroups() { - is.resourceGroupMap[v.Name.L] = v - } -} - -func (b *Builder) copyTemporaryTableIDsMap(oldIS *infoSchema) { - is := b.infoSchema - if len(oldIS.temporaryTableIDs) == 0 { - is.temporaryTableIDs = nil - return - } - - is.temporaryTableIDs = make(map[int64]struct{}) - for tblID := range oldIS.temporaryTableIDs { - is.temporaryTableIDs[tblID] = struct{}{} - } -} - -func (b *Builder) copyReferredForeignKeyMap(oldIS *infoSchema) { - for k, v := range oldIS.referredForeignKeyMap { - b.infoSchema.referredForeignKeyMap[k] = v - } -} - // getSchemaAndCopyIfNecessary creates a new schemaTables instance when a table in the database has changed. // It also does modifications on the new one because old schemaTables must be read-only. // And it will only copy the changed database once in the lifespan of the Builder. @@ -1081,26 +826,6 @@ func (b *Builder) getSchemaAndCopyIfNecessary(dbName string) *model.DBInfo { return b.infoSchema.schemaMap[dbName].dbInfo } -func (b *Builder) initMisc(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo) { - info := b.infoSchema - // build the policies. - for _, policy := range policies { - info.setPolicy(policy) - } - - // build the groups. - for _, group := range resourceGroups { - info.setResourceGroup(group) - } - - // Maintain foreign key reference information. - for _, di := range dbInfos { - for _, t := range di.Tables { - b.infoSchema.addReferredForeignKeys(di.Name, t) - } - } -} - func (b *Builder) initVirtualTables(schemaVersion int64) error { // Initialize virtual tables. for _, driver := range drivers { @@ -1221,13 +946,6 @@ func (b *Builder) addTable(schemaVersion int64, di *model.DBInfo, tblInfo *model } } -func (b *Builder) addTemporaryTable(tblID int64) { - if b.infoSchema.temporaryTableIDs == nil { - b.infoSchema.temporaryTableIDs = make(map[int64]struct{}) - } - b.infoSchema.temporaryTableIDs[tblID] = struct{}{} -} - type virtualTableDriver struct { *model.DBInfo TableFromMeta tableFromMetaFunc diff --git a/pkg/infoschema/builder_misc.go b/pkg/infoschema/builder_misc.go new file mode 100644 index 0000000000000..cb78ff39bb102 --- /dev/null +++ b/pkg/infoschema/builder_misc.go @@ -0,0 +1,163 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema + +import ( + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/ddl/placement" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/parser/model" +) + +func (b *Builder) applyCreatePolicy(m *meta.Meta, diff *model.SchemaDiff) error { + po, err := m.GetPolicy(diff.SchemaID) + if err != nil { + return errors.Trace(err) + } + if po == nil { + return ErrPlacementPolicyNotExists.GenWithStackByArgs( + fmt.Sprintf("(Policy ID %d)", diff.SchemaID), + ) + } + + if _, ok := b.infoSchema.PolicyByID(po.ID); ok { + // if old policy with the same id exists, it means replace, + // so the tables referring this policy's bundle should be updated + b.markBundlesReferPolicyShouldUpdate(po.ID) + } + + b.infoSchema.setPolicy(po) + return nil +} + +func (b *Builder) applyAlterPolicy(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + po, err := m.GetPolicy(diff.SchemaID) + if err != nil { + return nil, errors.Trace(err) + } + + if po == nil { + return nil, ErrPlacementPolicyNotExists.GenWithStackByArgs( + fmt.Sprintf("(Policy ID %d)", diff.SchemaID), + ) + } + + b.infoSchema.setPolicy(po) + b.markBundlesReferPolicyShouldUpdate(po.ID) + // TODO: return the policy related table ids + return []int64{}, nil +} + +func (b *Builder) applyDropPolicy(PolicyID int64) []int64 { + po, ok := b.infoSchema.PolicyByID(PolicyID) + if !ok { + return nil + } + b.infoSchema.deletePolicy(po.Name.L) + // TODO: return the policy related table ids + return []int64{} +} + +func (b *Builder) applyCreateOrAlterResourceGroup(m *meta.Meta, diff *model.SchemaDiff) error { + group, err := m.GetResourceGroup(diff.SchemaID) + if err != nil { + return errors.Trace(err) + } + if group == nil { + return ErrResourceGroupNotExists.GenWithStackByArgs(fmt.Sprintf("(Group ID %d)", diff.SchemaID)) + } + // TODO: need mark updated? + b.infoSchema.setResourceGroup(group) + return nil +} + +func (b *Builder) applyDropResourceGroup(m *meta.Meta, diff *model.SchemaDiff) []int64 { + group, ok := b.infoSchema.ResourceGroupByID(diff.SchemaID) + if !ok { + return nil + } + b.infoSchema.deleteResourceGroup(group.Name.L) + // TODO: return the related information. + return []int64{} +} + +func (b *Builder) addTemporaryTable(tblID int64) { + if b.infoSchema.temporaryTableIDs == nil { + b.infoSchema.temporaryTableIDs = make(map[int64]struct{}) + } + b.infoSchema.temporaryTableIDs[tblID] = struct{}{} +} + +func (b *Builder) copyBundlesMap(oldIS *infoSchema) { + b.infoSchema.ruleBundleMap = make(map[int64]*placement.Bundle) + for id, v := range oldIS.ruleBundleMap { + b.infoSchema.ruleBundleMap[id] = v + } +} + +func (b *Builder) copyPoliciesMap(oldIS *infoSchema) { + is := b.infoSchema + for _, v := range oldIS.AllPlacementPolicies() { + is.policyMap[v.Name.L] = v + } +} + +func (b *Builder) copyResourceGroupMap(oldIS *infoSchema) { + is := b.infoSchema + for _, v := range oldIS.AllResourceGroups() { + is.resourceGroupMap[v.Name.L] = v + } +} + +func (b *Builder) copyTemporaryTableIDsMap(oldIS *infoSchema) { + is := b.infoSchema + if len(oldIS.temporaryTableIDs) == 0 { + is.temporaryTableIDs = nil + return + } + + is.temporaryTableIDs = make(map[int64]struct{}) + for tblID := range oldIS.temporaryTableIDs { + is.temporaryTableIDs[tblID] = struct{}{} + } +} + +func (b *Builder) copyReferredForeignKeyMap(oldIS *infoSchema) { + for k, v := range oldIS.referredForeignKeyMap { + b.infoSchema.referredForeignKeyMap[k] = v + } +} + +func (b *Builder) initMisc(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo) { + info := b.infoSchema + // build the policies. + for _, policy := range policies { + info.setPolicy(policy) + } + + // build the groups. + for _, group := range resourceGroups { + info.setResourceGroup(group) + } + + // Maintain foreign key reference information. + for _, di := range dbInfos { + for _, t := range di.Tables { + b.infoSchema.addReferredForeignKeys(di.Name, t) + } + } +} diff --git a/pkg/infoschema/bundle_builder.go b/pkg/infoschema/bundle_builder.go new file mode 100644 index 0000000000000..4aa385bbe6aa9 --- /dev/null +++ b/pkg/infoschema/bundle_builder.go @@ -0,0 +1,149 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/ddl/placement" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +type policyGetter struct { + is *infoSchema +} + +func (p *policyGetter) GetPolicy(policyID int64) (*model.PolicyInfo, error) { + if policy, ok := p.is.PolicyByID(policyID); ok { + return policy, nil + } + return nil, errors.Errorf("Cannot find placement policy with ID: %d", policyID) +} + +type bundleInfoBuilder struct { + deltaUpdate bool + // tables or partitions that need to update placement bundle + updateTables map[int64]any + // all tables or partitions referring these policies should update placement bundle + updatePolicies map[int64]any + // partitions that need to update placement bundle + updatePartitions map[int64]any +} + +func (b *bundleInfoBuilder) initBundleInfoBuilder() { + b.updateTables = make(map[int64]any) + b.updatePartitions = make(map[int64]any) + b.updatePolicies = make(map[int64]any) +} + +func (b *bundleInfoBuilder) SetDeltaUpdateBundles() { + b.deltaUpdate = true +} + +func (b *bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) { + delete(is.ruleBundleMap, tblID) +} + +func (b *bundleInfoBuilder) markTableBundleShouldUpdate(tblID int64) { + b.updateTables[tblID] = struct{}{} +} + +func (b *bundleInfoBuilder) markPartitionBundleShouldUpdate(partID int64) { + b.updatePartitions[partID] = struct{}{} +} + +func (b *bundleInfoBuilder) markBundlesReferPolicyShouldUpdate(policyID int64) { + b.updatePolicies[policyID] = struct{}{} +} + +func (b *bundleInfoBuilder) updateInfoSchemaBundles(is *infoSchema) { + if b.deltaUpdate { + b.completeUpdateTables(is) + for tblID := range b.updateTables { + b.updateTableBundles(is, tblID) + } + return + } + + // do full update bundles + is.ruleBundleMap = make(map[int64]*placement.Bundle) + for _, tbls := range is.schemaMap { + for _, tbl := range tbls.tables { + b.updateTableBundles(is, tbl.Meta().ID) + } + } +} + +func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) { + if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 { + return + } + + for _, tbls := range is.schemaMap { + for _, tbl := range tbls.tables { + tblInfo := tbl.Meta() + if tblInfo.PlacementPolicyRef != nil { + if _, ok := b.updatePolicies[tblInfo.PlacementPolicyRef.ID]; ok { + b.markTableBundleShouldUpdate(tblInfo.ID) + } + } + + if tblInfo.Partition != nil { + for _, par := range tblInfo.Partition.Definitions { + if _, ok := b.updatePartitions[par.ID]; ok { + b.markTableBundleShouldUpdate(tblInfo.ID) + } + } + } + } + } +} + +func (b *bundleInfoBuilder) updateTableBundles(is *infoSchema, tableID int64) { + tbl, ok := is.TableByID(tableID) + if !ok { + b.deleteBundle(is, tableID) + return + } + + getter := &policyGetter{is: is} + bundle, err := placement.NewTableBundle(getter, tbl.Meta()) + if err != nil { + logutil.BgLogger().Error("create table bundle failed", zap.Error(err)) + } else if bundle != nil { + is.ruleBundleMap[tableID] = bundle + } else { + b.deleteBundle(is, tableID) + } + + if tbl.Meta().Partition == nil { + return + } + + for _, par := range tbl.Meta().Partition.Definitions { + bundle, err = placement.NewPartitionBundle(getter, par) + if err != nil { + logutil.BgLogger().Error("create partition bundle failed", + zap.Error(err), + zap.Int64("partition id", par.ID), + ) + } else if bundle != nil { + is.ruleBundleMap[par.ID] = bundle + } else { + b.deleteBundle(is, par.ID) + } + } +}