Skip to content

Commit

Permalink
*: Add TTL to the self schema version on etcd (pingcap#3427)
Browse files Browse the repository at this point in the history
* *: add ddl ttl
  • Loading branch information
zimulala authored Jun 11, 2017
1 parent ff77554 commit 1837c09
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 16 deletions.
8 changes: 8 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func (s *mockSchemaSyncer) UpdateSelfVersion(ctx goctx.Context, version int64) e
return nil
}

// Done implements SchemaSyncer.Done interface.
func (s *mockSchemaSyncer) Done() <-chan struct{} {
return make(chan struct{}, 1)
}

// Restart implements SchemaSyncer.Restart interface.
func (s *mockSchemaSyncer) Restart(_ goctx.Context) error { return nil }

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
func (s *mockSchemaSyncer) RemoveSelfVersionPath() error { return nil }

Expand Down
16 changes: 8 additions & 8 deletions ddl/owner_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ func (m *ownerManager) SetBgOwner(isOwner bool) {
}
}

// NewSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var NewSessionTTL = 60
// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

func (m *ownerManager) newSession(ctx goctx.Context, retryCnt int) (*concurrency.Session, error) {
func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) {
var err error
var etcdSession *concurrency.Session
for i := 0; i < retryCnt; i++ {
etcdSession, err = concurrency.NewSession(m.etcdCli,
concurrency.WithTTL(NewSessionTTL), concurrency.WithContext(ctx))
etcdSession, err = concurrency.NewSession(etcdCli,
concurrency.WithTTL(ttl), concurrency.WithContext(ctx))
if err == nil {
break
}
Expand All @@ -133,11 +133,11 @@ func (m *ownerManager) newSession(ctx goctx.Context, retryCnt int) (*concurrency

// CampaignOwners implements OwnerManager.CampaignOwners interface.
func (m *ownerManager) CampaignOwners(ctx goctx.Context) error {
ddlSession, err := m.newSession(ctx, newSessionDefaultRetryCnt)
ddlSession, err := newSession(ctx, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL)
if err != nil {
return errors.Trace(err)
}
bgSession, err := m.newSession(ctx, newSessionDefaultRetryCnt)
bgSession, err := newSession(ctx, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -156,7 +156,7 @@ func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency.
select {
case <-etcdSession.Done():
log.Info("[ddl] %s etcd session is done, creates a new one", key)
etcdSession, err = m.newSession(ctx, newSessionRetryUnlimited)
etcdSession, err = newSession(ctx, m.etcdCli, newSessionRetryUnlimited, ManagerSessionTTL)
if err != nil {
log.Infof("[ddl] break %s campaign loop, err %v", key, err)
return
Expand Down
48 changes: 41 additions & 7 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/terror"
Expand All @@ -44,9 +45,14 @@ const (
checkVersInterval = 20 * time.Millisecond
)

// CheckVersFirstWaitTime is a waitting time before the owner checks all the servers of the schema version,
// and it's an exported variable for testing.
var CheckVersFirstWaitTime = 50 * time.Millisecond
var (
// CheckVersFirstWaitTime is a waitting time before the owner checks all the servers of the schema version,
// and it's an exported variable for testing.
CheckVersFirstWaitTime = 50 * time.Millisecond
// SyncerSessionTTL is the etcd session's TTL in seconds.
// and it's an exported variable for testing.
SyncerSessionTTL = 10 * 60
)

// SchemaSyncer is used to synchronize schema version between the DDL worker leader and followers through etcd.
type SchemaSyncer interface {
Expand All @@ -61,6 +67,10 @@ type SchemaSyncer interface {
OwnerUpdateGlobalVersion(ctx goctx.Context, version int64) error
// GlobalVersionCh gets the chan for watching global version.
GlobalVersionCh() clientv3.WatchChan
// Done() returns a channel that closes when the syncer is no longer being refreshed.
Done() <-chan struct{}
// Restart restarts the syncer when it's on longer being refreshed.
Restart(ctx goctx.Context) error
// OwnerCheckAllVersions checks whether all followers' schema version are equal to
// the latest schema version. If the result is false, wait for a while and check again util the processing time reach 2 * lease.
// It returns until all servers' versions are equal to the latest version or the ctx is done.
Expand All @@ -70,6 +80,7 @@ type SchemaSyncer interface {
type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
globalVerCh clientv3.WatchChan
}

Expand All @@ -81,7 +92,8 @@ func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer {
}
}

func (s *schemaVersionSyncer) putKV(ctx goctx.Context, retryCnt int, key, val string) error {
func (s *schemaVersionSyncer) putKV(ctx goctx.Context, retryCnt int, key, val string,
opts ...clientv3.OpOption) error {
var err error
for i := 0; i < retryCnt; i++ {
select {
Expand All @@ -91,7 +103,7 @@ func (s *schemaVersionSyncer) putKV(ctx goctx.Context, retryCnt int, key, val st
}

childCtx, cancel := goctx.WithTimeout(ctx, keyOpDefaultTimeout)
_, err = s.etcdCli.Put(childCtx, key, val)
_, err = s.etcdCli.Put(childCtx, key, val, opts...)
cancel()
if err == nil {
return nil
Expand All @@ -111,8 +123,29 @@ func (s *schemaVersionSyncer) Init(ctx goctx.Context) error {
if err != nil {
return errors.Trace(err)
}
s.session, err = newSession(ctx, s.etcdCli, newSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
s.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
return s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion)
return s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
}

// Done implements SchemaSyncer.Done interface.
func (s *schemaVersionSyncer) Done() <-chan struct{} {
return s.session.Done()
}

// Restart implements SchemaSyncer.Restart interface.
func (s *schemaVersionSyncer) Restart(ctx goctx.Context) error {
var err error
s.session, err = newSession(ctx, s.etcdCli, newSessionRetryUnlimited, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
return s.putKV(ctx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
}

// GlobalVersionCh implements SchemaSyncer.GlobalVersionCh interface.
Expand All @@ -123,7 +156,8 @@ func (s *schemaVersionSyncer) GlobalVersionCh() clientv3.WatchChan {
// UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface.
func (s *schemaVersionSyncer) UpdateSelfVersion(ctx goctx.Context, version int64) error {
ver := strconv.FormatInt(version, 10)
return s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver)
return s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))
}

// OwnerUpdateGlobalVersion implements SchemaSyncer.OwnerUpdateGlobalVersion interface.
Expand Down
12 changes: 11 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
// TODO: Reset ticker or make interval longer.
ticker := time.NewTicker(lease / 2)
defer ticker.Stop()
syncer := do.ddl.SchemaSyncer()

for {
select {
Expand All @@ -310,11 +311,20 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
if err != nil {
log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
}
case <-do.ddl.SchemaSyncer().GlobalVersionCh():
case <-syncer.GlobalVersionCh():
err := do.Reload()
if err != nil {
log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
}
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
do.SchemaValidator.Stop()
err := syncer.Restart(goctx.Background())
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
break
}
do.SchemaValidator.Restart()
case <-do.exit:
return
}
Expand Down
32 changes: 32 additions & 0 deletions domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package domain
import (
"sync"
"time"

"github.com/ngaut/log"
)

// SchemaValidator is the interface for checking the validity of schema version.
Expand All @@ -27,6 +29,10 @@ type SchemaValidator interface {
Check(txnTS uint64, schemaVer int64) bool
// Latest returns the latest schema version it knows, but not necessary a valid one.
Latest() int64
// Stop stops checking the valid of transaction.
Stop()
// Restart restarts the schema validator after it is stopped.
Restart()
}

type schemaValidator struct {
Expand All @@ -43,9 +49,30 @@ func newSchemaValidator(lease time.Duration) SchemaValidator {
}
}

func (s *schemaValidator) Stop() {
log.Info("the schema validator stops")
s.mux.Lock()
defer s.mux.Lock()
s.items = nil
s.latestSchemaVer = 0
}

func (s *schemaValidator) Restart() {
log.Info("the schema validator restarts")
s.mux.Lock()
defer s.mux.Lock()
s.items = make(map[int64]time.Time)
}

func (s *schemaValidator) Update(leaseGrantTS uint64, schemaVer int64) {
s.mux.Lock()

if s.items == nil {
s.mux.Unlock()
log.Infof("the schema validator stopped before updating")
return
}

s.latestSchemaVer = schemaVer
leaseGrantTime := extractPhysicalTime(leaseGrantTS)
leaseExpire := leaseGrantTime.Add(s.lease - time.Millisecond)
Expand All @@ -68,6 +95,11 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64) bool {
s.mux.RLock()
defer s.mux.RUnlock()

if s.items == nil {
log.Infof("the schema validator stopped before checking")
return false
}

if s.lease == 0 {
return true
}
Expand Down

0 comments on commit 1837c09

Please sign in to comment.