From 41b9e26d1ef468e079937e1e2aab33d411f28841 Mon Sep 17 00:00:00 2001 From: wjHuang Date: Fri, 19 Aug 2022 17:18:52 +0800 Subject: [PATCH] ddl: use the correct schema version when waitSchemaSynced (#37210) close pingcap/tidb#37209 --- ddl/db_test.go | 14 +++++++++++++ ddl/ddl_worker.go | 42 +++++++++++++++++++++++++++++++------- ddl/job_table.go | 10 +++++++-- ddl/mock.go | 5 ----- ddl/syncer/syncer.go | 43 --------------------------------------- ddl/syncer/syncer_test.go | 11 +--------- metrics/ddl.go | 1 - 7 files changed, 58 insertions(+), 68 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index a324f1bdf0302..f4ac4d9f592f5 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1714,3 +1714,17 @@ func TestBuildMaxLengthIndexWithNonRestrictedSqlMode(t *testing.T) { } } } + +func TestTiDBDownBeforeUpdateGlobalVersion(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion", `return(true)`)) + tk.MustExec("alter table t add column b int") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion")) +} diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index cfb2c9b0bfbff..e6bdfb2717820 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -54,6 +54,8 @@ var ( ddlWorkerID = atomicutil.NewInt32(0) // WaitTimeWhenErrorOccurred is waiting interval when processing DDL jobs encounter errors. WaitTimeWhenErrorOccurred = int64(1 * time.Second) + + mockDDLErrOnce = int64(0) ) // GetWaitTimeWhenErrorOccurred return waiting interval when processing DDL jobs encounter errors. @@ -782,6 +784,15 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) error { time.Sleep(GetWaitTimeWhenErrorOccurred()) } + failpoint.Inject("mockDownBeforeUpdateGlobalVersion", func(val failpoint.Value) { + if val.(bool) { + if mockDDLErrOnce == 0 { + mockDDLErrOnce = schemaVer + failpoint.Return(errors.New("mock for ddl down")) + } + } + }) + // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. // If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update // the newest schema. @@ -875,9 +886,11 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { } if once { - w.waitSchemaSynced(d, job, waitTime) - once = false - return nil + err = w.waitSchemaSynced(d, job, waitTime) + if err == nil { + once = false + } + return err } if job.IsDone() || job.IsRollbackDone() { @@ -1299,19 +1312,34 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time // but in this case we don't wait enough 2 * lease time to let other servers update the schema. // So here we get the latest schema version to make sure all servers' schema version update to the latest schema version // in a cluster, or to wait for 2 * lease time. -func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) { +func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error { if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() { - return + return nil } ctx, cancelFunc := context.WithTimeout(w.ctx, waitTime) defer cancelFunc() - latestSchemaVersion, err := d.schemaSyncer.MustGetGlobalVersion(ctx) + ver, _ := w.store.CurrentVersion(kv.GlobalTxnScope) + snapshot := w.store.GetSnapshot(ver) + m := meta.NewSnapshotMeta(snapshot) + latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff() if err != nil { logutil.Logger(w.logCtx).Warn("[ddl] get global version failed", zap.Error(err)) - return + return err } + + failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) { + if val.(bool) { + if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion { + panic("check down before update global version failed") + } else { + mockDDLErrOnce = -1 + } + } + }) + w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job) + return nil } func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption { diff --git a/ddl/job_table.go b/ddl/job_table.go index 030b0bae62c99..33e1f8773c661 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -230,8 +230,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { // we should wait 2 * d.lease time to guarantee all TiDB server have finished the schema change. // see waitSchemaSynced for more details. if !d.isSynced(job) || d.once.Load() { - wk.waitSchemaSynced(d.ddlCtx, job, 2*d.lease) - d.once.Store(false) + err := wk.waitSchemaSynced(d.ddlCtx, job, 2*d.lease) + if err == nil { + d.once.Store(false) + } else { + logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String())) + time.Sleep(time.Second) + return + } } if err := wk.HandleDDLJobTable(d.ddlCtx, job); err != nil { logutil.BgLogger().Info("[ddl] handle ddl job failed", zap.Error(err), zap.String("job", job.String())) diff --git a/ddl/mock.go b/ddl/mock.go index 1741df211551c..2bea1349f1f43 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -91,11 +91,6 @@ func (s *MockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version return nil } -// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface. -func (s *MockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) { - return 0, nil -} - // OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface. func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error { ticker := time.NewTicker(mockCheckVersInterval) diff --git a/ddl/syncer/syncer.go b/ddl/syncer/syncer.go index fc7337624fbe5..e8a92737ef23b 100644 --- a/ddl/syncer/syncer.go +++ b/ddl/syncer/syncer.go @@ -67,8 +67,6 @@ type SchemaSyncer interface { GlobalVersionCh() clientv3.WatchChan // WatchGlobalSchemaVer watches the global schema version. WatchGlobalSchemaVer(ctx context.Context) - // MustGetGlobalVersion gets the global version. The only reason it fails is that ctx is done. - MustGetGlobalVersion(ctx context.Context) (int64, error) // Done returns a channel that closes when the syncer is no longer being refreshed. Done() <-chan struct{} // Restart restarts the syncer when it's on longer being refreshed. @@ -236,47 +234,6 @@ func (s *schemaVersionSyncer) removeSelfVersionPath() error { return errors.Trace(err) } -// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface. -func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) { - startTime := time.Now() - var ( - err error - ver int - resp *clientv3.GetResponse - ) - failedCnt := 0 - intervalCnt := int(time.Second / util.KeyOpRetryInterval) - - defer func() { - metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) - }() - for { - if err != nil { - if failedCnt%intervalCnt == 0 { - logutil.BgLogger().Info("[ddl] syncer get global version failed", zap.Error(err)) - } - time.Sleep(util.KeyOpRetryInterval) - failedCnt++ - } - - if util.IsContextDone(ctx) { - err = errors.Trace(ctx.Err()) - return 0, err - } - - resp, err = s.etcdCli.Get(ctx, util.DDLGlobalSchemaVersion) - if err != nil { - continue - } - if len(resp.Kvs) > 0 { - ver, err = strconv.Atoi(string(resp.Kvs[0].Value)) - if err == nil { - return int64(ver), nil - } - } - } -} - // OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface. func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error { startTime := time.Now() diff --git a/ddl/syncer/syncer_test.go b/ddl/syncer/syncer_test.go index 8d730142296ee..6ac6606642160 100644 --- a/ddl/syncer/syncer_test.go +++ b/ddl/syncer/syncer_test.go @@ -85,15 +85,6 @@ func TestSyncerSimple(t *testing.T) { key := util2.DDLAllSchemaVersions + "/" + d.OwnerManager().ID() checkRespKV(t, 1, key, InitialVersion, resp.Kvs...) - // for MustGetGlobalVersion function - globalVer, err := d.SchemaSyncer().MustGetGlobalVersion(ctx) - require.NoError(t, err) - require.Equal(t, InitialVersion, fmt.Sprintf("%d", globalVer)) - - childCtx, cancel := context.WithTimeout(ctx, minInterval) - defer cancel() - _, err = d.SchemaSyncer().MustGetGlobalVersion(childCtx) - require.True(t, isTimeoutError(err)) ic2 := infoschema.NewCache(2) ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) @@ -138,7 +129,7 @@ func TestSyncerSimple(t *testing.T) { require.Equal(t, "", checkErr) // for CheckAllVersions - childCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond) + childCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) require.Error(t, d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer)) cancel() diff --git a/metrics/ddl.go b/metrics/ddl.go index df417930ef0bf..7c1e4de5b011d 100644 --- a/metrics/ddl.go +++ b/metrics/ddl.go @@ -71,7 +71,6 @@ var ( }, []string{LblResult}) OwnerUpdateGlobalVersion = "update_global_version" - OwnerGetGlobalVersion = "get_global_version" OwnerCheckAllVersions = "check_all_versions" OwnerHandleSyncerHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{