Skip to content

Commit

Permalink
*: Do retry when schema is expired (pingcap#2094)
Browse files Browse the repository at this point in the history
*: Do retry when schema is expired
  • Loading branch information
zimulala authored Nov 28, 2016
1 parent 36f0f30 commit 5c9f073
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 49 deletions.
97 changes: 60 additions & 37 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,20 @@ type Domain struct {

// loadInfoSchema loads infoschema at startTS into handle, usedSchemaVersion is the currently used
// infoschema version, if it is the same as the schema version at startTS, we don't need to reload again.
func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) error {
// It returns the latest schema version and an error.
func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) (int64, error) {
snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS))
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
if usedSchemaVersion != 0 && usedSchemaVersion == latestSchemaVersion {
log.Debugf("[ddl] schema version is still %d, no need reload", usedSchemaVersion)
return nil
return latestSchemaVersion, nil
}
startTime := time.Now()
ok, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
Expand All @@ -70,22 +71,22 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
if ok {
log.Infof("[ddl] diff load InfoSchema from version %d to %d, in %v",
usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
return nil
return latestSchemaVersion, nil
}

schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}

newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, latestSchemaVersion)
if err != nil {
return errors.Trace(err)
return 0, errors.Trace(err)
}
log.Infof("[ddl] full load InfoSchema from version %d to %d, in %v",
usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
newISBuilder.Build()
return nil
return latestSchemaVersion, nil
}

