Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add auto_random id cache for statement retrying and table recover (#14711) #15393

Merged
merged 3 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type DDL interface {
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr,
columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error
Expand Down Expand Up @@ -659,6 +659,16 @@ func (d *ddl) GetHook() Callback {
return d.mu.hook
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64
TableInfo *model.TableInfo
DropJobID int64
SnapshotTS uint64
CurAutoIncID int64
CurAutoRandID int64
}

// DDL error codes.
const (
codeInvalidWorker terror.ErrCode = 1
Expand Down
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,8 +1519,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand All @@ -1539,7 +1540,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche
TableID: tbInfo.ID,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, recoverTableCheckFlagNone},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.DropJobID,
recoverInfo.SnapshotTS, recoverTableCheckFlagNone, recoverInfo.CurAutoRandID},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

func finishRecoverTable(w *worker, t *meta.Meta, job *model.Job) error {
tbInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag)
err := job.DecodeArgs(tbInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID)
if err != nil {
return errors.Trace(err)
}
Expand Down
11 changes: 6 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ const (
func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil {
const checkFlagIndexInJobArgs = 4 // The index of `recoverTableCheckFlag` in job arg list.
if err = job.DecodeArgs(tblInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -236,9 +237,9 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
// none -> write only
// check GC enable and update flag.
if gcEnable {
job.Args[len(job.Args)-1] = recoverTableCheckFlagEnableGC
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagEnableGC
} else {
job.Args[len(job.Args)-1] = recoverTableCheckFlagDisableGC
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

job.SchemaState = model.StateWriteOnly
Expand Down Expand Up @@ -271,7 +272,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

tblInfo.State = model.StatePublic
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID)
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoIncID, autoRandID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
24 changes: 24 additions & 0 deletions ddl/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
)

// SessionExecInGoroutine export for testing.
Expand Down Expand Up @@ -55,3 +59,23 @@ func ExecMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL [
}
}()
}

// ExtractAllTableHandles extracts all handles of a given table.
func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64, error) {
dom := domain.GetDomain(se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName))
if err != nil {
return nil, err
}
err = se.NewTxn(context.Background())
if err != nil {
return nil, err
}
var allHandles []int64
err = tbl.IterRecords(se, tbl.FirstKey(), nil,
func(h int64, _ []types.Datum, _ []*table.Column) (more bool, err error) {
allHandles = append(allHandles, h)
return true, nil
})
return allHandles, err
}
37 changes: 30 additions & 7 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -332,20 +333,42 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
if err != nil {
return err
}
// Get table original autoID before table drop.
m, err := dom.GetSnapshotMeta(job.StartTS)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error())

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (autoIncID, autoRandID int64, err error) {
// Get table original autoIDs before table drop.
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return 0, 0, err
}
autoIncID, err = m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
}
autoRandID, err = m.GetAutoRandomID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoRandID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoIncID, autoRandID, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
job, err := t.GetHistoryDDLJob(s.JobID)
if err != nil {
Expand Down
19 changes: 3 additions & 16 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
ddltestutil "github.com/pingcap/tidb/ddl/testutil"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -761,16 +761,7 @@ func (s *testAutoRandomSuite) TestAutoRandomBitsData(c *C) {
for i := 0; i < 100; i++ {
tk.MustExec("insert into t(b) values (?)", i)
}
dom := domain.GetDomain(tk.Se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test_auto_random_bits"), model.NewCIStr("t"))
c.Assert(err, IsNil)
c.Assert(tk.Se.NewTxn(context.Background()), IsNil)
var allHandles []int64
// Iterate all the record. The order is not guaranteed.
err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, _ []types.Datum, _ []*table.Column) (more bool, err error) {
allHandles = append(allHandles, h)
return true, nil
})
allHandles, err := ddltestutil.ExtractAllTableHandles(tk.Se, "test_auto_random_bits", "t")
c.Assert(err, IsNil)
tk.MustExec("drop table t")

Expand All @@ -783,11 +774,7 @@ func (s *testAutoRandomSuite) TestAutoRandomBitsData(c *C) {
}
c.Assert(allZero, IsFalse)
// Test non-shard-bits part of auto random id is monotonic increasing and continuous.
orderedHandles := make([]int64, len(allHandles))
for i, h := range allHandles {
orderedHandles[i] = h << 16 >> 16
}
sort.Slice(orderedHandles, func(i, j int) bool { return orderedHandles[i] < orderedHandles[j] })
orderedHandles := testutil.ConfigTestUtils.MaskSortHandles(allHandles, 15, mysql.TypeLonglong)
size := int64(len(allHandles))
for i := int64(1); i <= size; i++ {
c.Assert(i, Equals, orderedHandles[i-1])
Expand Down
12 changes: 12 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,16 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

var err error
var recordID int64
if !hasValue {
Expand All @@ -736,6 +746,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)
return d, nil
}

Expand All @@ -758,6 +769,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
}

d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)

casted, err := table.CastValue(e.ctx, d, c.ToInfo())
if err != nil {
Expand Down
Loading