Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add auto_random id cache for statement retrying and table recover #14711

Merged
merged 13 commits into from
Feb 19, 2020
Merged
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ type DDL interface {
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
Expand Down Expand Up @@ -668,3 +668,13 @@ func (d *ddl) GetHook() Callback {

return d.mu.hook
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64
TableInfo *model.TableInfo
DropJobID int64
SnapshotTS uint64
CurAutoIncID int64
CurAutoRandID int64
}
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,8 +1581,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand All @@ -1602,7 +1603,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche
SchemaName: schema.Name.L,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, recoverTableCheckFlagNone},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.DropJobID,
recoverInfo.SnapshotTS, recoverTableCheckFlagNone, recoverInfo.CurAutoRandID},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

func finishRecoverTable(w *worker, t *meta.Meta, job *model.Job) error {
tbInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag)
err := job.DecodeArgs(tbInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID)
if err != nil {
return errors.Trace(err)
}
Expand Down
11 changes: 6 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ const (
func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil {
const checkFlagIndexInJobArgs = 4 // The index of `recoverTableCheckFlag` in job arg list.
if err = job.DecodeArgs(tblInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -251,9 +252,9 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
// none -> write only
// check GC enable and update flag.
if gcEnable {
job.Args[len(job.Args)-1] = recoverTableCheckFlagEnableGC
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagEnableGC
} else {
job.Args[len(job.Args)-1] = recoverTableCheckFlagDisableGC
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

job.SchemaState = model.StateWriteOnly
Expand Down Expand Up @@ -292,7 +293,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

tblInfo.State = model.StatePublic
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID)
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoIncID, autoRandID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
24 changes: 24 additions & 0 deletions ddl/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
)

// SessionExecInGoroutine export for testing.
Expand Down Expand Up @@ -55,3 +59,23 @@ func ExecMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL [
}
}()
}

// ExtractAllTableHandles extracts all handles of a given table.
func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64, error) {
dom := domain.GetDomain(se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName))
if err != nil {
return nil, err
}
err = se.NewTxn(context.Background())
if err != nil {
return nil, err
}
var allHandles []int64
err = tbl.IterRecords(se, tbl.FirstKey(), nil,
func(h int64, _ []types.Datum, _ []*table.Column) (more bool, err error) {
allHandles = append(allHandles, h)
return true, nil
})
return allHandles, err
}
42 changes: 32 additions & 10 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -372,27 +373,40 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
if err != nil {
return err
}
autoID, err := e.getTableAutoIDFromSnapshot(job)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

func (e *DDLExec) getTableAutoIDFromSnapshot(job *model.Job) (int64, error) {
// Get table original autoID before table drop.
func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (autoIncID, autoRandID int64, err error) {
// Get table original autoIDs before table drop.
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return 0, err
return 0, 0, err
}
autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID)
autoIncID, err = m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return 0, errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error())
return 0, 0, errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoID, nil
autoRandID, err = m.GetAutoRandomID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoRandID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoIncID, autoRandID, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
Expand Down Expand Up @@ -511,12 +525,20 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return infoschema.ErrTableExists.GenWithStackByArgs("tableID:" + strconv.FormatInt(tblInfo.ID, 10))
}

autoID, err := e.getTableAutoIDFromSnapshot(job)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

Expand Down
19 changes: 3 additions & 16 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
ddltestutil "github.com/pingcap/tidb/ddl/testutil"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -701,16 +701,7 @@ func (s *testAutoRandomSuite) TestAutoRandomBitsData(c *C) {
for i := 0; i < 100; i++ {
tk.MustExec("insert into t(b) values (?)", i)
}
dom := domain.GetDomain(tk.Se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test_auto_random_bits"), model.NewCIStr("t"))
c.Assert(err, IsNil)
c.Assert(tk.Se.NewTxn(context.Background()), IsNil)
var allHandles []int64
// Iterate all the record. The order is not guaranteed.
err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, _ []types.Datum, _ []*table.Column) (more bool, err error) {
allHandles = append(allHandles, h)
return true, nil
})
allHandles, err := ddltestutil.ExtractAllTableHandles(tk.Se, "test_auto_random_bits", "t")
c.Assert(err, IsNil)
tk.MustExec("drop table t")

Expand All @@ -723,11 +714,7 @@ func (s *testAutoRandomSuite) TestAutoRandomBitsData(c *C) {
}
c.Assert(allZero, IsFalse)
// Test non-shard-bits part of auto random id is monotonic increasing and continuous.
orderedHandles := make([]int64, len(allHandles))
for i, h := range allHandles {
orderedHandles[i] = h << 16 >> 16
}
sort.Slice(orderedHandles, func(i, j int) bool { return orderedHandles[i] < orderedHandles[j] })
orderedHandles := testutil.ConfigTestUtils.MaskSortHandles(allHandles, 15, mysql.TypeLonglong)
size := int64(len(allHandles))
for i := int64(1); i <= size; i++ {
c.Assert(i, Equals, orderedHandles[i-1])
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4526,7 +4526,7 @@ func (s *testRecoverTable) TestRecoverTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_recover")
tk.MustExec("use test_recover")
tk.MustExec("drop table if exists t_recover, t_recover2")
tk.MustExec("drop table if exists t_recover")
tk.MustExec("create table t_recover (a int);")
defer func(originGC bool) {
if originGC {
Expand Down
12 changes: 12 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,16 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

if !hasValue || d.IsNull() {
_, err := e.ctx.Txn(true)
if err != nil {
Expand All @@ -847,6 +857,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
retryInfo.AddAutoRandomID(autoRandomID)
} else {
recordID, err := getAutoRecordID(d, &c.FieldType, true)
if err != nil {
Expand All @@ -857,6 +868,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
return types.Datum{}, err
}
d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)
}

casted, err := table.CastValue(e.ctx, d, c.ToInfo())
Expand Down
Loading