Skip to content

Commit

Permalink
*: Reorg part update table (#40714)
Browse files Browse the repository at this point in the history
close #38535
  • Loading branch information
mjonss authored Jan 20, 2023
1 parent 3eef6a0 commit 07a7465
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 11 deletions.
7 changes: 2 additions & 5 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return errors.Trace(err)
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
dc.getReorgCtx(reorgInfo.Job.ID).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
Expand All @@ -624,7 +623,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
func getBatchTasks(t table.PhysicalTable, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
physicalTableID := reorgInfo.PhysicalTableID
var prefix kv.Key
Expand Down Expand Up @@ -654,13 +653,11 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
endKey = prefix.PrefixNext()
}

//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
task := &reorgBackfillTask{
id: i,
jobID: reorgInfo.Job.ID,
physicalTableID: physicalTableID,
physicalTable: phyTbl,
physicalTable: t,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
Expand Down
24 changes: 24 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4626,6 +4626,7 @@ func TestReorganizeRangePartition(t *testing.T) {
"12 12 21",
"23 23 32"))
tk.MustExec(`alter table t reorganize partition pMax into (partition p2 values less than (30), partition pMax values less than (MAXVALUE))`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand Down Expand Up @@ -4668,6 +4669,7 @@ func TestReorganizeRangePartition(t *testing.T) {
"12 12 21",
"23 23 32"))
tk.MustExec(`alter table t reorganize partition p2,pMax into (partition p2 values less than (35),partition p3 values less than (47), partition pMax values less than (MAXVALUE))`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+
"1 1 1",
"12 12 21",
Expand Down Expand Up @@ -4702,6 +4704,7 @@ func TestReorganizeRangePartition(t *testing.T) {
tk.MustQuery(`select * from t partition (pMax)`).Sort().Check(testkit.Rows("" +
"56 56 65"))
tk.MustExec(`alter table t reorganize partition p0,p1 into (partition p1 values less than (20))`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand All @@ -4725,6 +4728,7 @@ func TestReorganizeRangePartition(t *testing.T) {
"56 56 65"))
tk.MustExec(`alter table t drop index b`)
tk.MustExec(`alter table t drop index c`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand Down Expand Up @@ -4766,6 +4770,7 @@ func TestReorganizeRangePartition(t *testing.T) {
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
// But allowed to change from MAXVALUE if no existing value is outside the new range!
tk.MustExec(`alter table t2 reorganize partition pMax into (partition p4 values less than (90))`)
tk.MustExec(`admin check table t2`)
tk.MustQuery(`show create table t2`).Check(testkit.Rows("" +
"t2 CREATE TABLE `t2` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand All @@ -4790,6 +4795,7 @@ func TestReorganizeListPartition(t *testing.T) {
` (partition p1 values in (12,23,51,14), partition p2 values in (24,63), partition p3 values in (45))`)
tk.MustExec(`insert into t values (12,"12",21), (24,"24",42),(51,"51",15),(23,"23",32),(63,"63",36),(45,"45",54)`)
tk.MustExec(`alter table t reorganize partition p1 into (partition p0 values in (12,51,13), partition p1 values in (23))`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(11) DEFAULT NULL,\n" +
Expand All @@ -4806,6 +4812,7 @@ func TestReorganizeListPartition(t *testing.T) {
// Note: MySQL cannot reorganize two non-consecutive list partitions :)
// ERROR 1519 (HY000): When reorganizing a set of partitions they must be in consecutive order
tk.MustExec(`alter table t reorganize partition p1, p3 into (partition pa values in (45,23,15))`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(11) NOT NULL,\n" +
Expand Down Expand Up @@ -4839,6 +4846,7 @@ func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
"Warning 1265 Data truncated for column 'a', value is ' 654321'",
"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
tk.MustExec(`admin check table t`)
}

func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
Expand Down Expand Up @@ -4955,6 +4963,7 @@ func TestReorgPartitionConcurrent(t *testing.T) {
// StateWriteOnly
wait <- true
tk.MustExec(`insert into t values (11, "11", 11),(12,"12",21)`)
tk.MustExec(`admin check table t`)
writeOnlyInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema()
require.Equal(t, int64(1), writeOnlyInfoSchema.SchemaMetaVersion()-deleteOnlyInfoSchema.SchemaMetaVersion())
deleteOnlyTbl, err := deleteOnlyInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t"))
Expand All @@ -4966,7 +4975,9 @@ func TestReorgPartitionConcurrent(t *testing.T) {
// If not DeleteOnly is working, then this would show up when reorg is done
tk.MustExec(`delete from t where a = 11`)
tk.MustExec(`update t set b = "12b", c = 12 where a = 12`)
tk.MustExec(`admin check table t`)
writeOnlyTbl.Meta().Partition = writeOnlyParts
tk.MustExec(`admin check table t`)
wait <- true

// StateWriteReorganization
Expand Down Expand Up @@ -5009,6 +5020,7 @@ func TestReorgPartitionConcurrent(t *testing.T) {
currPart := currTbl.Meta().Partition
currTbl.Meta().Partition = oldTbl.Meta().Partition
tk.MustQuery(`select * from t where b = "16"`).Sort().Check(testkit.Rows("16 16 16"))
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand Down Expand Up @@ -5038,6 +5050,7 @@ func TestReorgPartitionConcurrent(t *testing.T) {
"14 14 14",
"15 15 15",
"16 16 16"))
tk.MustExec(`admin check table t`)
newInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema()
require.Equal(t, int64(1), newInfoSchema.SchemaMetaVersion()-deleteReorgInfoSchema.SchemaMetaVersion())
oldTbl, err = deleteReorgInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t"))
Expand Down Expand Up @@ -5076,6 +5089,7 @@ func TestReorgPartitionConcurrent(t *testing.T) {
" PARTITION `p1a` VALUES LESS THAN (15),\n" +
" PARTITION `p1b` VALUES LESS THAN (20),\n" +
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
tk.MustExec(`admin check table t`)
newTbl.Meta().Partition = newPart
syncOnChanged <- true
require.NoError(t, <-alterErr)
Expand Down Expand Up @@ -5116,12 +5130,14 @@ func TestReorgPartitionFailConcurrent(t *testing.T) {
wait <- true
tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`)
tk.MustGetErrCode(`insert into t values (11, "11", 11),(12,"duplicate PK 💥", 13)`, mysql.ErrDupEntry)
tk.MustExec(`admin check table t`)
wait <- true
require.NoError(t, <-alterErr)
tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+
"12 12 21",
"14 14 14",
"15 15 15"))
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand Down Expand Up @@ -5164,16 +5180,19 @@ func TestReorgPartitionFailConcurrent(t *testing.T) {
require.Equal(t, 0, getNumRowsFromPartitionDefs(t, tk, tbl, tbl.Meta().Partition.AddingDefinitions))
tk.MustExec(`delete from t where a = 14`)
tk.MustExec(`insert into t values (13, "13", 31),(14,"14b",14),(16, "16",16)`)
tk.MustExec(`admin check table t`)
wait <- true
wait <- true
tbl, err = infoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t"))
require.NoError(t, err)
require.Equal(t, 5, getNumRowsFromPartitionDefs(t, tk, tbl, tbl.Meta().Partition.AddingDefinitions))
tk.MustExec(`delete from t where a = 15`)
tk.MustExec(`insert into t values (11, "11", 11),(15,"15b",15),(17, "17",17)`)
tk.MustExec(`admin check table t`)
wait <- true
require.NoError(t, <-alterErr)

tk.MustExec(`admin check table t`)
tk.MustQuery(`select * from t where a between 10 and 22`).Sort().Check(testkit.Rows(""+
"11 11 11",
"12 12 21",
Expand Down Expand Up @@ -5252,8 +5271,10 @@ func TestReorgPartitionFailInject(t *testing.T) {
wait <- true
tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`)
tk.MustGetErrCode(`insert into t values (11, "11", 11),(12,"duplicate PK 💥", 13)`, mysql.ErrDupEntry)
tk.MustExec(`admin check table t`)
wait <- true
require.NoError(t, <-alterErr)
tk.MustExec(`admin check table t`)
tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+
"12 12 21",
"14 14 14",
Expand Down Expand Up @@ -5291,6 +5312,7 @@ func TestReorgPartitionRollback(t *testing.T) {
// (partitions used during reorg, but was dropped)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockUpdateVersionAndTableInfoErr", `return(true)`))
tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))")
tk.MustExec(`admin check table t`)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockUpdateVersionAndTableInfoErr"))
ctx := tk.Session()
is := domain.GetDomain(ctx).InfoSchema()
Expand All @@ -5303,6 +5325,7 @@ func TestReorgPartitionRollback(t *testing.T) {
require.NoError(t, err)
}()
tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))")
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand Down Expand Up @@ -5343,6 +5366,7 @@ func TestReorgPartitionData(t *testing.T) {
tk.MustExec(`SET @@session.sql_mode = default`)
tk.MustExec(`alter table t reorganize partition p1M into (partition p0 values less than (1), partition p2M values less than (2000000))`)
tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("0 Zero value! 0 2022-02-30 00:00:00"))
tk.MustExec(`admin check table t`)
}

// TODO Test with/without PK, indexes, UK, virtual, virtual stored columns
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,8 @@ func getIntervalFromPolicy(policy []time.Duration, i int) (time.Duration, bool)

func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn:
case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn,
model.ActionReorganizePartition:
return getIntervalFromPolicy(slowDDLIntervalPolicy, i)
case model.ActionCreateTable, model.ActionCreateSchema:
return getIntervalFromPolicy(fastDDLIntervalPolicy, i)
Expand Down
17 changes: 17 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
Expand Down Expand Up @@ -1360,6 +1361,22 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs)
}
}
case model.ActionReorganizePartition:
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
if droppedIDs, ok := job.CtxVars[0].([]int64); ok {
if addedIDs, ok := job.CtxVars[1].([]int64); ok {
// to use AffectedOpts we need both new and old to have the same length
maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs))
// Also initialize them to 0!
oldIDs := make([]int64, maxParts)
copy(oldIDs, droppedIDs)
newIDs := make([]int64, maxParts)
copy(newIDs, addedIDs)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
}
}
case model.ActionCreateTable:
diff.TableID = job.TableID
if len(job.Args) > 0 {
Expand Down
22 changes: 19 additions & 3 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/hex"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -300,6 +301,10 @@ func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.Tabl
if progress > 1 {
progress = 1
}
logutil.BgLogger().Debug("[ddl] update progress",
zap.Float64("progress", progress),
zap.Int64("addedRowCount", addedRowCount),
zap.Int64("totalCount", totalCount))
}
switch reorgInfo.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
Expand Down Expand Up @@ -330,9 +335,20 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
if !ok {
return statistics.PseudoRowCount
}
// TODO: if Reorganize Partition, only select number of rows from the selected partitions!
sql := "select table_rows from information_schema.tables where tidb_table_id=%?;"
rows, _, err := executor.ExecRestrictedSQL(w.ctx, nil, sql, tblInfo.ID)
var rows []chunk.Row
if tblInfo.Partition != nil && len(tblInfo.Partition.DroppingDefinitions) > 0 {
// if Reorganize Partition, only select number of rows from the selected partitions!
defs := tblInfo.Partition.DroppingDefinitions
partIDs := make([]string, 0, len(defs))
for _, def := range defs {
partIDs = append(partIDs, strconv.FormatInt(def.ID, 10))
}
sql := "select sum(table_rows) from information_schema.partitions where tidb_partition_id in (%?);"
rows, _, err = executor.ExecRestrictedSQL(w.ctx, nil, sql, strings.Join(partIDs, ","))
} else {
sql := "select table_rows from information_schema.tables where tidb_table_id=%?;"
rows, _, err = executor.ExecRestrictedSQL(w.ctx, nil, sql, tblInfo.ID)
}
if err != nil {
return statistics.PseudoRowCount
}
Expand Down
27 changes: 25 additions & 2 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,14 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
case model.ActionTruncateTablePartition, model.ActionTruncateTable:
return b.applyTruncateTableOrPartition(m, diff)
case model.ActionDropTable, model.ActionDropTablePartition:
return b.applyDropTableOrParition(m, diff)
return b.applyDropTableOrPartition(m, diff)
case model.ActionRecoverTable:
return b.applyRecoverTable(m, diff)
case model.ActionCreateTables:
return b.applyCreateTables(m, diff)
case model.ActionReorganizePartition:
// TODO: How to test?
return b.applyReorganizePartition(m, diff)
case model.ActionFlashbackCluster:
return []int64{-1}, nil
default:
Expand Down Expand Up @@ -275,7 +278,7 @@ func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.Schema
return tblIDs, nil
}

func (b *Builder) applyDropTableOrParition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := b.applyTableUpdate(m, diff)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -287,6 +290,24 @@ func (b *Builder) applyDropTableOrParition(m *meta.Meta, diff *model.SchemaDiff)
return tblIDs, nil
}

