Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. #38738

Merged
merged 76 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
e2e3b5c
Added test case
mjonss Oct 27, 2022
eb69cab
ddl fix #38669.
mjonss Oct 29, 2022
841b5fa
Fixed typo in comment
mjonss Oct 30, 2022
51174b0
Added test case for #24427
mjonss Nov 9, 2022
cbd0981
Merge remote-tracking branch 'pingcap/master' into trunc-warn-38669
mjonss Dec 25, 2022
17c28f3
Disabled tests for CI testing
mjonss Dec 25, 2022
65c84d9
Revert "Disabled tests for CI testing"
mjonss Dec 25, 2022
f432276
Revert "Revert "Disabled tests for CI testing""
mjonss Dec 25, 2022
31a8165
removed test skips
mjonss Dec 25, 2022
93ec6ce
Clean up the tidb_ddl_reorg entry after DDL is completed
mjonss Dec 26, 2022
f392e91
Use a cleanup job afterwards instead.
mjonss Dec 26, 2022
270a640
Fixed test
mjonss Dec 27, 2022
c56449d
Moved cleanup before asyncNotify
mjonss Dec 27, 2022
f8b2d62
More detailed test failure log
mjonss Dec 27, 2022
4de5504
Merge branch 'master' into trunc-warn-38669
mjonss Dec 27, 2022
2015061
Refined test error message
mjonss Dec 27, 2022
d673abc
Merge branch 'master' into trunc-warn-38669
mjonss Dec 28, 2022
f9ed823
Injecting timoeut to get stack traces from CI
mjonss Dec 28, 2022
9672513
Updated Debug Dump on timeout
mjonss Dec 28, 2022
4f667f9
Delete mulitple entries in tidb_ddl_reorg if needed
mjonss Dec 28, 2022
7908ee8
Linting
mjonss Dec 28, 2022
88a891e
Linting
mjonss Dec 28, 2022
9f9a4ca
Added CI debug logs
mjonss Dec 28, 2022
b7fda44
Linting + CI debugs
mjonss Dec 28, 2022
64f2925
fixed CI debug
mjonss Dec 28, 2022
a713d57
Try to cleanup also if job.State == synced
mjonss Dec 28, 2022
d968d4e
check for non-error of runErr instead of error...
mjonss Dec 29, 2022
6a0e0c3
Use a new session, instead of reusing worker.sess
mjonss Dec 29, 2022
e13c332
Also handle case when job == nil
mjonss Dec 29, 2022
fe65fa9
Merge remote-tracking branch 'pingcap/master' into trunc-warn-38669
mjonss Dec 29, 2022
5614b30
Removed CI debug logs
mjonss Dec 29, 2022
ab0087a
Misssed change session from w.sess to newly created sess
mjonss Dec 29, 2022
6186476
Improved TestConcurrentDDLSwitch and added CI debug logs
mjonss Dec 29, 2022
7257bd0
Always cleaning up all orphan mysql.tidb_ddl_reorg entries
mjonss Dec 29, 2022
6eb65d3
linting
mjonss Dec 29, 2022
143f889
Also cleanup if job is nil
mjonss Dec 29, 2022
3f6dcc5
Merge remote-tracking branch 'pingcap/master' into trunc-warn-38669
mjonss Dec 29, 2022
5473f2f
Updated TestModifyColumnReorgInfo + CI debug logs
mjonss Dec 29, 2022
250717d
more CI debug
mjonss Dec 29, 2022
641f89b
refactored the cleanupDDLReorgHandle code
mjonss Dec 29, 2022
eda7cc9
Added missing cleanup in handleDDLJobQueue
mjonss Dec 29, 2022
0598ed7
Removed debug panic
mjonss Dec 29, 2022
13c3d56
Code cleanup
mjonss Dec 29, 2022
54e0b24
Test updates
mjonss Dec 29, 2022
1d55510
Debug cleanup
mjonss Dec 29, 2022
cf478ea
Merge remote-tracking branch 'pingcap/master' into trunc-warn-38669
mjonss Dec 30, 2022
ee44b65
Cleaned up test after removal of old non-concurrent DDL code merge
mjonss Dec 30, 2022
0ed2f47
Linting
mjonss Dec 30, 2022
8cee684
always wrap changes to tidb_ddl_reorg in an own transaction
mjonss Jan 5, 2023
0164c1a
Merge remote-tracking branch 'pingcap/master' into trunc-warn-38669
mjonss Jan 5, 2023
d8483ab
Minimum fix
mjonss Jan 9, 2023
d05b1af
Always update reorg meta, not only on error
mjonss Jan 9, 2023
10cddb7
Issue is here :)
mjonss Jan 9, 2023
751a54a
Fixed newReorgHandler
mjonss Jan 9, 2023
83894b3
Wrapped more tidb_ddl_reorg changes into separate transactions
mjonss Jan 9, 2023
d0340d8
linting
mjonss Jan 9, 2023
7f18011
Removed updateDDLReorgStartHandle
mjonss Jan 9, 2023
0db4bb3
cleanups
mjonss Jan 9, 2023
5153f09
Made runInTxn a method on *session, instead of normal function
mjonss Jan 9, 2023
86bc31a
Update test
mjonss Jan 9, 2023
445d84c
Final touches
mjonss Jan 10, 2023
79e52b9
Merge branch 'fix-tidb_ddl_reorg' into trunc-warn-38669
mjonss Jan 10, 2023
b0b5084
Removed duplicate test
mjonss Jan 10, 2023
c35d853
CleanupDDLReorgHandles should only be called from HandleJobDone.
mjonss Jan 10, 2023
f1543d5
Variable rename
mjonss Jan 10, 2023
bfb5f52
Renamed 'delete' variabel name
mjonss Jan 10, 2023
0ec2512
Merge branch 'master' into trunc-warn-38669
mjonss Jan 10, 2023
78281b3
Updated test
mjonss Jan 10, 2023
7214e31
small revert
mjonss Jan 10, 2023
f9983e9
Merge remote-tracking branch 'pingcap/master' into trunc-warn-38669
mjonss Jan 11, 2023
51bacd3
Removed timeout debugging code
mjonss Jan 11, 2023
389ad4d
Simplified the cleanup to only start a new txn and not a new session
mjonss Jan 11, 2023
f04df45
Reverted the change of GetDDLInfo
mjonss Jan 11, 2023
076be10
Merge branch 'master' into trunc-warn-38669
ti-chi-bot Jan 11, 2023
022d951
Merge branch 'master' into trunc-warn-38669
ti-chi-bot Jan 11, 2023
43b6946
Merge branch 'master' into trunc-warn-38669
ti-chi-bot Jan 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
// Backfilling is time consuming, to accelerate this process, TiDB has built some sub
// workers to do this in the DDL owner node.
//
// DDL owner thread
// DDL owner thread (also see comments before runReorgJob func)
// ^
// | (reorgCtx.doneCh)
// |
Expand Down Expand Up @@ -583,9 +583,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
err = dc.isReorgRunnable(reorgInfo.Job.ID)
}

// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the updateDDLReorgStartHandle from runReorgJob on timeout and added it here instead.
It may change the number of transactions, but I think it is better to always save the state here, right after each batch is done (also still ignoring the error).


if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",

Expand Down Expand Up @@ -614,7 +615,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("next key", hex.EncodeToString(nextKey)),
zap.Int64("batch added count", taskAddedCount),
zap.String("take time", elapsedTime.String()))
zap.String("take time", elapsedTime.String()),
zap.NamedError("updateHandleError", err1))
return nil
}

Expand Down Expand Up @@ -1320,15 +1322,15 @@ func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte)
}

// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.
func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJob) error {
sess, ok := sessCtx.(*session)
func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
s, ok := sctx.(*session)
if !ok {
return errors.Errorf("sess ctx:%#v convert session failed", sessCtx)
return errors.Errorf("sess ctx:%#v convert session failed", sctx)
}

return runInTxn(sess, func(se *session) error {
return s.runInTxn(func(se *session) error {
// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job")
if err != nil {
return errors.Trace(err)
Expand All @@ -1342,13 +1344,13 @@ func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJ
return errors.Trace(err)
}
startTS := txn.StartTS()
err = RemoveBackfillJob(sess, true, bJobs[0])
err = RemoveBackfillJob(se, true, bJobs[0])
if err == nil {
for _, bj := range bJobs {
bj.State = model.JobStateCancelled
bj.FinishTS = startTS
}
err = AddBackfillHistoryJob(sess, bJobs)
err = AddBackfillHistoryJob(se, bJobs)
}
logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs)))
return errors.Trace(err)
Expand Down
17 changes: 12 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,13 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess)
sctx, err1 := w.sessPool.get()
if err1 != nil {
err = errors.Trace(err1)
return
}
defer w.sessPool.put(sctx)
rh := newReorgHandler(newSession(sctx))
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1291,8 +1297,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return w.reformatErrors(err)
}
if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 {
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since GetWarnings takes a mutex and copies the data, it is better to only do it once instead of up to three times.

if len(warn) != 0 {
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
}
Expand Down Expand Up @@ -1376,8 +1382,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords))
warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords))
// Optimize for few warnings!
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to avoid reserving too much memory that may not be needed.
On the other hand it will be a single allocation instead of multiple ones when resizing the maps.

