ddl: use the correct schema version when waitSchemaSynced (#37210)

close #37209

wjhuang2016 authored Aug 19, 2022
1 parent b690f1c commit 41b9e26

Showing 7 changed files with 58 additions and 68 deletions.
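In short: waitSchemaSynced used to fetch the global schema version from etcd via SchemaSyncer.MustGetGlobalVersion. Per the test and failpoints added here, that value can be stale when a TiDB server goes down before updating the global version, so the worker would wait on the wrong schema version. The hunks below make waitSchemaSynced read the latest schema version with a non-empty diff from a snapshot of the worker's own store, and turn it into an error-returning call so callers can retry. A condensed sketch of the new flow (failpoint hooks and the warning log omitted; it reuses the ddl package types from the diff, so it is not standalone):

// Sketch of the corrected waitSchemaSynced: read the newest schema version
// that has a non-empty schema diff from a snapshot of the local store instead
// of the etcd global-version key, then wait for other servers to catch up,
// propagating any error to the caller.
func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
	if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
		return nil
	}
	ctx, cancelFunc := context.WithTimeout(w.ctx, waitTime)
	defer cancelFunc()

	ver, _ := w.store.CurrentVersion(kv.GlobalTxnScope)
	snapshot := w.store.GetSnapshot(ver)
	m := meta.NewSnapshotMeta(snapshot)
	latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
	if err != nil {
		return err
	}
	w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job)
	return nil
}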
14 changes: 14 additions & 0 deletions ddl/db_test.go
@@ -1714,3 +1714,17 @@ func TestBuildMaxLengthIndexWithNonRestrictedSqlMode(t *testing.T) {
}
}
}

func TestTiDBDownBeforeUpdateGlobalVersion(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion", `return(true)`))
tk.MustExec("alter table t add column b int")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion"))
}
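The test above enables and disables the two failpoints by hand. As a side note, a tiny hypothetical helper along the following lines (not part of this commit; the name withFailpoint is made up) is a common way to keep failpoint toggling paired with test teardown:

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// withFailpoint is a hypothetical test helper: it enables a failpoint for the
// duration of the test and registers its teardown so it cannot be left enabled.
func withFailpoint(t *testing.T, name, term string) {
	t.Helper()
	require.NoError(t, failpoint.Enable(name, term))
	t.Cleanup(func() {
		require.NoError(t, failpoint.Disable(name))
	})
}

With such a helper, the two enable/disable pairs in TestTiDBDownBeforeUpdateGlobalVersion would collapse to two one-line calls.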
42 changes: 35 additions & 7 deletions ddl/ddl_worker.go
@@ -54,6 +54,8 @@ var (
ddlWorkerID = atomicutil.NewInt32(0)
// WaitTimeWhenErrorOccurred is waiting interval when processing DDL jobs encounter errors.
WaitTimeWhenErrorOccurred = int64(1 * time.Second)

mockDDLErrOnce = int64(0)
)

// GetWaitTimeWhenErrorOccurred return waiting interval when processing DDL jobs encounter errors.
@@ -782,6 +784,15 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) error {
time.Sleep(GetWaitTimeWhenErrorOccurred())
}

failpoint.Inject("mockDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce == 0 {
mockDDLErrOnce = schemaVer
failpoint.Return(errors.New("mock for ddl down"))
}
}
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
@@ -875,9 +886,11 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
}

if once {
w.waitSchemaSynced(d, job, waitTime)
once = false
return nil
err = w.waitSchemaSynced(d, job, waitTime)
if err == nil {
once = false
}
return err
}

if job.IsDone() || job.IsRollbackDone() {
@@ -1299,19 +1312,34 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time
// but in this case we don't wait enough 2 * lease time to let other servers update the schema.
// So here we get the latest schema version to make sure all servers' schema version update to the latest schema version
// in a cluster, or to wait for 2 * lease time.
func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) {
func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
ctx, cancelFunc := context.WithTimeout(w.ctx, waitTime)
defer cancelFunc()

latestSchemaVersion, err := d.schemaSyncer.MustGetGlobalVersion(ctx)
ver, _ := w.store.CurrentVersion(kv.GlobalTxnScope)
snapshot := w.store.GetSnapshot(ver)
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
logutil.Logger(w.logCtx).Warn("[ddl] get global version failed", zap.Error(err))
return
return err
}

failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
} else {
mockDDLErrOnce = -1
}
}
})

w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job)
return nil
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
10 changes: 8 additions & 2 deletions ddl/job_table.go
@@ -230,8 +230,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
// we should wait 2 * d.lease time to guarantee all TiDB server have finished the schema change.
// see waitSchemaSynced for more details.
if !d.isSynced(job) || d.once.Load() {
wk.waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
d.once.Store(false)
err := wk.waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
if err == nil {
d.once.Store(false)
} else {
logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
time.Sleep(time.Second)
return
}
}
if err := wk.HandleDDLJobTable(d.ddlCtx, job); err != nil {
logutil.BgLogger().Info("[ddl] handle ddl job failed", zap.Error(err), zap.String("job", job.String()))
5 changes: 0 additions & 5 deletions ddl/mock.go
@@ -91,11 +91,6 @@ func (s *MockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version
return nil
}

// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *MockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
return 0, nil
}

// OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface.
func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error {
ticker := time.NewTicker(mockCheckVersInterval)
43 changes: 0 additions & 43 deletions ddl/syncer/syncer.go
@@ -67,8 +67,6 @@ type SchemaSyncer interface {
GlobalVersionCh() clientv3.WatchChan
// WatchGlobalSchemaVer watches the global schema version.
WatchGlobalSchemaVer(ctx context.Context)
// MustGetGlobalVersion gets the global version. The only reason it fails is that ctx is done.
MustGetGlobalVersion(ctx context.Context) (int64, error)
// Done returns a channel that closes when the syncer is no longer being refreshed.
Done() <-chan struct{}
// Restart restarts the syncer when it's no longer being refreshed.
@@ -236,47 +234,6 @@ func (s *schemaVersionSyncer) removeSelfVersionPath() error {
return errors.Trace(err)
}

// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
startTime := time.Now()
var (
err error
ver int
resp *clientv3.GetResponse
)
failedCnt := 0
intervalCnt := int(time.Second / util.KeyOpRetryInterval)

defer func() {
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
if err != nil {
if failedCnt%intervalCnt == 0 {
logutil.BgLogger().Info("[ddl] syncer get global version failed", zap.Error(err))
}
time.Sleep(util.KeyOpRetryInterval)
failedCnt++
}

if util.IsContextDone(ctx) {
err = errors.Trace(ctx.Err())
return 0, err
}

resp, err = s.etcdCli.Get(ctx, util.DDLGlobalSchemaVersion)
if err != nil {
continue
}
if len(resp.Kvs) > 0 {
ver, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err == nil {
return int64(ver), nil
}
}
}
}

// OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface.
func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error {
startTime := time.Now()
11 changes: 1 addition & 10 deletions ddl/syncer/syncer_test.go
@@ -85,15 +85,6 @@ func TestSyncerSimple(t *testing.T) {

key := util2.DDLAllSchemaVersions + "/" + d.OwnerManager().ID()
checkRespKV(t, 1, key, InitialVersion, resp.Kvs...)
// for MustGetGlobalVersion function
globalVer, err := d.SchemaSyncer().MustGetGlobalVersion(ctx)
require.NoError(t, err)
require.Equal(t, InitialVersion, fmt.Sprintf("%d", globalVer))

childCtx, cancel := context.WithTimeout(ctx, minInterval)
defer cancel()
_, err = d.SchemaSyncer().MustGetGlobalVersion(childCtx)
require.True(t, isTimeoutError(err))

ic2 := infoschema.NewCache(2)
ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0)
@@ -138,7 +129,7 @@ func TestSyncerSimple(t *testing.T) {
require.Equal(t, "", checkErr)

// for CheckAllVersions
childCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond)
childCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
require.Error(t, d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer))
cancel()

1 change: 0 additions & 1 deletion metrics/ddl.go
@@ -71,7 +71,6 @@ var (
}, []string{LblResult})

OwnerUpdateGlobalVersion = "update_global_version"
OwnerGetGlobalVersion = "get_global_version"
OwnerCheckAllVersions = "check_all_versions"
OwnerHandleSyncerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{