Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add auto_random id cache for statement retrying and table recover #14711

Merged
merged 13 commits into from
Feb 19, 2020
Merged
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ type DDL interface {
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
Expand Down Expand Up @@ -665,3 +665,13 @@ func (d *ddl) GetHook() Callback {

return d.mu.hook
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64
TableInfo *model.TableInfo
DropJobID int64
SnapshotTS uint64
CurAutoIncID int64
CurAutoRandID int64
}
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,8 +1540,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand All @@ -1561,7 +1562,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche
SchemaName: schema.Name.L,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, recoverTableCheckFlagNone},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.CurAutoRandID,
tangenta marked this conversation as resolved.
Show resolved Hide resolved
recoverInfo.DropJobID, recoverInfo.SnapshotTS, recoverTableCheckFlagNone},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

func finishRecoverTable(w *worker, t *meta.Meta, job *model.Job) error {
tbInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoID, autoRandID, dropJobID, recoverTableCheckFlag int64
tangenta marked this conversation as resolved.
Show resolved Hide resolved
var snapshotTS uint64
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag)
err := job.DecodeArgs(tbInfo, &autoID, &autoRandID, &dropJobID, &snapshotTS, &recoverTableCheckFlag)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ const (
func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil {
if err = job.DecodeArgs(tblInfo, &autoIncID, &autoRandID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -292,7 +292,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

tblInfo.State = model.StatePublic
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID)
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoIncID, autoRandID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
40 changes: 31 additions & 9 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -372,27 +373,40 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
if err != nil {
return err
}
autoID, err := e.getTableAutoIDFromSnapshot(job)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

func (e *DDLExec) getTableAutoIDFromSnapshot(job *model.Job) (int64, error) {
func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (autoIncID, autoRandID int64, err error) {
// Get table original autoID before table drop.
reafans marked this conversation as resolved.
Show resolved Hide resolved
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return 0, err
return 0, 0, err
}
autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID)
autoIncID, err = m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return 0, errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error())
return 0, 0, errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoID, nil
autoRandID, err = m.GetAutoRandomID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoRandID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoIncID, autoRandID, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
Expand Down Expand Up @@ -511,12 +525,20 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return infoschema.ErrTableExists.GenWithStackByArgs("tableID:" + strconv.FormatInt(tblInfo.ID, 10))
}

autoID, err := e.getTableAutoIDFromSnapshot(job)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

Expand Down
12 changes: 12 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,16 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

if !hasValue || d.IsNull() {
_, err := e.ctx.Txn(true)
if err != nil {
Expand All @@ -848,6 +858,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
retryInfo.AddAutoRandomID(autoRandomID)
} else {
recordID, err := getAutoRecordID(d, &c.FieldType, true)
if err != nil {
Expand All @@ -858,6 +869,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
return types.Datum{}, err
}
d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)
}

casted, err := table.CastValue(e.ctx, d, c.ToInfo())
Expand Down
6 changes: 3 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,17 @@ func (m *Meta) CreateTableOrView(dbID int64, tableInfo *model.TableInfo) error {

// CreateTableAndSetAutoID creates a table with tableInfo in database,
// and rebases the table autoID.
func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoID int64) error {
func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoIncID, autoRandID int64) error {
err := m.CreateTableOrView(dbID, tableInfo)
if err != nil {
return errors.Trace(err)
}
_, err = m.txn.HInc(m.dbKey(dbID), m.autoTableIDKey(tableInfo.ID), autoID)
_, err = m.txn.HInc(m.dbKey(dbID), m.autoTableIDKey(tableInfo.ID), autoIncID)
if err != nil {
return errors.Trace(err)
}
if tableInfo.AutoRandomBits > 0 {
_, err = m.txn.HInc(m.dbKey(dbID), m.autoRandomTableIDKey(tableInfo.ID), 0)
_, err = m.txn.HInc(m.dbKey(dbID), m.autoRandomTableIDKey(tableInfo.ID), autoRandID)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *testSuite) TestMeta(c *C) {
ID: 3,
Name: model.NewCIStr("tbl3"),
}
err = t.CreateTableAndSetAutoID(1, tbInfo3, 123)
err = t.CreateTableAndSetAutoID(1, tbInfo3, 123, 0)
c.Assert(err, IsNil)
id, err := t.GetAutoTableID(1, tbInfo3.ID)
c.Assert(err, IsNil)
Expand Down
67 changes: 50 additions & 17 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,39 +60,72 @@ var (
type RetryInfo struct {
Retrying bool
DroppedPreparedStmtIDs []uint32
currRetryOff int
autoIncrementIDs []int64
autoIncrementIDs retryInfoAutoIDs
autoRandomIDs retryInfoAutoIDs
}

// Clean does some clean work.
func (r *RetryInfo) Clean() {
r.currRetryOff = 0
if len(r.autoIncrementIDs) > 0 {
r.autoIncrementIDs = r.autoIncrementIDs[:0]
}
r.autoIncrementIDs.clean()
r.autoRandomIDs.clean()

if len(r.DroppedPreparedStmtIDs) > 0 {
r.DroppedPreparedStmtIDs = r.DroppedPreparedStmtIDs[:0]
}
}

// AddAutoIncrementID adds id to AutoIncrementIDs.
func (r *RetryInfo) AddAutoIncrementID(id int64) {
r.autoIncrementIDs = append(r.autoIncrementIDs, id)
}

// ResetOffset resets the current retry offset.
func (r *RetryInfo) ResetOffset() {
r.currRetryOff = 0
r.autoIncrementIDs.resetOffset()
r.autoRandomIDs.resetOffset()
}

// AddAutoIncrementID adds id to autoIncrementIDs.
func (r *RetryInfo) AddAutoIncrementID(id int64) {
r.autoIncrementIDs.add(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think should be directly r.autoRandomIDs.autoIDs = append(r.autoRandomIDs.autoIDs, id) here.
AddAutoIncrementID()->add()->append() may be too deep.

}

// GetCurrAutoIncrementID gets current AutoIncrementID.
// GetCurrAutoIncrementID gets current autoIncrementID.
func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) {
if r.currRetryOff >= len(r.autoIncrementIDs) {
return 0, errCantGetValidID
return r.autoIncrementIDs.next()
}

// AddAutoRandomID adds id to autoRandomIDs.
func (r *RetryInfo) AddAutoRandomID(id int64) {
r.autoRandomIDs.add(id)
}

// GetCurrAutoRandomID gets current AutoRandomID.
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
func (r *RetryInfo) GetCurrAutoRandomID() (int64, error) {
return r.autoRandomIDs.next()
}

type retryInfoAutoIDs struct {
currentOffset int
autoIDs []int64
}

func (r *retryInfoAutoIDs) clean() {
r.currentOffset = 0
if len(r.autoIDs) > 0 {
r.autoIDs = r.autoIDs[:0]
}
id := r.autoIncrementIDs[r.currRetryOff]
r.currRetryOff++
}

func (r *retryInfoAutoIDs) add(id int64) {
r.autoIDs = append(r.autoIDs, id)
}

func (r *retryInfoAutoIDs) resetOffset() {
r.currentOffset = 0
}

func (r *retryInfoAutoIDs) next() (int64, error) {
if r.currentOffset >= len(r.autoIDs) {
return 0, errCantGetValidID
Copy link
Member

@bb7133 bb7133 Feb 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message of errCantGetValidID is "cannot get valid auto-increment id in retry". I suggest to modify it to "Cannot get a valid auto-ID when retrying the statement", what do you think?

}
id := r.autoIDs[r.currentOffset]
r.currentOffset++
return id, nil
}

Expand Down