func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) {
Expand Down Expand Up @@ -196,7 +197,7 @@ func (do *Domain) InfoSchema() infoschema.InfoSchema {
// GetSnapshotInfoSchema gets a snapshot information schema.
func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchema, error) {
snapHandle := do.infoHandle.EmptyClone()
err := do.loadInfoSchema(snapHandle, do.infoHandle.Get().SchemaMetaVersion(), snapshotTS)
_, err := do.loadInfoSchema(snapHandle, do.infoHandle.Get().SchemaMetaVersion(), snapshotTS)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -282,6 +283,7 @@ func (do *Domain) Reload() error {
defer do.m.Unlock()

var err error
var latestSchemaVersion int64
for i := 0; i < loadRetryTimes; i++ {
startTime := time.Now()
var ver kv.Version
Expand All @@ -292,11 +294,12 @@ func (do *Domain) Reload() error {
if oldInfoSchema != nil {
schemaVersion = oldInfoSchema.SchemaMetaVersion()
}
err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
latestSchemaVersion, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
}
if err == nil {
atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano())
do.SchemaValidity.updateTimeInfo(startTime.UnixNano(), ver.Ver)
do.SchemaValidity.updateSchemaVersion(latestSchemaVersion)
sub := time.Since(startTime)
lease := do.DDL().GetLease()
if sub > lease && lease > 0 {
Expand Down Expand Up @@ -325,9 +328,9 @@ func (do *Domain) checkValidityInLoop(lease time.Duration) {
if sub > lease {
// If sub is greater than a lease,
// it means that the schema version hasn't update for a lease.
do.SchemaValidity.SetValidity(false, lastSuccTS)
do.SchemaValidity.SetExpireInfo(true, lastSuccTS)
} else {
do.SchemaValidity.SetValidity(true, lastSuccTS)
do.SchemaValidity.SetExpireInfo(false, lastSuccTS)
}

waitTime := lease
Expand Down Expand Up @@ -433,52 +436,72 @@ func (m *MockFailure) getValue() bool {
}

type schemaValidityInfo struct {
isValid bool
firstValidTS uint64 // It's used for recording the first txn TS of schema vaild.
mux sync.RWMutex
lastReloadTime int64 // It's used for recording the time of last reload schema.
lastSuccTS uint64 // It's used for recording the last txn TS of loading schema succeed.
mux sync.RWMutex
isExpired bool // Whether information schema is out of date.
recoveredTS uint64 // It's used for recording the first txn TS of schema vaild.
timeInfo struct {
mux sync.RWMutex
lastReloadTime int64 // It's used for recording the time of last reload schema.
lastSuccTS uint64 // It's used for recording the last txn TS of loading schema succeed.
}
lastSchemaVer int64 // It's used for recording the last schema version.
MockReloadFailed MockFailure // It mocks reload failed.
}

func (s *schemaValidityInfo) updateSchemaVersion(version int64) {
atomic.StoreInt64(&s.lastSchemaVer, version)
}

func (s *schemaValidityInfo) updateTimeInfo(lastReloadTime int64, lastSuccTS uint64) {
s.mux.Lock()
defer s.mux.Unlock()
s.timeInfo.mux.Lock()
defer s.timeInfo.mux.Unlock()

s.lastReloadTime = lastReloadTime
s.lastSuccTS = lastSuccTS
s.timeInfo.lastReloadTime = lastReloadTime
s.timeInfo.lastSuccTS = lastSuccTS
}

func (s *schemaValidityInfo) getTimeInfo() (int64, uint64) {
s.mux.RLock()
defer s.mux.RUnlock()
s.timeInfo.mux.Lock()
defer s.timeInfo.mux.Unlock()

return s.lastReloadTime, s.lastSuccTS
return s.timeInfo.lastReloadTime, s.timeInfo.lastSuccTS
}

// SetValidity sets the schema validity value.
// SetExpireInfo sets the information of whether information schema is out of date.
// It's public in order to do the test.
func (s *schemaValidityInfo) SetValidity(v bool, lastSuccTS uint64) {
func (s *schemaValidityInfo) SetExpireInfo(expired bool, lastSuccTS uint64) {
s.mux.Lock()
if s.isValid != v {
log.Infof("[ddl] SetValidity, original:%v current:%v lastSuccTS:%v", s.isValid, v, lastSuccTS)
if !v {
log.Errorf("[ddl] SetValidity, schema validity is %v, lastSuccTS:%v", v, lastSuccTS)
s.firstValidTS = lastSuccTS
if s.isExpired != expired {
log.Infof("[ddl] SetExpireInfo, original:%v current:%v lastSuccTS:%v", s.isExpired, expired, lastSuccTS)
if expired {
log.Errorf("[ddl] SetExpireInfo, information schema is expired %v, lastSuccTS:%v", expired, lastSuccTS)
s.recoveredTS = lastSuccTS
}
s.isValid = v
s.isExpired = expired
}
s.mux.Unlock()
}

func (s *schemaValidityInfo) Check(txnTS uint64) error {
// Check checks schema validity. It returns the current schema version and an error.
func (s *schemaValidityInfo) Check(txnTS uint64, schemaVer int64) (int64, error) {
currVer := atomic.LoadInt64(&s.lastSchemaVer)
s.mux.RLock()
if s.isValid && (txnTS == 0 || txnTS > s.firstValidTS) {
if s.isExpired {
s.mux.RUnlock()
return currVer, ErrLoadSchemaTimeOut.Gen("InfomationSchema is out of date.")
}

// txnTS != 0, it means the transition isn't nil.
// txnTS <= s.recoveredTS, it means the transition begins before schema is recovered.
// schemaVer != currVer, it means the schema version is changed.
if txnTS != 0 && txnTS <= s.recoveredTS && schemaVer != currVer {
s.mux.RUnlock()
return nil
log.Warnf("check schema validity, txnTS:%v recordTS:%v schema version original:%v input:%v",
txnTS, s.recoveredTS, currVer, schemaVer)
return currVer, ErrLoadSchemaTimeOut.Gen("InfomationSchema is out of date.")
}
s.mux.RUnlock()
return ErrLoadSchemaTimeOut.Gen("InfomationSchema is out of date.")
return currVer, nil
}

// NewDomain creates a new domain. Should not create multiple domains for the same store.
Expand All @@ -494,7 +517,7 @@ func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {
if err = d.Reload(); err != nil {
return nil, errors.Trace(err)
}
d.SchemaValidity.SetValidity(true, 0)
d.SchemaValidity.SetExpireInfo(false, 0)

variable.RegisterStatistics(d)

Expand Down
14 changes: 11 additions & 3 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,28 @@ func (*testSuite) TestT(c *C) {
c.Assert(dom1.DDL().GetLease(), Equals, 0*time.Second)

// for schemaValidity
err = dom.SchemaValidity.Check(0)
schemaVer, err := dom.SchemaValidity.Check(0, 0)
c.Assert(err, IsNil)
dom.SchemaValidity.MockReloadFailed.SetValue(true)
err = dom.Reload()
c.Assert(err, NotNil)
time.Sleep(lease)
err = dom.SchemaValidity.Check(0)
_, err = dom.SchemaValidity.Check(0, 0)
c.Assert(err, NotNil)
_, err = dom.SchemaValidity.Check(0, schemaVer)
c.Assert(err, NotNil)
dom.SchemaValidity.MockReloadFailed.SetValue(false)
dom.SchemaValidity.SetExpireInfo(false, 0)
_, err = dom.SchemaValidity.Check(1, 0)
c.Assert(err, NotNil)
schemaVer1, err := dom.SchemaValidity.Check(0, schemaVer)
c.Assert(err, IsNil)
err = dom.Reload()
c.Assert(err, IsNil)
time.Sleep(lease)
err = dom.SchemaValidity.Check(0)
schemaVer2, err := dom.SchemaValidity.Check(0, 0)
c.Assert(err, IsNil)
c.Assert(schemaVer1, Equals, schemaVer2)

err = store.Close()
c.Assert(err, IsNil)
Expand Down
40 changes: 32 additions & 8 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ func (h *stmtHistory) clone() *stmtHistory {
const unlimitedRetryCnt = -1

type session struct {
txn kv.Transaction // Current transaction
values map[fmt.Stringer]interface{}
store kv.Storage
history stmtHistory
maxRetryCnt int // Max retry times. If maxRetryCnt <=0, there is no limitation for retry times.
txn kv.Transaction // current transaction
// It is the schema version in current transaction. If it's 0, the transaction is nil.
schemaVerInCurrTxn int64
values map[fmt.Stringer]interface{}
store kv.Storage
history stmtHistory
maxRetryCnt int // Max retry times. If maxRetryCnt <=0, there is no limitation for retry times.

debugInfos map[string]interface{} // Vars for debug and unit tests.

Expand All @@ -134,15 +136,37 @@ func (s *session) cleanRetryInfo() {
}
}

// TODO: Set them as system variables.
var (
checkSchemaValidityRetryTimes = 30
checkSchemaValiditySleepTime = 1 * time.Second
)

// If the schema is invalid, we need to rollback the current transaction.
func (s *session) checkSchemaValidOrRollback() error {
var ts uint64
if s.txn != nil {
ts = s.txn.StartTS()
} else {
s.schemaVerInCurrTxn = 0
}
err := sessionctx.GetDomain(s).SchemaValidity.Check(ts)
if err == nil {
return nil

var err error
var currSchemaVer int64
for i := 0; i < checkSchemaValidityRetryTimes; i++ {
currSchemaVer, err = sessionctx.GetDomain(s).SchemaValidity.Check(ts, s.schemaVerInCurrTxn)
if err == nil {
if s.txn == nil {
s.schemaVerInCurrTxn = currSchemaVer
}
return nil
}
log.Infof("schema version original %d, current %d, sleep time %v",
s.schemaVerInCurrTxn, currSchemaVer, checkSchemaValiditySleepTime)
if currSchemaVer != s.schemaVerInCurrTxn && s.schemaVerInCurrTxn != 0 {
break
}
time.Sleep(checkSchemaValiditySleepTime)
}

if err1 := s.RollbackTxn(); err1 != nil {
Expand Down
6 changes: 5 additions & 1 deletion session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ func (s *testSessionSuite) SetUpSuite(c *C) {
s.dropTableSQL = `Drop TABLE if exists t;`
s.createTableSQL = `CREATE TABLE t(id TEXT);`
s.selectSQL = `SELECT * from t;`
checkSchemaValidityRetryTimes = 3
checkSchemaValiditySleepTime = 20 * time.Millisecond
runtime.GOMAXPROCS(runtime.NumCPU())
}

func (s *testSessionSuite) TearDownSuite(c *C) {
removeStore(c, s.dbName)
checkSchemaValidityRetryTimes = 30
checkSchemaValiditySleepTime = 1 * time.Second
}

func (s *testSessionSuite) TestPrepare(c *C) {
Expand Down Expand Up @@ -2013,7 +2017,7 @@ func (s *testSessionSuite) TestIssue1435(c *C) {
ver, err := store.CurrentVersion()
c.Assert(err, IsNil)
c.Assert(ver, NotNil)
sessionctx.GetDomain(ctx).SchemaValidity.SetValidity(true, ver.Ver)
sessionctx.GetDomain(ctx).SchemaValidity.SetExpireInfo(false, ver.Ver)
sessionctx.GetDomain(ctx).SchemaValidity.MockReloadFailed.SetValue(false)
time.Sleep(lease)
mustExecSQL(c, se, "drop table if exists t;")
Expand Down
1 change: 1 addition & 0 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
lease = schemaLease
}
err = util.RunWithRetry(defaultMaxRetries, retryInterval, func() (retry bool, err1 error) {
log.Infof("store %v new domain, lease %v", store.UUID(), lease)
d, err1 = domain.NewDomain(store, lease)
return true, errors.Trace(err1)
})
Expand Down
Loading

0 comments on commit 5c9f073

Please sign in to comment.