From 2fa77070730236c65e98b3356cef83c2ca7f2754 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 29 May 2023 12:40:41 +0800
Subject: [PATCH] ddl: fix bug that new partition will not set the placement
 policy after truncated (#44043) (#44062)

close pingcap/tidb#44031, close pingcap/tidb#44116
---
 ddl/partition.go                          |  66 +++
 ddl/placement_policy_test.go              | 438 ++++++++++++------
 .../integration_test/integration_test.go  |   2 +-
 infoschema/builder.go                     |   8 +-
 4 files changed, 371 insertions(+), 143 deletions(-)

diff --git a/ddl/partition.go b/ddl/partition.go
index 4e389a35b5bbc..612a7e4d4165f 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -283,6 +283,25 @@ func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDe
 	return bundles, nil
 }
 
+// When dropping/truncating a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules.
+// When a partition is not configured with a placement policy directly, its rules are in the table's placement group, which will be deleted after
+// the partition is truncated/dropped. So it is necessary to create a standalone placement group keyed by the partition ID for it.
+func droppedPartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, dropPartitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
+	partitions := make([]model.PartitionDefinition, 0, len(dropPartitions))
+	for _, def := range dropPartitions {
+		def = def.Clone()
+		if def.PlacementPolicyRef == nil {
+			def.PlacementPolicyRef = tblInfo.PlacementPolicyRef
+		}
+
+		if def.PlacementPolicyRef != nil {
+			partitions = append(partitions, def)
+		}
+	}
+
+	return placement.NewPartitionListBundles(t, partitions)
+}
+
 // updatePartitionInfo merge `addingDefinitions` into `Definitions` in the tableInfo.
 func updatePartitionInfo(tblInfo *model.TableInfo) {
 	parInfo := &model.PartitionInfo{}
@@ -1820,6 +1839,32 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 			return ver, err
 		}
 
+		var bundles []*placement.Bundle
+		// Create placement groups for each dropped partition to keep the data's placement before GC.
+		// These placement groups will be deleted after GC.
+		bundles, err = droppedPartitionBundles(t, tblInfo, tblInfo.Partition.DroppingDefinitions)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, err
+		}
+
+		var tableBundle *placement.Bundle
+		// Recompute the table bundle to remove the dropped partitions' rules from its group.
+		tableBundle, err = placement.NewTableBundle(t, tblInfo)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+
+		if tableBundle != nil {
+			bundles = append(bundles, tableBundle)
+		}
+
+		if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
+			job.State = model.JobStateCancelled
+			return ver, err
+		}
+
 		job.SchemaState = model.StateDeleteOnly
 		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
 	case model.StateDeleteOnly:
@@ -1915,11 +1960,13 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
 		return ver, errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
 	}
 
+	oldPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
 	newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
 	for _, oldID := range oldIDs {
 		for i := 0; i < len(pi.Definitions); i++ {
 			def := &pi.Definitions[i]
 			if def.ID == oldID {
+				oldPartitions = append(oldPartitions, def.Clone())
 				pid, err1 := t.GenGlobalID()
 				if err1 != nil {
 					return ver, errors.Trace(err1)
@@ -1967,6 +2014,25 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
 		return ver, errors.Trace(err)
 	}
 
+	tableBundle, err := placement.NewTableBundle(t, tblInfo)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(err)
+	}
+
+	if tableBundle != nil {
+		bundles = append(bundles, tableBundle)
+	}
+
+	// Create placement groups for each dropped partition to keep the data's placement before GC.
+	// These placement groups will be deleted after GC.
+	keepDroppedBundles, err := droppedPartitionBundles(t, tblInfo, oldPartitions)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(err)
+	}
+	bundles = append(bundles, keepDroppedBundles...)
+ err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles) if err != nil { job.State = model.JobStateCancelled diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index 0b2c3f29b21ca..81bbc9c9ceda4 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -41,6 +41,43 @@ import ( "github.com/stretchr/testify/require" ) +type bundleCheck struct { + ID string + tableID int64 + bundle *placement.Bundle + comment string + waitingGC bool +} + +func (c *bundleCheck) check(t *testing.T, is infoschema.InfoSchema) { + pdGot, err := infosync.GetRuleBundle(context.TODO(), c.ID) + require.NoError(t, err) + if c.bundle == nil { + require.True(t, pdGot.IsEmpty(), "bundle should be nil for table: %d, comment: %s", c.tableID, c.comment) + } else { + expectedJSON, err := json.Marshal(c.bundle) + require.NoError(t, err, c.comment) + + pdGotJSON, err := json.Marshal(pdGot) + require.NoError(t, err, c.comment) + require.NotNil(t, pdGot, c.comment) + require.Equal(t, string(expectedJSON), string(pdGotJSON), c.comment) + } + + isGot, ok := is.PlacementBundleByPhysicalTableID(c.tableID) + if c.bundle == nil || c.waitingGC { + require.False(t, ok, "bundle should be nil for table: %d, comment: %s", c.tableID, c.comment) + } else { + expectedJSON, err := json.Marshal(c.bundle) + require.NoError(t, err, c.comment) + + isGotJSON, err := json.Marshal(isGot) + require.NoError(t, err, c.comment) + require.NotNil(t, isGot, c.comment) + require.Equal(t, string(expectedJSON), string(isGotJSON), c.comment) + } +} + func checkExistTableBundlesInPD(t *testing.T, do *domain.Domain, dbName string, tbName string) { tblInfo, err := do.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName)) require.NoError(t, err) @@ -48,7 +85,25 @@ func checkExistTableBundlesInPD(t *testing.T, do *domain.Domain, dbName string, ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) require.NoError(t, kv.RunInNewTxn(ctx, do.Store(), false, func(ctx context.Context, txn kv.Transaction) error { tt := meta.NewMeta(txn) - checkTableBundlesInPD(t, do, tt, tblInfo.Meta()) + checkTableBundlesInPD(t, do, tt, tblInfo.Meta(), false) + return nil + })) +} + +func checkWaitingGCTableBundlesInPD(t *testing.T, do *domain.Domain, tblInfo *model.TableInfo) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) + require.NoError(t, kv.RunInNewTxn(ctx, do.Store(), false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + checkTableBundlesInPD(t, do, tt, tblInfo, true) + return nil + })) +} + +func checkWaitingGCPartitionBundlesInPD(t *testing.T, do *domain.Domain, partitions []model.PartitionDefinition) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) + require.NoError(t, kv.RunInNewTxn(ctx, do.Store(), false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + checkPartitionBundlesInPD(t, do.InfoSchema(), tt, partitions, true) return nil })) } @@ -77,55 +132,39 @@ func checkAllBundlesNotChange(t *testing.T, bundles []*placement.Bundle) { } } -func checkTableBundlesInPD(t *testing.T, do *domain.Domain, tt *meta.Meta, tblInfo *model.TableInfo) { - checks := make([]*struct { - ID string - tableID int64 - bundle *placement.Bundle - }, 0) +func checkPartitionBundlesInPD(t *testing.T, is infoschema.InfoSchema, tt *meta.Meta, partitions []model.PartitionDefinition, waitingGC bool) { + checks := make([]*bundleCheck, 0) + for _, def := range partitions { + bundle, err := 
placement.NewPartitionBundle(tt, def) + require.NoError(t, err) + checks = append(checks, &bundleCheck{ + ID: placement.GroupID(def.ID), + tableID: def.ID, + bundle: bundle, + comment: fmt.Sprintf("partitionName: %s, physicalID: %d", def.Name, def.ID), + waitingGC: waitingGC, + }) + } + for _, ck := range checks { + ck.check(t, is) + } +} +func checkTableBundlesInPD(t *testing.T, do *domain.Domain, tt *meta.Meta, tblInfo *model.TableInfo, waitingGC bool) { + is := do.InfoSchema() bundle, err := placement.NewTableBundle(tt, tblInfo) require.NoError(t, err) - checks = append(checks, &struct { - ID string - tableID int64 - bundle *placement.Bundle - }{ID: placement.GroupID(tblInfo.ID), tableID: tblInfo.ID, bundle: bundle}) - - if tblInfo.Partition != nil { - for _, def := range tblInfo.Partition.Definitions { - bundle, err := placement.NewPartitionBundle(tt, def) - require.NoError(t, err) - checks = append(checks, &struct { - ID string - tableID int64 - bundle *placement.Bundle - }{ID: placement.GroupID(def.ID), tableID: def.ID, bundle: bundle}) - } + tblBundle := &bundleCheck{ + ID: placement.GroupID(tblInfo.ID), + tableID: tblInfo.ID, + bundle: bundle, + comment: fmt.Sprintf("tableName: %s, physicalID: %d", tblInfo.Name, tblInfo.ID), + waitingGC: waitingGC, } - - is := do.InfoSchema() - for _, check := range checks { - pdGot, err := infosync.GetRuleBundle(context.TODO(), check.ID) - require.NoError(t, err) - isGot, ok := is.PlacementBundleByPhysicalTableID(check.tableID) - if check.bundle == nil { - require.True(t, pdGot.IsEmpty(), "bundle should be nil for table: %d", check.tableID) - require.False(t, ok, "bundle should be nil for table: %d", check.tableID) - } else { - expectedJSON, err := json.Marshal(check.bundle) - require.NoError(t, err) - - pdGotJSON, err := json.Marshal(pdGot) - require.NoError(t, err) - require.NotNil(t, pdGot) - require.Equal(t, string(expectedJSON), string(pdGotJSON)) - - isGotJSON, err := json.Marshal(isGot) - require.NoError(t, err) - require.NotNil(t, isGot) - require.Equal(t, string(expectedJSON), string(isGotJSON)) - } + tblBundle.check(t, is) + if tblInfo.Partition != nil { + pars := tblInfo.Partition.Definitions + checkPartitionBundlesInPD(t, is, tt, pars, waitingGC) } } @@ -1651,6 +1690,17 @@ func TestAddPartitionWithPlacement(t *testing.T) { } func TestTruncateTableWithPlacement(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)) + defer func(originGC bool) { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")) + if originGC { + util.EmulatorGCEnable() + } else { + util.EmulatorGCDisable() + } + }(util.IsEmulatorGCEnable()) + util.EmulatorGCDisable() + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1682,6 +1732,8 @@ func TestTruncateTableWithPlacement(t *testing.T) { t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) + checkExistTableBundlesInPD(t, dom, "test", "t1") + tk.MustExec("TRUNCATE TABLE t1") tk.MustQuery("show create table t1").Check(testkit.Rows("" + "t1 CREATE TABLE `t1` (\n" + @@ -1690,6 +1742,8 @@ func TestTruncateTableWithPlacement(t *testing.T) { newT1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) require.True(t, newT1.Meta().ID != t1.Meta().ID) + checkExistTableBundlesInPD(t, dom, "test", "t1") + 
checkWaitingGCTableBundlesInPD(t, dom, t1.Meta()) // test for partitioned table tk.MustExec(`CREATE TABLE tp (id INT) placement policy p1 PARTITION BY RANGE (id) ( @@ -1711,6 +1765,7 @@ func TestTruncateTableWithPlacement(t *testing.T) { "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + " PARTITION `p2` VALUES LESS THAN (10000))")) + checkExistTableBundlesInPD(t, dom, "test", "tp") tk.MustExec("TRUNCATE TABLE tp") newTp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) @@ -1721,79 +1776,58 @@ func TestTruncateTableWithPlacement(t *testing.T) { for i := range []int{0, 1, 2} { require.True(t, newTp.Meta().Partition.Definitions[i].ID != tp.Meta().Partition.Definitions[i].ID) } -} -func TestTruncateTableGCWithPlacement(t *testing.T) { - // clearAllBundles(t) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)) - defer func(originGC bool) { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")) - if originGC { - util.EmulatorGCEnable() - } else { - util.EmulatorGCDisable() - } - }(util.IsEmulatorGCEnable()) - util.EmulatorGCDisable() - store, dom := testkit.CreateMockStoreAndDomain(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t0,t1") - tk.MustExec("drop placement policy if exists p1") - tk.MustExec("drop placement policy if exists p2") - - tk.MustExec("create placement policy p1 primary_region='r0' regions='r0'") - defer tk.MustExec("drop placement policy if exists p1") - - tk.MustExec("create placement policy p2 primary_region='r1' regions='r1'") - defer tk.MustExec("drop placement policy if exists p2") - - tk.MustExec("create table t0 (id int)") - defer tk.MustExec("drop table if exists t0") - - tk.MustExec("create table t1 (id int) placement policy p1") - defer tk.MustExec("drop table if exists t1") - - tk.MustExec(`create table t2 (id int) placement policy p1 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100) placement policy p2, - PARTITION p1 VALUES LESS THAN (1000) - )`) - defer tk.MustExec("drop table if exists t2") - - tk.MustExec("truncate table t2") + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkWaitingGCTableBundlesInPD(t, dom, tp.Meta()) - is := dom.InfoSchema() - t1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + // do GC + bundle, err := infosync.GetRuleBundle(context.TODO(), placement.GroupID(t1.Meta().ID)) require.NoError(t, err) - t2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) - require.NoError(t, err) - - bundles, err := infosync.GetAllRuleBundles(context.TODO()) + require.False(t, bundle.IsEmpty()) + bundle, err = infosync.GetRuleBundle(context.TODO(), placement.GroupID(tp.Meta().ID)) require.NoError(t, err) - require.Equal(t, 5, len(bundles)) + require.False(t, bundle.IsEmpty()) + for _, def := range tp.Meta().Partition.Definitions { + bundle, err = infosync.GetRuleBundle(context.TODO(), placement.GroupID(def.ID)) + require.NoError(t, err) + if def.PlacementPolicyRef != nil { + require.False(t, bundle.IsEmpty()) + } else { + require.True(t, bundle.IsEmpty()) + } + } gcWorker, err := gcworker.NewMockGCWorker(store) require.NoError(t, err) require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) - bundles, err = infosync.GetAllRuleBundles(context.TODO()) + checkExistTableBundlesInPD(t, dom, "test", "t1") + 
checkExistTableBundlesInPD(t, dom, "test", "tp") + bundle, err = infosync.GetRuleBundle(context.TODO(), placement.GroupID(t1.Meta().ID)) require.NoError(t, err) - require.Equal(t, 3, len(bundles)) - bundlesMap := make(map[string]*placement.Bundle) - for _, bundle := range bundles { - bundlesMap[bundle.ID] = bundle + require.True(t, bundle.IsEmpty()) + bundle, err = infosync.GetRuleBundle(context.TODO(), placement.GroupID(tp.Meta().ID)) + require.NoError(t, err) + require.True(t, bundle.IsEmpty()) + for _, def := range tp.Meta().Partition.Definitions { + bundle, err = infosync.GetRuleBundle(context.TODO(), placement.GroupID(def.ID)) + require.NoError(t, err) + require.True(t, bundle.IsEmpty()) } - _, ok := bundlesMap[placement.GroupID(t1.Meta().ID)] - require.True(t, ok) - - _, ok = bundlesMap[placement.GroupID(t2.Meta().ID)] - require.True(t, ok) - - _, ok = bundlesMap[placement.GroupID(t2.Meta().Partition.Definitions[0].ID)] - require.True(t, ok) } func TestTruncateTablePartitionWithPlacement(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)) + defer func(originGC bool) { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")) + if originGC { + util.EmulatorGCEnable() + } else { + util.EmulatorGCDisable() + } + }(util.IsEmulatorGCEnable()) + util.EmulatorGCDisable() + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1833,13 +1867,27 @@ func TestTruncateTablePartitionWithPlacement(t *testing.T) { tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) + checkOldPartitions := make([]model.PartitionDefinition, 0, 2) + for _, p := range tp.Meta().Partition.Definitions { + switch p.Name.L { + case "p1": + checkOldPartitions = append(checkOldPartitions, p.Clone()) + case "p3": + p.PlacementPolicyRef = tp.Meta().PlacementPolicyRef + checkOldPartitions = append(checkOldPartitions, p.Clone()) + } + } + tk.MustExec("ALTER TABLE tp TRUNCATE partition p1,p3") newTp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) require.Equal(t, tp.Meta().ID, newTp.Meta().ID) require.Equal(t, policy1.ID, newTp.Meta().PlacementPolicyRef.ID) + require.Equal(t, 4, len(newTp.Meta().Partition.Definitions)) + require.Nil(t, newTp.Meta().Partition.Definitions[0].PlacementPolicyRef) require.Equal(t, policy2.ID, newTp.Meta().Partition.Definitions[1].PlacementPolicyRef.ID) require.Equal(t, policy3.ID, newTp.Meta().Partition.Definitions[2].PlacementPolicyRef.ID) + require.Nil(t, newTp.Meta().Partition.Definitions[3].PlacementPolicyRef) require.Equal(t, tp.Meta().Partition.Definitions[0].ID, newTp.Meta().Partition.Definitions[0].ID) require.True(t, newTp.Meta().Partition.Definitions[1].ID != tp.Meta().Partition.Definitions[1].ID) require.Equal(t, tp.Meta().Partition.Definitions[2].ID, newTp.Meta().Partition.Definitions[2].ID) @@ -1854,10 +1902,36 @@ func TestTruncateTablePartitionWithPlacement(t *testing.T) { " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PLACEMENT POLICY=`p3` */,\n" + " PARTITION `p3` VALUES LESS THAN (100000))")) + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions) + + // add new partition will not override bundle waiting for GC + tk.MustExec("alter 
table tp add partition (partition p4 values less than(1000000))") + newTp2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + require.NoError(t, err) + require.Equal(t, 5, len(newTp2.Meta().Partition.Definitions)) + checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions) + + // do GC + for _, par := range checkOldPartitions { + bundle, err := infosync.GetRuleBundle(context.TODO(), placement.GroupID(par.ID)) + require.NoError(t, err) + require.False(t, bundle.IsEmpty()) + } + + gcWorker, err := gcworker.NewMockGCWorker(store) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + + checkExistTableBundlesInPD(t, dom, "test", "tp") + for _, par := range checkOldPartitions { + bundle, err := infosync.GetRuleBundle(context.TODO(), placement.GroupID(par.ID)) + require.NoError(t, err) + require.True(t, bundle.IsEmpty()) + } } -func TestTruncatePartitionGCWithPlacement(t *testing.T) { - // clearAllBundles(t) +func TestDropTableWithPlacement(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)) defer func(originGC bool) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")) @@ -1868,61 +1942,145 @@ func TestTruncatePartitionGCWithPlacement(t *testing.T) { } }(util.IsEmulatorGCEnable()) util.EmulatorGCDisable() + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + + tk.MustExec("drop table if exists tp") tk.MustExec("drop placement policy if exists p1") tk.MustExec("drop placement policy if exists p2") + tk.MustExec("drop placement policy if exists p3") - tk.MustExec("create placement policy p1 primary_region='r0' regions='r0'") - defer tk.MustExec("drop placement policy if exists p1") - - tk.MustExec("create placement policy p2 primary_region='r1' regions='r1'") - defer tk.MustExec("drop placement policy if exists p2") - - tk.MustExec("create table t0 (id int)") - defer tk.MustExec("drop table if exists t0") + tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") + defer tk.MustExec("drop placement policy p1") - tk.MustExec("create table t1 (id int) placement policy p1") - defer tk.MustExec("drop table if exists t1") + tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") + defer tk.MustExec("drop placement policy p2") - tk.MustExec(`create table t2 (id int) placement policy p1 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100) placement policy p2, - PARTITION p1 VALUES LESS THAN (1000) - )`) - defer tk.MustExec("drop table if exists t2") + tk.MustExec("create placement policy p3 primary_region='r3' regions='r3'") + defer tk.MustExec("drop placement policy p3") - tk.MustExec("alter table t2 truncate partition p0") + tk.MustExec(`CREATE TABLE tp (id INT) placement policy p1 PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (1000) placement policy p2, + PARTITION p2 VALUES LESS THAN (10000) placement policy p3, + PARTITION p3 VALUES LESS THAN (100000) + );`) + defer tk.MustExec("drop table if exists tp") - is := dom.InfoSchema() - t1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) - t2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) + checkExistTableBundlesInPD(t, dom, "test", "tp") + 
tk.MustExec("drop table tp") + checkWaitingGCTableBundlesInPD(t, dom, tp.Meta()) + + // do GC + gcWorker, err := gcworker.NewMockGCWorker(store) require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) bundles, err := infosync.GetAllRuleBundles(context.TODO()) require.NoError(t, err) - require.Equal(t, 4, len(bundles)) + require.Equal(t, 0, len(bundles)) +} - gcWorker, err := gcworker.NewMockGCWorker(store) +func TestDropPartitionWithPlacement(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)) + defer func(originGC bool) { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")) + if originGC { + util.EmulatorGCEnable() + } else { + util.EmulatorGCDisable() + } + }(util.IsEmulatorGCEnable()) + util.EmulatorGCDisable() + + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists tp") + tk.MustExec("drop placement policy if exists p1") + tk.MustExec("drop placement policy if exists p2") + tk.MustExec("drop placement policy if exists p3") + + tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") + defer tk.MustExec("drop placement policy p1") + + tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") + defer tk.MustExec("drop placement policy p2") + + tk.MustExec("create placement policy p3 primary_region='r3' regions='r3'") + defer tk.MustExec("drop placement policy p3") + + policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1")) + require.True(t, ok) + + policy3, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p3")) + require.True(t, ok) + + // test for partitioned table + tk.MustExec(`CREATE TABLE tp (id INT) placement policy p1 PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (1000) placement policy p2, + PARTITION p2 VALUES LESS THAN (10000) placement policy p3, + PARTITION p3 VALUES LESS THAN (100000) + );`) + defer tk.MustExec("drop table tp") + + tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) - require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) - bundles, err = infosync.GetAllRuleBundles(context.TODO()) + checkOldPartitions := make([]model.PartitionDefinition, 0, 2) + for _, p := range tp.Meta().Partition.Definitions { + switch p.Name.L { + case "p1": + checkOldPartitions = append(checkOldPartitions, p.Clone()) + case "p3": + p.PlacementPolicyRef = tp.Meta().PlacementPolicyRef + checkOldPartitions = append(checkOldPartitions, p.Clone()) + } + } + + tk.MustExec("ALTER TABLE tp DROP partition p1,p3") + newTp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) - require.Equal(t, 3, len(bundles)) - bundlesMap := make(map[string]*placement.Bundle) - for _, bundle := range bundles { - bundlesMap[bundle.ID] = bundle + require.Equal(t, tp.Meta().ID, newTp.Meta().ID) + require.Equal(t, policy1.ID, newTp.Meta().PlacementPolicyRef.ID) + require.Equal(t, 2, len(newTp.Meta().Partition.Definitions)) + require.Nil(t, newTp.Meta().Partition.Definitions[0].PlacementPolicyRef) + require.Equal(t, policy3.ID, newTp.Meta().Partition.Definitions[1].PlacementPolicyRef.ID) + require.Equal(t, tp.Meta().Partition.Definitions[0].ID, newTp.Meta().Partition.Definitions[0].ID) + require.True(t, 
newTp.Meta().Partition.Definitions[1].ID == tp.Meta().Partition.Definitions[2].ID) + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions) + + // add new partition will not override bundle waiting for GC + tk.MustExec("alter table tp add partition (partition p4 values less than(1000000))") + newTp2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + require.NoError(t, err) + require.Equal(t, 3, len(newTp2.Meta().Partition.Definitions)) + checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions) + + // do GC + for _, par := range checkOldPartitions { + bundle, err := infosync.GetRuleBundle(context.TODO(), placement.GroupID(par.ID)) + require.NoError(t, err) + require.False(t, bundle.IsEmpty()) } - _, ok := bundlesMap[placement.GroupID(t1.Meta().ID)] - require.True(t, ok) - _, ok = bundlesMap[placement.GroupID(t2.Meta().ID)] - require.True(t, ok) + gcWorker, err := gcworker.NewMockGCWorker(store) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) - _, ok = bundlesMap[placement.GroupID(t2.Meta().Partition.Definitions[0].ID)] - require.True(t, ok) + checkExistTableBundlesInPD(t, dom, "test", "tp") + for _, par := range checkOldPartitions { + bundle, err := infosync.GetRuleBundle(context.TODO(), placement.GroupID(par.ID)) + require.NoError(t, err) + require.True(t, bundle.IsEmpty()) + } } func TestExchangePartitionWithPlacement(t *testing.T) { diff --git a/expression/integration_test/integration_test.go b/expression/integration_test/integration_test.go index 91e2ad25cf56f..7f3537f53b56a 100644 --- a/expression/integration_test/integration_test.go +++ b/expression/integration_test/integration_test.go @@ -7943,6 +7943,6 @@ func TestIfFunctionWithNull(t *testing.T) { tk.MustExec("drop table if exists ordres;") tk.MustExec("CREATE TABLE orders (id bigint(20) unsigned NOT NULL ,account_id bigint(20) unsigned NOT NULL DEFAULT '0' ,loan bigint(20) unsigned NOT NULL DEFAULT '0' ,stage_num int(20) unsigned NOT NULL DEFAULT '0' ,apply_time bigint(20) unsigned NOT NULL DEFAULT '0' ,PRIMARY KEY (id) /*T![clustered_index] CLUSTERED */,KEY idx_orders_account_id (account_id),KEY idx_orders_apply_time (apply_time));") tk.MustExec("insert into orders values (20, 210802010000721168, 20000 , 2 , 1682484268727), (22, 210802010000721168, 35100 , 4 , 1650885615002);") - tk.MustQuery("select min(if(apply_to_now_days <= 30,loan,null)) as min, max(if(apply_to_now_days <= 720,loan,null)) as max from (select loan, datediff(from_unixtime(unix_timestamp() + 18000), from_unixtime(apply_time/1000 + 18000)) as apply_to_now_days from orders) t1;").Sort().Check( + tk.MustQuery("select min(if(apply_to_now_days <= 30,loan,null)) as min, max(if(apply_to_now_days <= 720,loan,null)) as max from (select loan, datediff(from_unixtime(unix_timestamp('2023-05-18 18:43:43') + 18000), from_unixtime(apply_time/1000 + 18000)) as apply_to_now_days from orders) t1;").Sort().Check( testkit.Rows("20000 35100")) } diff --git a/infoschema/builder.go b/infoschema/builder.go index 3240e1c5c5c49..7ac4b9528ab9a 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -262,6 +262,11 @@ func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.Schema return nil, errors.Trace(err) } + if diff.Type == model.ActionTruncateTable { + b.deleteBundle(b.is, diff.OldTableID) + b.markTableBundleShouldUpdate(diff.TableID) + } + for _, opt := range diff.AffectedOpts { if diff.Type == 
model.ActionTruncateTablePartition {
 			// Reduce the impact on DML when executing partition DDL. eg.
@@ -269,8 +274,6 @@ func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.Schema
 			// the TRUNCATE operation of session 2 on partition 2 does not cause the operation of session 1 to fail.
 			tblIDs = append(tblIDs, opt.OldTableID)
 			b.markPartitionBundleShouldUpdate(opt.TableID)
-		} else {
-			b.markTableBundleShouldUpdate(opt.TableID)
 		}
 		b.deleteBundle(b.is, opt.OldTableID)
 	}
@@ -283,6 +286,7 @@ func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff
 		return nil, errors.Trace(err)
 	}
 
+	b.markTableBundleShouldUpdate(diff.TableID)
 	for _, opt := range diff.AffectedOpts {
 		b.deleteBundle(b.is, opt.OldTableID)
 	}
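
Note for readers outside the TiDB code base: the sketch below is a minimal, self-contained illustration of the bookkeeping idea this patch applies; it is not the real TiDB implementation. Every type, helper name, and the group-naming scheme here is an assumed stand-in for meta.Meta, model.PartitionDefinition, placement.Bundle, placement.GroupID and friends. The idea: a partition with no placement policy of its own inherits the table's policy, and every truncated/dropped partition keeps a standalone placement group keyed by its old physical ID until GC deletes its data, while the recomputed table bundle covers only the surviving and newly created partition IDs.

// Hypothetical, simplified illustration only (assumed names, not TiDB's actual APIs).
package main

import "fmt"

// PolicyRef names a placement policy (stand-in for TiDB's policy reference type).
type PolicyRef struct{ Name string }

// PartitionDef is a cut-down stand-in for model.PartitionDefinition.
type PartitionDef struct {
	ID     int64      // physical ID of the partition
	Name   string
	Policy *PolicyRef // nil means "inherit the table-level policy"
}

// Table is a cut-down stand-in for model.TableInfo.
type Table struct {
	ID         int64
	Policy     *PolicyRef
	Partitions []PartitionDef
}

// Bundle stands in for a PD placement-rule group keyed by a physical ID.
type Bundle struct {
	GroupID string
	Policy  string
}

// droppedPartitionBundles mirrors the patch's logic: keep a standalone bundle for
// every truncated/dropped partition so its data keeps its placement until GC,
// inheriting the table-level policy when the partition has none of its own.
func droppedPartitionBundles(tbl *Table, dropped []PartitionDef) []Bundle {
	var bundles []Bundle
	for _, def := range dropped {
		policy := def.Policy
		if policy == nil {
			policy = tbl.Policy
		}
		if policy != nil {
			// The group naming below is illustrative only.
			bundles = append(bundles, Bundle{
				GroupID: fmt.Sprintf("TiDB_DDL_%d", def.ID),
				Policy:  policy.Name,
			})
		}
	}
	return bundles
}

func main() {
	tbl := &Table{
		ID:     100,
		Policy: &PolicyRef{Name: "p1"},
		Partitions: []PartitionDef{
			{ID: 101, Name: "p0"},                                 // inherits p1 from the table
			{ID: 102, Name: "p1", Policy: &PolicyRef{Name: "p2"}}, // has its own policy
		},
	}
	// Truncating both partitions: each old physical ID keeps a bundle until GC,
	// while the new partition IDs are covered by the recomputed table bundle.
	for _, b := range droppedPartitionBundles(tbl, tbl.Partitions) {
		fmt.Printf("keep %s -> policy %s until GC\n", b.GroupID, b.Policy)
	}
}

In the patch itself, the single helper droppedPartitionBundles in ddl/partition.go serves both the drop-partition and truncate-partition paths, and the GC-time cleanup of these retained bundles is exercised by the tests above via the mock gcworker's DeleteRanges.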