Skip to content

Commit

Permalink
ddl fix pingcap#38669.
Browse files Browse the repository at this point in the history
The issue was that mysql.tidb_ddl_reorg table was updated by an
inner transaction after the outer transaction started,
which then made a commit conflict in the outer transaction,
when it deleted the same row.
  • Loading branch information
mjonss committed Oct 29, 2022
1 parent e2e3b5c commit eb69cab
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 8 deletions.
9 changes: 5 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,8 +1274,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return w.reformatErrors(err)
}
if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 {
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
if len(warn) != 0 {
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
}
Expand Down Expand Up @@ -1359,8 +1359,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords))
warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords))
// Optimize for few warnings!
warningsMap := make(map[errors.ErrorID]*terror.Error, 2)
warningsCountMap := make(map[errors.ErrorID]int64, 2)
for _, rowRecord := range rowRecords {
taskCtx.scanCount++

Expand Down
9 changes: 6 additions & 3 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4544,11 +4544,14 @@ func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
tk.MustExec("use " + schemaName)
tk.MustExec(`set sql_mode = default`)
tk.MustExec(`create table t (a varchar(255)) partition by range columns (a) (partition p1 values less than ("0"), partition p2 values less than ("zzzz"))`)
tk.MustExec(`insert into t values ("123456")`)
tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '123456'")
tk.MustExec(`insert into t values ("123456"),(" 654321")`)
tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '")
tk.MustExec(`set sql_mode = ''`)
tk.MustExec(`alter table t modify a varchar(5)`)
tk.MustQuery(`show warnings`).Check(testkit.Rows())
// Fix the duplicate warning, see https://github.com/pingcap/tidb/issues/38699
tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
"Warning 1265 Data truncated for column 'a', value is ' 654321'",
"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
}

func TestAlterModifyColumnOnPartitionedTable(t *testing.T) {
Expand Down
34 changes: 33 additions & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
}

updateBackfillProgress(w, reorgInfo, tblInfo, 0)
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
// Do this is a separate transaction, since mysql.tidb_ddl_reorg may have been updated
// by the inner function and could result in commit conflicts.:122

if err1 := reorgInfo.deleteReorgMeta(w.sessPool); err1 != nil {
logutil.BgLogger().Warn("[ddl] run reorg job done, removeDDLReorgHandle failed", zap.Error(err1))
return errors.Trace(err1)
}
Expand Down Expand Up @@ -742,6 +745,35 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo
return &info, nil
}

func (r *reorgInfo) deleteReorgMeta(pool *sessionPool) error {
if len(r.elements) == 0 {
return nil
}
se, err := pool.get()
if err != nil {
return errors.Trace(err)
}
defer pool.put(se)

sess := newSession(se)
err = sess.begin()
if err != nil {
return errors.Trace(err)
}
txn, err := sess.txn()
if err != nil {
sess.rollback()
return errors.Trace(err)
}
rh := newReorgHandler(meta.NewMeta(txn), sess, variable.EnableConcurrentDDL.Load())
err = rh.RemoveDDLReorgHandle(r.Job, r.elements)
err1 := sess.commit()
if err == nil {
err = err1
}
return errors.Trace(err)
}

func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err error) {
if startKey == nil && r.EndKey == nil {
return nil
Expand Down
3 changes: 3 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,9 @@ func (job *Job) String() string {
rowCount := job.GetRowCount()
ret := fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v",
job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
if job.ReorgMeta != nil {
ret += fmt.Sprintf(", UniqueWarnings:%d", len(job.ReorgMeta.Warnings))
}
if job.Type != ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
ret += fmt.Sprintf(", Multi-Schema Change:true, Revertible:%v", job.MultiSchemaInfo.Revertible)
}
Expand Down

0 comments on commit eb69cab

Please sign in to comment.