From 491cd00026f5145377bd89d3ddb2d739465da429 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Fri, 19 Mar 2021 10:17:34 +0800 Subject: [PATCH] cherry pick #23364 to release-5.0 Signed-off-by: ti-srebot --- ddl/column.go | 11 +++++++++-- ddl/ddl.go | 11 +++++++++++ ddl/reorg.go | 9 +-------- store/tikv/2pc.go | 4 +++- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 8932957593a6d..85a9b0d92f8a6 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -825,7 +825,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in } if !needChangeColumnData(oldCol, jobParam.newCol) { - return w.doModifyColumn(t, job, dbInfo, tblInfo, jobParam.newCol, oldCol, jobParam.pos) + return w.doModifyColumn(d, t, job, dbInfo, tblInfo, jobParam.newCol, oldCol, jobParam.pos) } if jobParam.changingCol == nil { @@ -1384,11 +1384,18 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind // doModifyColumn updates the column information and reorders all columns. It does not support modifying column data. func (w *worker) doModifyColumn( - t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo, + d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo, newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) { // Column from null to not null. if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) { noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag) + + // lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly. + // We need to check after the flag is set + if d.lease > 0 && !noPreventNullFlag { + delayForAsyncCommit() + } + // Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values. err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, newCol.Name, oldCol.Tp != newCol.Tp) if err != nil { diff --git a/ddl/ddl.go b/ddl/ddl.go index 50eafa62e160b..ad41c3b53abac 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -668,6 +668,17 @@ type RecoverInfo struct { CurAutoRandID int64 } +// delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes. +// It should be called before any DDL that could break data consistency. +// This provides a safe window for async commit and 1PC to commit with an old schema. +func delayForAsyncCommit() { + cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit + duration := cfg.SafeWindow + cfg.AllowedClockDrift + logutil.BgLogger().Info("sleep before DDL finishes to make async commit and 1PC safe", + zap.Duration("duration", duration)) + time.Sleep(duration) +} + var ( // RunInGoTest is used to identify whether ddl in running in the test. RunInGoTest bool diff --git a/ddl/reorg.go b/ddl/reorg.go index 2318dd6860081..1487ad6df334f 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -158,15 +157,9 @@ func (rc *reorgCtx) clean() { } func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error { - // Sleep for reorgDelay before doing reorganization. - // This provides a safe window for async commit and 1PC to commit with an old schema. // lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly. if lease > 0 { - cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit - reorgDelay := cfg.SafeWindow + cfg.AllowedClockDrift - logutil.BgLogger().Info("sleep before reorganization to make async commit safe", - zap.Duration("duration", reorgDelay)) - time.Sleep(reorgDelay) + delayForAsyncCommit() } job := reorgInfo.Job diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index d95ce289c0b7a..02ecbd0239396 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1001,7 +1001,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } } - failpoint.Inject("beforePrewrite", nil) + if c.sessionID > 0 { + failpoint.Inject("beforePrewrite", nil) + } c.prewriteStarted = true var binlogChan <-chan BinlogWriteResult