diff --git a/pkg/infoschema/BUILD.bazel b/pkg/infoschema/BUILD.bazel index 7d8e8bff33b7e..c24cb1061be0e 100644 --- a/pkg/infoschema/BUILD.bazel +++ b/pkg/infoschema/BUILD.bazel @@ -83,7 +83,7 @@ go_test( ], embed = [":infoschema"], flaky = True, - shard_count = 12, + shard_count = 13, deps = [ "//pkg/ddl/placement", "//pkg/domain", diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index d3c2b5469e9a1..b03033d2e08ad 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -132,7 +132,7 @@ func applyTruncateTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaD return tblIDs, nil } -func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func applyDropTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := applyTableUpdate(b, m, diff) if err != nil { return nil, errors.Trace(err) @@ -146,7 +146,7 @@ func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff return tblIDs, nil } -func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func applyReorganizePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := applyTableUpdate(b, m, diff) if err != nil { return nil, errors.Trace(err) @@ -165,7 +165,7 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } -func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func applyExchangeTablePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { // It is not in StatePublic. if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { ntIDs, err := applyTableUpdate(b, m, diff) @@ -793,12 +793,13 @@ func (b *Builder) deleteReferredForeignKeys(dbInfo *model.DBInfo, tableID int64) // Build builds and returns the built infoschema. func (b *Builder) Build() InfoSchema { - b.updateInfoSchemaBundles(b.infoSchema) if b.enableV2 { b.infoschemaV2.ts = math.MaxUint64 // TODO: should be the correct TS b.infoschemaV2.schemaVersion = b.infoSchema.SchemaMetaVersion() + updateInfoSchemaBundles(b) return &b.infoschemaV2 } + updateInfoSchemaBundles(b) return b.infoSchema } diff --git a/pkg/infoschema/bundle_builder.go b/pkg/infoschema/bundle_builder.go index 4aa385bbe6aa9..133357709f235 100644 --- a/pkg/infoschema/bundle_builder.go +++ b/pkg/infoschema/bundle_builder.go @@ -112,8 +112,9 @@ func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) { } } -func (b *bundleInfoBuilder) updateTableBundles(is *infoSchema, tableID int64) { - tbl, ok := is.TableByID(tableID) +func (b *bundleInfoBuilder) updateTableBundles(infoSchemaInterface InfoSchema, tableID int64) { + is := infoSchemaInterface.base() + tbl, ok := infoSchemaInterface.TableByID(tableID) if !ok { b.deleteBundle(is, tableID) return diff --git a/pkg/infoschema/infoschema.go b/pkg/infoschema/infoschema.go index 1604354fd3bc2..705076c296891 100644 --- a/pkg/infoschema/infoschema.go +++ b/pkg/infoschema/infoschema.go @@ -151,6 +151,10 @@ func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) Inf var _ InfoSchema = (*infoSchema)(nil) +func (is *infoSchema) base() *infoSchema { + return is +} + func (is *infoSchema) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok bool) { tableNames, ok := is.schemaMap[schema.L] if !ok { diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index a50205e284b3c..2ec839abdc8e8 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -280,6 +280,10 @@ func search(bt *btree.BTreeG[tableItem], schemaVersion int64, end tableItem, mat return target, ok } +func (is *infoschemaV2) base() *infoSchema { + return is.infoSchema +} + func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { // Get from the cache. key := tableCacheKey{id, is.schemaVersion} @@ -609,10 +613,6 @@ func applyModifySchemaDefaultPlacement(b *Builder, m *meta.Meta, diff *model.Sch return b.applyModifySchemaDefaultPlacement(m, diff) } -func applyDropTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - return b.applyDropTableOrPartition(m, diff) -} - func applyRecoverTable(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { return b.applyRecoverTable(m, diff) } @@ -621,18 +621,12 @@ func applyCreateTables(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int6 return b.applyCreateTables(m, diff) } -func applyReorganizePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { +func updateInfoSchemaBundles(b *Builder) { if b.enableV2 { - return b.applyReorganizePartitionV2(m, diff) + b.updateInfoSchemaBundlesV2(&b.infoschemaV2) + } else { + b.updateInfoSchemaBundles(b.infoSchema) } - return b.applyReorganizePartition(m, diff) -} - -func applyExchangeTablePartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - if b.enableV2 { - return b.applyExchangeTablePartitionV2(m, diff) - } - return b.applyExchangeTablePartition(m, diff) } // TODO: more UT to check the correctness. @@ -724,14 +718,6 @@ func (b *Builder) applyModifySchemaDefaultPlacementV2(m *meta.Meta, diff *model. return nil } -func (b *Builder) applyTruncateTableOrPartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - panic("TODO") -} - -func (b *Builder) applyDropTableOrPartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - panic("TODO") -} - func (b *Builder) applyRecoverTableV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { panic("TODO") } @@ -740,10 +726,47 @@ func (b *Builder) applyCreateTablesV2(m *meta.Meta, diff *model.SchemaDiff) ([]i panic("TODO") } -func (b *Builder) applyReorganizePartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - panic("TODO") +func (b *bundleInfoBuilder) updateInfoSchemaBundlesV2(is *infoschemaV2) { + if b.deltaUpdate { + b.completeUpdateTablesV2(is) + for tblID := range b.updateTables { + b.updateTableBundles(is, tblID) + } + return + } + + // do full update bundles + // TODO: This is quite inefficient! we need some better way or avoid this API. + is.ruleBundleMap = make(map[int64]*placement.Bundle) + for _, dbInfo := range is.AllSchemas() { + for _, tbl := range is.SchemaTables(dbInfo.Name) { + b.updateTableBundles(is, tbl.Meta().ID) + } + } } -func (b *Builder) applyExchangeTablePartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - panic("TODO") +func (b *bundleInfoBuilder) completeUpdateTablesV2(is *infoschemaV2) { + if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 { + return + } + + // TODO: This is quite inefficient! we need some better way or avoid this API. + for _, dbInfo := range is.AllSchemas() { + for _, tbl := range is.SchemaTables(dbInfo.Name) { + tblInfo := tbl.Meta() + if tblInfo.PlacementPolicyRef != nil { + if _, ok := b.updatePolicies[tblInfo.PlacementPolicyRef.ID]; ok { + b.markTableBundleShouldUpdate(tblInfo.ID) + } + } + + if tblInfo.Partition != nil { + for _, par := range tblInfo.Partition.Definitions { + if _, ok := b.updatePartitions[par.ID]; ok { + b.markTableBundleShouldUpdate(tblInfo.ID) + } + } + } + } + } } diff --git a/pkg/infoschema/infoschema_v2_test.go b/pkg/infoschema/infoschema_v2_test.go index 029a0ecf8fd70..ae7a5f81490a9 100644 --- a/pkg/infoschema/infoschema_v2_test.go +++ b/pkg/infoschema/infoschema_v2_test.go @@ -169,7 +169,7 @@ func TestMisc(t *testing.T) { internal.UpdatePolicy(t, r.Store(), policyInfo) txn, err = r.Store().Begin() require.NoError(t, err) - err = applyCreatePolicy(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: policyInfo.ID}) + _, err = applyAlterPolicy(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: policyInfo.ID}) require.NoError(t, err) is = builder.Build() require.Len(t, is.AllPlacementPolicies(), 2) @@ -190,3 +190,84 @@ func TestMisc(t *testing.T) { require.Equal(t, policyInfo2, getPolicyInfo) require.NoError(t, txn.Rollback()) } + +func TestBundles(t *testing.T) { + r := internal.CreateAutoIDRequirement(t) + defer func() { + r.Store().Close() + }() + + schemaName := model.NewCIStr("testDB") + tableName := model.NewCIStr("test") + builder, err := NewBuilder(r, nil, NewData()).InitWithDBInfos(nil, nil, nil, 1) + require.NoError(t, err) + is := builder.Build() + require.Equal(t, 2, len(is.AllSchemas())) + + // create database + dbInfo := internal.MockDBInfo(t, r.Store(), schemaName.O) + internal.AddDB(t, r.Store(), dbInfo) + txn, err := r.Store().Begin() + require.NoError(t, err) + _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionCreateSchema, Version: 1, SchemaID: dbInfo.ID}) + require.NoError(t, err) + is = builder.Build() + require.Equal(t, 3, len(is.AllSchemas())) + require.NoError(t, txn.Rollback()) + + // create table + tblInfo := internal.MockTableInfo(t, r.Store(), tableName.O) + tblInfo.Partition = &model.PartitionInfo{Definitions: []model.PartitionDefinition{{ID: 1}, {ID: 2}}} + internal.AddTable(t, r.Store(), dbInfo, tblInfo) + txn, err = r.Store().Begin() + require.NoError(t, err) + _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionCreateTable, Version: 2, SchemaID: dbInfo.ID, TableID: tblInfo.ID}) + require.NoError(t, err) + is = builder.Build() + require.Equal(t, 1, len(is.SchemaTables(dbInfo.Name))) + require.NoError(t, txn.Rollback()) + + // test create policy + policyInfo := internal.MockPolicyInfo(t, r.Store(), "test") + internal.CreatePolicy(t, r.Store(), policyInfo) + txn, err = r.Store().Begin() + require.NoError(t, err) + _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionCreatePlacementPolicy, Version: 3, SchemaID: policyInfo.ID}) + require.NoError(t, err) + is = builder.Build() + require.Len(t, is.AllPlacementPolicies(), 1) + getPolicyInfo, ok := is.PolicyByName(policyInfo.Name) + require.True(t, ok) + require.Equal(t, policyInfo, getPolicyInfo) + require.NoError(t, txn.Rollback()) + + // markTableBundleShouldUpdate + // test alter table placement + policyRefInfo := internal.MockPolicyRefInfo(t, r.Store(), "test") + tblInfo.PlacementPolicyRef = policyRefInfo + internal.UpdateTable(t, r.Store(), dbInfo, tblInfo) + txn, err = r.Store().Begin() + require.NoError(t, err) + _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionAlterTablePlacement, Version: 4, SchemaID: dbInfo.ID, TableID: tblInfo.ID}) + require.NoError(t, err) + is = builder.Build() + getTableInfo, err := is.TableByName(schemaName, tableName) + require.NoError(t, err) + require.Equal(t, policyRefInfo, getTableInfo.Meta().PlacementPolicyRef) + require.NoError(t, txn.Rollback()) + + // markBundlesReferPolicyShouldUpdate + // test alter policy + policyInfo.State = model.StatePublic + internal.UpdatePolicy(t, r.Store(), policyInfo) + txn, err = r.Store().Begin() + require.NoError(t, err) + _, err = builder.ApplyDiff(meta.NewMeta(txn), &model.SchemaDiff{Type: model.ActionAlterPlacementPolicy, Version: 5, SchemaID: policyInfo.ID}) + require.NoError(t, err) + is = builder.Build() + getTableInfo, err = is.TableByName(schemaName, tableName) + require.NoError(t, err) + getPolicyInfo, ok = is.PolicyByName(getTableInfo.Meta().PlacementPolicyRef.Name) + require.True(t, ok) + require.Equal(t, policyInfo, getPolicyInfo) +} diff --git a/pkg/infoschema/interface.go b/pkg/infoschema/interface.go index 06858a8574f8c..202b225d6230d 100644 --- a/pkg/infoschema/interface.go +++ b/pkg/infoschema/interface.go @@ -36,6 +36,7 @@ type InfoSchema interface { SchemaMetaVersion() int64 FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition) Misc + base() *infoSchema } // Misc contains the methods that are not closely related to InfoSchema. diff --git a/pkg/infoschema/internal/testkit.go b/pkg/infoschema/internal/testkit.go index 2a03c8faaadd3..e5bb87f368abf 100644 --- a/pkg/infoschema/internal/testkit.go +++ b/pkg/infoschema/internal/testkit.go @@ -200,6 +200,16 @@ func MockPolicyInfo(t *testing.T, store kv.Storage, policyName string) *model.Po } } +// MockPolicyRefInfo mock policy ref info for testing. +func MockPolicyRefInfo(t *testing.T, store kv.Storage, policyName string) *model.PolicyRefInfo { + id, err := GenGlobalID(store) + require.NoError(t, err) + return &model.PolicyRefInfo{ + ID: id, + Name: model.NewCIStr(policyName), + } +} + // AddTable add mock table for testing. func AddTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) @@ -211,6 +221,17 @@ func AddTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *mod require.NoError(t, err) } +// UpdateTable update mock table for testing. +func UpdateTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) + err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error { + err := meta.NewMeta(txn).UpdateTable(dbInfo.ID, tblInfo) + require.NoError(t, err) + return errors.Trace(err) + }) + require.NoError(t, err) +} + // DropTable drop mock table for testing. func DropTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblID int64, tblName string) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)