// applyReorganizePartition applies a reorganize-partition schema diff.
// It first runs the regular table update for the diff, then walks
// diff.AffectedOpts: every non-zero OldTableID has its bundle deleted and
// every non-zero TableID is marked as needing a bundle update.
// Zero IDs are skipped (presumably padding entries used when the number of
// dropped and added partitions differ — confirm against the producer of
// AffectedOpts).
//
// It returns the table IDs reported by applyTableUpdate, or a traced error
// if that update fails.
//
// TODO: How to test this?
func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
	// Is this needed? Since there should be no difference more than partition changes?
	affectedIDs, updateErr := b.applyTableUpdate(m, diff)
	if updateErr != nil {
		return nil, errors.Trace(updateErr)
	}

	opts := diff.AffectedOpts
	for i := range opts {
		// Old (dropped) partition: its bundle is no longer wanted.
		if oldID := opts[i].OldTableID; oldID != 0 {
			b.deleteBundle(b.is, oldID)
		}
		// New (added) partition: its bundle must be refreshed.
		if newID := opts[i].TableID; newID != 0 {
			b.markTableBundleShouldUpdate(newID)
		}
	}
	return affectedIDs, nil
}

func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := b.applyTableUpdate(m, diff)
if err != nil {
Expand Down Expand Up @@ -696,6 +717,8 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
switch tp {
case model.ActionDropTablePartition:
case model.ActionTruncateTablePartition:
// ReorganizePartition handles the bundles in applyReorganizePartition
case model.ActionReorganizePartition:
default:
pi := tblInfo.GetPartitionInfo()
if pi != nil {
Expand Down
15 changes: 15 additions & 0 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
}
case model.ActionReorganizePartition:
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
}
// Update global stats, even though it should not have changed,
// the updated statistics from the newly reorganized partitions may be better
pruneMode := h.CurrentPruneMode()
if pruneMode == variable.Dynamic && t.PartInfo != nil {
if err := h.updateGlobalStats(t.TableInfo); err != nil {
return err
}
}
case model.ActionFlashbackCluster:
return h.updateStatsVersion()
}
Expand Down

0 comments on commit 07a7465

Please sign in to comment.