Skip to content

Commit

Permalink
util/admin: support admin cleanup index on the partition table (#17203)…
Browse files Browse the repository at this point in the history
… (#17405)

* cherry pick #17203 to release-4.0

Signed-off-by: sre-bot <sre-bot@pingcap.com>

* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

Co-authored-by: crazycs <crazycs520@gmail.com>
  • Loading branch information
sre-bot and crazycs520 authored May 26, 2020
1 parent fd1b262 commit 82592df
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 6 deletions.
43 changes: 37 additions & 6 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,9 @@ type CleanupIndexExec struct {
done bool
removeCnt uint64

index table.Index
table table.Table
index table.Index
table table.Table
physicalID int64

idxCols []*model.ColumnInfo
idxColFieldTypes []*types.FieldType
Expand Down Expand Up @@ -606,6 +607,34 @@ func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.done {
return nil
}
var err error
if tbl, ok := e.table.(table.PartitionedTable); ok {
pi := e.table.Meta().GetPartitionInfo()
for _, p := range pi.Definitions {
e.table = tbl.GetPartition(p.ID)
e.index = tables.GetWritableIndexByName(e.index.Meta().Name.L, e.table)
e.physicalID = p.ID
err = e.init()
if err != nil {
return err
}
err = e.cleanTableIndex(ctx)
if err != nil {
return err
}
}
} else {
err = e.cleanTableIndex(ctx)
if err != nil {
return err
}
}
e.done = true
req.AppendUint64(0, e.removeCnt)
return nil
}

func (e *CleanupIndexExec) cleanTableIndex(ctx context.Context) error {
for {
errInTxn := kv.RunInNewTxn(e.ctx.GetStore(), true, func(txn kv.Transaction) error {
err := e.fetchIndex(ctx, txn)
Expand Down Expand Up @@ -634,8 +663,6 @@ func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
delete(e.idxValues, k)
}
}
e.done = true
req.AppendUint64(0, e.removeCnt)
return nil
}

Expand All @@ -647,7 +674,7 @@ func (e *CleanupIndexExec) buildIndexScan(ctx context.Context, txn kv.Transactio
sc := e.ctx.GetSessionVars().StmtCtx
var builder distsql.RequestBuilder
ranges := ranger.FullRange()
kvReq, err := builder.SetIndexRanges(sc, e.table.Meta().ID, e.index.Meta().ID, ranges).
kvReq, err := builder.SetIndexRanges(sc, e.physicalID, e.index.Meta().ID, ranges).
SetDAGRequest(dagPB).
SetStartTS(txn.StartTS()).
SetKeepOrder(true).
Expand All @@ -672,6 +699,10 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
return e.init()
}

func (e *CleanupIndexExec) init() error {
e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize)
e.idxValues = make(map[int64][][]types.Datum, e.batchSize)
e.batchKeys = make([]kv.Key, 0, e.batchSize)
Expand Down Expand Up @@ -709,7 +740,7 @@ func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest,

func (e *CleanupIndexExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.table.Meta().ID,
TableId: e.physicalID,
IndexId: e.index.Meta().ID,
Columns: util.ColumnsToProto(e.idxCols, e.table.Meta().PKIsHandle),
}
Expand Down
79 changes: 79 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,85 @@ func (s *testSuite5) TestAdminCleanupIndex(c *C) {
tk.MustExec("admin check table admin_test")
}

func (s *testSuite5) TestAdminCleanupIndexForPartitionTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

getTable := func() table.Table {
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(err, IsNil)
return tbl
}

checkFunc := func(tbl table.Table, pid int64, idxValue, handle int) {
idxInfo2 := tbl.Meta().FindIndexByName("c2")
indexOpr2 := tables.NewIndex(pid, tbl.Meta(), idxInfo2)
idxInfo3 := tbl.Meta().FindIndexByName("c3")
indexOpr3 := tables.NewIndex(pid, tbl.Meta(), idxInfo3)

txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(idxValue), int64(handle))
c.Assert(err, IsNil)
_, err = indexOpr3.Create(s.ctx, txn, types.MakeDatums(idxValue), int64(handle))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)

r := tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)")
r.Check(testkit.Rows("4"))
r = tk.MustQuery("admin cleanup index admin_test c2")
r.Check(testkit.Rows("1"))
r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)")
r.Check(testkit.Rows("3"))

r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c3)")
r.Check(testkit.Rows("4"))
r = tk.MustQuery("admin cleanup index admin_test c3")
r.Check(testkit.Rows("1"))
r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c3)")
r.Check(testkit.Rows("3"))
tk.MustExec("admin check table admin_test")
}

// Test for hash partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, primary key (c2), unique index c2(c2), index c3(c3)) partition by hash(c2) partitions 3;")
tk.MustExec("insert admin_test (c2, c3) values (0, 0), (1, 1), (2, 2)")
r := tk.MustQuery("admin cleanup index admin_test c2")
r.Check(testkit.Rows("0"))
tbl := getTable()
pi := tbl.Meta().GetPartitionInfo()
c.Assert(pi, NotNil)
for i, p := range pi.Definitions {
checkFunc(tbl, p.ID, i+6, i+6)
}

// Test for range partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec(`create table admin_test (c1 int, c2 int, c3 int default 1, primary key (c2), unique index c2 (c2), index c3(c3)) PARTITION BY RANGE ( c2 ) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (MAXVALUE))`)
tk.MustExec("insert admin_test (c1, c2) values (0, 0), (6, 6), (12, 12)")
r = tk.MustQuery("admin cleanup index admin_test c2")
r.Check(testkit.Rows("0"))
tbl = getTable()
pi = tbl.Meta().GetPartitionInfo()
c.Assert(pi, NotNil)
for i, p := range pi.Definitions {
checkFunc(tbl, p.ID, i*6+1, i*6+1)
}
}

func (s *testSuite5) TestAdminCleanupIndexPKNotHandle(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (b *executorBuilder) buildCleanupIndex(v *plannercore.CleanupIndex) Executo
idxCols: buildCleanupIndexCols(tblInfo, index.Meta()),
index: index,
table: t,
physicalID: t.Meta().ID,
batchSize: 20000,
}
return e
Expand Down

0 comments on commit 82592df

Please sign in to comment.