diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 3e1da43cdf75f..d749177a3b269 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -3282,6 +3282,124 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { require.NoError(t, err) } +func TestExchangePartitionMultiTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk1 := testkit.NewTestKit(t, store) + + dbName := "ExchangeMultiTable" + tk1.MustExec(`create schema ` + dbName) + tk1.MustExec(`use ` + dbName) + tk1.MustExec(`CREATE TABLE t1 (a int)`) + tk1.MustExec(`CREATE TABLE t2 (a int)`) + tk1.MustExec(`CREATE TABLE tp (a int) partition by hash(a) partitions 3`) + tk1.MustExec(`insert into t1 values (0)`) + tk1.MustExec(`insert into t2 values (3)`) + tk1.MustExec(`insert into tp values (6)`) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(`use ` + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec(`use ` + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use ` + dbName) + waitFor := func(col int, tableName, s string) { + for { + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use test`) + sql := `admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'` + res := tk4.MustQuery(sql).Rows() + if len(res) == 1 && res[0][col] == s { + break + } + time.Sleep(10 * time.Millisecond) + } + } + alterChan1 := make(chan error) + alterChan2 := make(chan error) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into tp values (1)`) + go func() { + alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`) + }() + waitFor(11, "t1", "running") + go func() { + alterChan2 <- tk2.ExecToErr(`alter table tp exchange partition p0 with table t2`) + }() + waitFor(11, "t2", "queueing") + tk3.MustExec(`rollback`) + require.NoError(t, <-alterChan1) + err := <-alterChan2 + tk3.MustQuery(`select * from t1`).Check(testkit.Rows("6")) + tk3.MustQuery(`select * from t2`).Check(testkit.Rows("0")) + tk3.MustQuery(`select * from tp`).Check(testkit.Rows("3")) + require.NoError(t, err) +} + +func TestExchangePartitionValidation(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + dbName := "ExchangeValidation" + tk.MustExec(`create schema ` + dbName) + tk.MustExec(`use ` + dbName) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name))`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`insert into t1 values ("2023-08-06","0001")`) +} + +func TestExchangePartitionPlacementPolicy(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create schema ExchangePartWithPolicy`) + tk.MustExec(`use ExchangePartWithPolicy`) + tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`) + tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule1"`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule2" + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`, + "[ddl:1736]Tables have different definitions") + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) +} + func TestExchangePartitionHook(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index fc0e16f91bcaa..52e50b65c4050 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4746,7 +4746,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error { return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name)) } - // NOTE: if nt is temporary table, it should be checked return nil } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 68b58e0d99679..2f8cd248bc48a 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1373,24 +1373,28 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: - var ( - ptSchemaID int64 - ptTableID int64 - partName string - withValidation bool - ) - err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } diff.OldTableID = job.TableID - affects := make([]*model.AffectedOption, 1) - affects[0] = &model.AffectedOption{ - SchemaID: ptSchemaID, - TableID: ptTableID, - OldTableID: ptTableID, + diff.OldSchemaID = job.SchemaID + if job.SchemaState != model.StatePublic { + diff.TableID = job.TableID + diff.SchemaID = job.SchemaID + } else { + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 // Not needed, will reload the whole table + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + diff.SchemaID = ptSchemaID + diff.TableID = ptTableID } - diff.AffectedOpts = affects case model.ActionTruncateTablePartition: diff.TableID = job.TableID if len(job.CtxVars) > 0 { diff --git a/ddl/partition.go b/ddl/partition.go index dd516a190dac7..43210a2e7a50f 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2381,6 +2381,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if job.IsRollingback() { + return rollbackExchangeTablePartition(d, t, job, nt) + } pt, err := getTableInfo(t, ptID, ptSchemaID) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { @@ -2389,35 +2392,49 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - if pt.State != model.StatePublic { - job.State = model.JobStateCancelled - return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) - } - - err = checkExchangePartition(pt, nt) + index, partDef, err := getPartitionDef(pt, partName) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + if job.SchemaState == model.StateNone { + if pt.State != model.StatePublic { + job.State = model.JobStateCancelled + return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) + } + err = checkExchangePartition(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } - err = checkTableDefCompatible(pt, nt) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } + err = checkTableDefCompatible(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } - index, _, err := getPartitionDef(pt, partName) - if err != nil { - return ver, errors.Trace(err) - } - if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ - ExchangePartitionFlag: true, ExchangePartitionID: ptID, ExchangePartitionDefID: defID, } + // We need an interim schema version, + // so there are no non-matching rows inserted + // into the table using the schema version + // before the exchange is made. + job.SchemaState = model.StateWriteOnly return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) } + // From now on, nt (the non-partitioned table) has + // ExchangePartitionInfo set, meaning it is restricted + // to only allow writes that would match the + // partition to be exchange with. + // So we need to rollback that change, instead of just cancelling. if d.lease > 0 { delayForAsyncCommit() @@ -2426,7 +2443,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { - job.State = model.JobStateCancelled + job.State = model.JobStateRollingback return ver, errors.Trace(err) } } @@ -2434,19 +2451,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // partition table auto IDs. ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } // non-partition table auto IDs. ntAutoIDs, err := t.GetAutoIDAccessors(job.SchemaID, nt.ID).Get() if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - _, partDef, err := getPartitionDef(pt, partName) - if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2459,35 +2468,32 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } } - // exchange table meta id - partDef.ID, nt.ID = nt.ID, partDef.ID - - err = t.UpdateTable(ptSchemaID, pt) + // Recreate non-partition table meta info, + // by first delete it with the old table id + err = t.DropTableOrView(job.SchemaID, nt.ID) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } - failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { - if val.(bool) { - job.State = model.JobStateCancelled - failpoint.Return(ver, errors.New("occur an error after updating partition id")) - } - }) + // exchange table meta id + partDef.ID, nt.ID = nt.ID, partDef.ID - // recreate non-partition table meta info - err = t.DropTableOrView(job.SchemaID, partDef.ID) + err = t.UpdateTable(ptSchemaID, pt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.CreateTableOrView(job.SchemaID, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(ver, errors.New("occur an error after updating partition id")) + } + }) + // Set both tables to the maximum auto IDs between normal table and partitioned table. newAutoIDs := meta.AutoIDGroup{ RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), @@ -2496,12 +2502,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } err = t.GetAutoIDAccessors(ptSchemaID, pt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.GetAutoIDAccessors(job.SchemaID, nt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2520,23 +2524,15 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } }) - err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt) + bundles, err := bundlesForExchangeTablePartition(t, pt, partDef, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } @@ -2545,7 +2541,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo rules, err := infosync.GetLabelRules(context.TODO(), []string{ntrID, ptrID}) if err != nil { - job.State = model.JobStateCancelled return 0, errors.Wrapf(err, "failed to get PD the label rules") } @@ -2572,10 +2567,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo patch := label.NewRulePatch(setRules, deleteRules) err = infosync.UpdateLabelRules(context.TODO(), patch) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the label rules") } + job.SchemaState = model.StatePublic nt.ExchangePartitionInfo = nil ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { @@ -3220,7 +3215,7 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) return nil } -func bundlesForExchangeTablePartition(t *meta.Meta, _ *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { +func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { bundles := make([]*placement.Bundle, 0, 3) ptBundle, err := placement.NewTableBundle(t, pt) @@ -3318,16 +3313,21 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { - if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPPRef, ptPPRef, partPPRef *model.PolicyRefInfo) error { + partitionPPRef := partPPRef + if partitionPPRef == nil { + partitionPPRef = ptPPRef + } + + if ntPPRef == nil && partitionPPRef == nil { return nil } - if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { + if ntPPRef == nil || partitionPPRef == nil { return dbterror.ErrTablesDifferentMetadata } - ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) - ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) + ptPlacementPolicyInfo, _ := getPolicyInfo(t, partitionPPRef.ID) + ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPPRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { return nil } diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index f7a29c4e711e9..91c8ffb0dfbb5 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -2087,61 +2087,48 @@ func TestExchangePartitionWithPlacement(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) // clearAllBundles(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set @@tidb_enable_exchange_partition=1") tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2, tp") - tk.MustExec("drop placement policy if exists p1") - tk.MustExec("drop placement policy if exists p2") - tk.MustExec("drop placement policy if exists p3") - - tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") - defer tk.MustExec("drop placement policy p1") - - tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") - defer tk.MustExec("drop placement policy p2") - tk.MustExec("create placement policy p3 primary_region='r3' regions='r3'") - defer tk.MustExec("drop placement policy p3") + tk.MustExec("create placement policy pp1 primary_region='r1' regions='r1'") + tk.MustExec("create placement policy pp2 primary_region='r2' regions='r2'") + tk.MustExec("create placement policy pp3 primary_region='r3' regions='r3'") - policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1")) + policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("pp1")) require.True(t, ok) - tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`) - defer tk.MustExec("drop table t1") - + tk.MustExec(`CREATE TABLE t1 (id INT) placement policy pp1`) tk.MustExec(`CREATE TABLE t2 (id INT)`) - defer tk.MustExec("drop table t2") + tk.MustExec(`CREATE TABLE t3 (id INT) placement policy pp3`) t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) t1ID := t1.Meta().ID - tk.MustExec(`CREATE TABLE tp (id INT) placement policy p3 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100) placement policy p1, - PARTITION p1 VALUES LESS THAN (1000) placement policy p2, - PARTITION p2 VALUES LESS THAN (10000) - );`) - defer tk.MustExec("drop table tp") + tk.MustExec(`CREATE TABLE tp (id INT) placement policy pp3 PARTITION BY RANGE (id) ( + PARTITION p1 VALUES LESS THAN (100) placement policy pp1, + PARTITION p2 VALUES LESS THAN (1000) placement policy pp2, + PARTITION p3 VALUES LESS THAN (10000) + )`) tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) tpID := tp.Meta().ID par0ID := tp.Meta().Partition.Definitions[0].ID - // exchange par0, t1 - tk.MustExec("alter table tp exchange partition p0 with table t1") + // exchange par1, t1 + tk.MustExec("alter table tp exchange partition p1 with table t1") tk.MustQuery("show create table t1").Check(testkit.Rows("" + "t1 CREATE TABLE `t1` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */")) tk.MustQuery("show create table tp").Check(testkit.Rows("" + "tp CREATE TABLE `tp` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp3` */\n" + "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + - " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + - " PARTITION `p2` VALUES LESS THAN (10000))")) + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `p2` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p3` VALUES LESS THAN (10000))")) tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) require.Equal(t, tpID, tp.Meta().ID) @@ -2153,11 +2140,31 @@ func TestExchangePartitionWithPlacement(t *testing.T) { require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID) checkExistTableBundlesInPD(t, dom, "test", "tp") - // exchange par0, t2 - tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrTablesDifferentMetadata) + // exchange par2, t1 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t1", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t1 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t1", mysql.ErrTablesDifferentMetadata) // exchange par1, t2 tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t2 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t2 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par1, t3 + tk.MustGetErrCode("alter table tp exchange partition p1 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t3 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t3 + tk.MustExec("alter table tp exchange partition p3 with table t3") + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkExistTableBundlesInPD(t, dom, "test", "t3") } func TestPDFail(t *testing.T) { diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 00da651d9df22..3af166b498f80 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -263,6 +263,30 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool { return false } +// rollbackExchangeTablePartition will clear the non-partitioned +// table's ExchangePartitionInfo state. +func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) { + tblInfo.ExchangePartitionInfo = nil + job.State = model.JobStateRollbackDone + job.SchemaState = model.StatePublic + return updateVersionAndTableInfo(d, t, job, tblInfo, true) +} + +func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.SchemaState == model.StateNone { + // Nothing is changed + job.State = model.JobStateCancelled + return ver, dbterror.ErrCancelledDDLJob + } + var nt *model.TableInfo + nt, err = GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + ver, err = rollbackExchangeTablePartition(d, t, job, nt) + return ver, errors.Trace(err) +} + func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { addingDefinitions := tblInfo.Partition.AddingDefinitions partNames := make([]string, 0, len(addingDefinitions)) @@ -377,6 +401,7 @@ func rollingbackReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } // addingDefinitions is also in tblInfo, here pass the tblInfo as parameter directly. + // TODO: Test this with reorganize partition p1 into (partition p1 ...)! return convertAddTablePartitionJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob, tblInfo) } @@ -409,6 +434,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) err = rollingbackDropTableOrView(t, job) case model.ActionDropTablePartition: ver, err = rollingbackDropTablePartition(t, job) + case model.ActionExchangeTablePartition: + ver, err = rollingbackExchangeTablePartition(d, t, job) case model.ActionDropSchema: err = rollingbackDropSchema(t, job) case model.ActionRenameIndex: @@ -424,7 +451,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableCharsetAndCollate, model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, - model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement, + model.ActionModifySchemaDefaultPlacement, model.ActionRecoverSchema, model.ActionAlterCheckConstraint: ver, err = cancelOnlyNotHandledJob(job, model.StateNone) case model.ActionMultiSchemaChange: diff --git a/ddl/tests/fail/fail_db_test.go b/ddl/tests/fail/fail_db_test.go index 8a0c4c41e9ecd..3fdcda0dddc75 100644 --- a/ddl/tests/fail/fail_db_test.go +++ b/ddl/tests/fail/fail_db_test.go @@ -135,8 +135,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustExec("insert into pt values(1), (3), (5)") tk.MustExec("create table nt(a int)") tk.MustExec("insert into nt values(7)") - tk.MustExec("set @@tidb_enable_exchange_partition=1") - defer tk.MustExec("set @@tidb_enable_exchange_partition=0") err = tk.ExecToErr("alter table pt exchange partition p1 with table nt") require.Error(t, err) diff --git a/executor/insert_common.go b/executor/insert_common.go index 1625941b7b1cd..e8e685cbe7c04 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -692,7 +692,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/executor/write.go b/executor/write.go index 9526c7d2e62d0..4b789162ac264 100644 --- a/executor/write.go +++ b/executor/write.go @@ -79,7 +79,7 @@ func updateRecord( // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/infoschema/builder.go b/infoschema/builder.go index 7ac4b9528ab9a..e2c78387eb9bf 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -227,6 +227,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyCreateTables(m, diff) case model.ActionReorganizePartition: return b.applyReorganizePartition(m, diff) + case model.ActionExchangeTablePartition: + return b.applyExchangeTablePartition(m, diff) case model.ActionFlashbackCluster: return []int64{-1}, nil default: @@ -309,6 +311,47 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } +func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + // The partitioned table is not affected until the last stage + if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { + return b.applyTableUpdate(m, diff) + } + ntSchemaID := diff.OldSchemaID + ntID := diff.OldTableID + ptSchemaID := diff.SchemaID + ptID := diff.TableID + if len(diff.AffectedOpts) > 0 { + // From old version + ptID = diff.AffectedOpts[0].TableID + ptSchemaID = diff.AffectedOpts[0].SchemaID + } + // The normal table needs to be updated first: + // Just update the tables separately + currDiff := &model.SchemaDiff{ + Version: diff.Version, + TableID: ntID, + SchemaID: ntSchemaID, + } + ntIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markPartitionBundleShouldUpdate(ntID) + // Then the partitioned table + currDiff.TableID = ptID + currDiff.SchemaID = ptSchemaID + ptIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markTableBundleShouldUpdate(ptID) + err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + if err != nil { + return nil, errors.Trace(err) + } + return append(ptIDs, ntIDs...), nil +} + func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -376,14 +419,6 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) - - if diff.Type == model.ActionExchangeTablePartition { - // handle partition table and table AutoID - err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) - if err != nil { - return nil, errors.Trace(err) - } - } } return tblIDs, nil @@ -415,7 +450,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: + case model.ActionTruncateTable, model.ActionCreateView: oldTableID = diff.OldTableID newTableID = diff.TableID default: @@ -433,8 +468,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 b.markTableBundleShouldUpdate(newTableID) case model.ActionRecoverTable: b.markTableBundleShouldUpdate(newTableID) - case model.ActionExchangeTablePartition: - b.markPartitionBundleShouldUpdate(newTableID) case model.ActionAlterTablePlacement: b.markTableBundleShouldUpdate(newTableID) } @@ -445,7 +478,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 var allocs autoid.Allocators if tableIDIsValid(oldTableID) { if oldTableID == newTableID && (diff.Type != model.ActionRenameTable && diff.Type != model.ActionRenameTables) && - diff.Type != model.ActionExchangeTablePartition && // For repairing table in TiDB cluster, given 2 normal node and 1 repair node. // For normal node's information schema, repaired table is existed. // For repair node's information schema, repaired table is filtered (couldn't find it in `is`). diff --git a/parser/model/model.go b/parser/model/model.go index 846817b034632..0059cc60e05bf 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1142,9 +1142,10 @@ func (p PartitionType) String() string { // ExchangePartitionInfo provides exchange partition info. type ExchangePartitionInfo struct { - ExchangePartitionFlag bool `json:"exchange_partition_flag"` ExchangePartitionID int64 `json:"exchange_partition_id"` ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` + // Deprecated, not used + XXXExchangePartitionFlag bool `json:"exchange_partition_flag"` } // PartitionInfo provides table partition info. diff --git a/table/tables/partition_test.go b/table/tables/partition_test.go index 4616d489dacf5..199a331fda358 100644 --- a/table/tables/partition_test.go +++ b/table/tables/partition_test.go @@ -709,6 +709,113 @@ func TestIssue31629(t *testing.T) { } } +func TestExchangePartitionStates(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + dbName := "partSchemaVer" + tk.MustExec("create database " + dbName) + tk.MustExec("use " + dbName) + tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`) + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use " + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use " + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec("use " + dbName) + tk.MustExec(`create table t (a int primary key, b varchar(255), key (b))`) + tk.MustExec(`create table tp (a int primary key, b varchar(255), key (b)) partition by range (a) (partition p0 values less than (1000000), partition p1M values less than (2000000))`) + tk.MustExec(`insert into t values (1, "1")`) + tk.MustExec(`insert into tp values (2, "2")`) + tk.MustExec(`analyze table t,tp`) + tk.MustExec("BEGIN") + tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1")) + tk.MustQuery(`select * from tp`).Check(testkit.Rows("2 2")) + alterChan := make(chan error) + go func() { + // WITH VALIDATION is the default + err := tk2.ExecToErr(`alter table tp exchange partition p0 with table t`) + alterChan <- err + }() + waitFor := func(tableName, s string, pos int) { + for { + select { + case alterErr := <-alterChan: + require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr) + default: + // Alter still running + } + res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows() + if len(res) == 1 && res[0][pos] == s { + logutil.BgLogger().Info("Got state", zap.String("State", s)) + break + } + gotime.Sleep(50 * gotime.Millisecond) + } + } + waitFor("t", "write only", 4) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into t values (4,"4")`) + tk3.MustContainErrMsg(`insert into t values (1000004,"1000004")`, "[table:1748]Found a row not matching the given partition set") + tk.MustExec(`insert into t values (5,"5")`) + // This should fail the alter table! + tk.MustExec(`insert into t values (1000005,"1000005")`) + + // MDL will block the alter to not continue until all clients + // are in StateWriteOnly, which tk is blocking until it commits + tk.MustExec(`COMMIT`) + waitFor("t", "rollback done", 11) + // MDL will block the alter from finish, tk is in 'rollbacked' schema version + // but the alter is still waiting for tk3 to commit, before continuing + tk.MustExec("BEGIN") + tk.MustExec(`insert into t values (1000006,"1000006")`) + tk.MustExec(`insert into t values (6,"6")`) + tk3.MustExec(`insert into t values (7,"7")`) + tk3.MustContainErrMsg(`insert into t values (1000007,"1000007")`, + "[table:1748]Found a row not matching the given partition set") + tk3.MustExec("COMMIT") + require.ErrorContains(t, <-alterChan, + "[ddl:1737]Found a row that does not match the partition") + tk3.MustExec(`BEGIN`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows( + "1 1", "1000005 1000005", "1000006 1000006", "5 5", "6 6")) + tk.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2")) + tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows( + "1 1", "1000005 1000005", "4 4", "5 5", "7 7")) + tk3.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2")) + tk.MustContainErrMsg(`insert into t values (7,"7")`, + "[kv:1062]Duplicate entry '7' for key 't.PRIMARY'") + tk.MustExec(`insert into t values (8,"8")`) + tk.MustExec(`insert into t values (1000008,"1000008")`) + tk.MustExec(`insert into tp values (9,"9")`) + tk.MustExec(`insert into tp values (1000009,"1000009")`) + tk3.MustExec(`insert into t values (10,"10")`) + tk3.MustExec(`insert into t values (1000010,"1000010")`) + + tk3.MustExec(`COMMIT`) + tk.MustQuery(`show create table tp`).Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (1000000),\n" + + " PARTITION `p1M` VALUES LESS THAN (2000000))")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustExec(`commit`) + tk.MustExec(`insert into t values (11,"11")`) + tk.MustExec(`insert into t values (1000011,"1000011")`) + tk.MustExec(`insert into tp values (12,"12")`) + tk.MustExec(`insert into tp values (1000012,"1000012")`) +} + func TestAddKeyPartitionStates(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store)