warningsMap := make(map[errors.ErrorID]*terror.Error, 2)
warningsCountMap := make(map[errors.ErrorID]int64, 2)
for _, rowRecord := range rowRecords {
taskCtx.scanCount++

Expand Down
15 changes: 15 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2421,3 +2421,18 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustExec("admin check table t")
}

func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a int)")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
tk.MustExec("alter table t add index(a)")
tk.MustExec("set @@sql_mode=''")
tk.MustExec("insert into t values(128),(129)")
tk.MustExec("alter table t modify column a tinyint")

tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
}
19 changes: 19 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4528,6 +4528,25 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) {
` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`))
}

func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
t.Skip("waiting for supporting Modify Partition Column again")
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "truncWarn"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`set sql_mode = default`)
tk.MustExec(`create table t (a varchar(255)) partition by range columns (a) (partition p1 values less than ("0"), partition p2 values less than ("zzzz"))`)
tk.MustExec(`insert into t values ("123456"),(" 654321")`)
tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '")
tk.MustExec(`set sql_mode = ''`)
tk.MustExec(`alter table t modify a varchar(5)`)
// Fix the duplicate warning, see https://github.com/pingcap/tidb/issues/38699
tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
"Warning 1265 Data truncated for column 'a', value is ' 654321'",
"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
}

func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
5 changes: 1 addition & 4 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,10 +618,7 @@ func TestAddExpressionIndexRollback(t *testing.T) {
// Check whether the reorg information is cleaned up.
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err := ctx.Txn(true)
require.NoError(t, err)
m := meta.NewMeta(txn)
element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err))
require.Nil(t, element)
require.Nil(t, start)
Expand Down
15 changes: 14 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) {
return info, nil
}

_, info.ReorgHandle, _, _, err = newReorgHandler(t, sess).GetDDLReorgHandle(reorgJob)
_, info.ReorgHandle, _, _, err = newReorgHandler(sess).GetDDLReorgHandle(reorgJob)
if err != nil {
if meta.ErrDDLReorgElementNotExist.Equal(err) {
return info, nil
Expand Down Expand Up @@ -1584,6 +1584,19 @@ func (s *session) session() sessionctx.Context {
return s.Context
}

func (s *session) runInTxn(f func(*session) error) (err error) {
err = s.begin()
if err != nil {
return err
}
err = f(s)
if err != nil {
s.rollback()
return
}
return errors.Trace(s.commit())
}

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
iterator, err := GetLastHistoryDDLJobsIterator(m)
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
if err != nil {
return err
}
CleanupDDLReorgHandles(job, w.sess)
asyncNotify(d.ddlJobDoneCh)
return nil
}
Expand Down
30 changes: 15 additions & 15 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
rh := newReorgHandler(t, w.sess)
sctx, err1 := w.sessPool.get()
if err1 != nil {
err = err1
return
}
defer w.sessPool.put(sctx)
rh := newReorgHandler(newSession(sctx))
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1274,13 +1280,10 @@ func (w *baseIndexWorker) String() string {
}

func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
sess, ok := w.backfillCtx.sessCtx.(*session)
if !ok {
return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx)
}
s := newSession(w.backfillCtx.sessCtx)

return runInTxn(sess, func(se *session) error {
jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d",
return s.runInTxn(func(se *session) error {
jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d",
bfJob.JobID, bfJob.EleID, bfJob.EleKey, bfJob.ID), "update_backfill_task")
if err != nil {
return err
Expand All @@ -1297,26 +1300,23 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
return err
}
bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease)
return updateBackfillJob(sess, BackfillTable, bfJob, "update_backfill_task")
return updateBackfillJob(se, BackfillTable, bfJob, "update_backfill_task")
})
}

func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error {
sess, ok := w.backfillCtx.sessCtx.(*session)
if !ok {
return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx)
}
return runInTxn(sess, func(se *session) error {
s := newSession(w.backfillCtx.sessCtx)
return s.runInTxn(func(se *session) error {
txn, err := se.txn()
if err != nil {
return errors.Trace(err)
}
bfJob.FinishTS = txn.StartTS()
err = RemoveBackfillJob(sess, false, bfJob)
err = RemoveBackfillJob(se, false, bfJob)
if err != nil {
return err
}
return AddBackfillHistoryJob(sess, []*BackfillJob{bfJob})
return AddBackfillHistoryJob(se, []*BackfillJob{bfJob})
})
}

Expand Down
Loading