Skip to content

Commit

Permalink
ddl: add lease not found and deadline exceed to retryable errors (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Oct 16, 2024
1 parent 3df0f2e commit 1352db6
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 24 deletions.
13 changes: 0 additions & 13 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -223,17 +221,6 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool {
return true
}

func isRetryableError(err error) bool {
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// can't retry Unknown err.
return false
}

func (*backfillDistExecutor) IsRetryableError(err error) bool {
return common.IsRetryableError(err) || isRetryableError(err)
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func (w *worker) checkVectorIndexProcessOnTiFlash(jobCtx *jobContext, job *model
if dbterror.ErrWaitReorgTimeout.Equal(err) {
return false, ver, nil
}
if !errorIsRetryable(err, job) {
if !isRetryableJobError(err, job.ErrorCount) {
logutil.DDLLogger().Warn("run add vector index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
}
Expand Down Expand Up @@ -981,7 +981,7 @@ SwitchIndexState:
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(job)
if err != nil {
if !errorIsRetryable(err, job) {
if !isRetryableJobError(err, job.ErrorCount) {
job.State = model.JobStateCancelled
}
return ver, err
Expand Down Expand Up @@ -1261,7 +1261,7 @@ func runIngestReorgJob(w *worker, jobCtx *jobContext, job *model.Job,
if kv.ErrKeyExists.Equal(err) {
logutil.DDLLogger().Warn("import index duplicate key, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
} else if !errorIsRetryable(err, job) {
} else if !isRetryableJobError(err, job.ErrorCount) {
logutil.DDLLogger().Warn("run reorg job failed, convert job to rollback",
zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
Expand All @@ -1274,10 +1274,20 @@ func runIngestReorgJob(w *worker, jobCtx *jobContext, job *model.Job,
return done, ver, nil
}

func errorIsRetryable(err error, job *model.Job) bool {
if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() {
func isRetryableJobError(err error, jobErrCnt int64) bool {
if jobErrCnt+1 >= variable.GetDDLErrorCountLimit() {
return false
}
return isRetryableError(err)
}

func isRetryableError(err error) bool {
errMsg := err.Error()
for _, m := range dbterror.ReorgRetryableErrMsgs {
if strings.Contains(errMsg, m) {
return true
}
}
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
Expand Down Expand Up @@ -1347,7 +1357,7 @@ func runReorgJobAndHandleErr(
}
// TODO(tangenta): get duplicate column and match index.
err = ingest.TryConvertToKeyExistsErr(err, allIndexInfos[0], tbl.Meta())
if !errorIsRetryable(err, job) {
if !isRetryableJobError(err, job.ErrorCount) {
logutil.DDLLogger().Warn("run add index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (w *worker) transitOneJobStep(
jobCtx.addUnSynced(job.ID)

// If error is non-retryable, we can ignore the sleep.
if runJobErr != nil && errorIsRetryable(runJobErr, job) {
if runJobErr != nil && isRetryableJobError(runJobErr, job.ErrorCount) {
jobCtx.logger.Info("run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
Expand Down
3 changes: 3 additions & 0 deletions pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ func AcquireDistributedLock(
}
return false, nil
})
failpoint.Inject("mockAcquireDistLockFailed", func() {
err = errors.Errorf("requested lease not found")
})
if err != nil {
err1 := se.Close()
if err1 != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/dbterror/ddl_terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ var (
ErrWarnGlobalIndexNeedManuallyAnalyze = ClassDDL.NewStd(mysql.ErrWarnGlobalIndexNeedManuallyAnalyze)
)

// ReorgRetryableErrCodes is the error codes that are retryable for reorganization.
// ReorgRetryableErrCodes are the error codes that are retryable for reorganization.
var ReorgRetryableErrCodes = map[uint16]struct{}{
mysql.ErrPDServerTimeout: {},
mysql.ErrTiKVServerTimeout: {},
Expand All @@ -526,3 +526,9 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{
// Temporary network partitioning may cause pk commit failure.
uint16(terror.CodeResultUndetermined): {},
}

// ReorgRetryableErrMsgs are the error messages that are retryable for reorganization.
var ReorgRetryableErrMsgs = []string{
"context deadline exceeded",
"requested lease not found",
}
14 changes: 14 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,17 @@ func TestAddUKErrorMessage(t *testing.T) {
err := tk.ExecToErr("alter table t add unique index uk(b);")
require.ErrorContains(t, err, "Duplicate entry '1' for key 't.uk'")
}

func TestAddIndexDistLockAcquireFailed(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_dist_task = on;")
t.Cleanup(func() {
tk.MustExec("set global tidb_enable_dist_task = off;")
})
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)")
tk.MustExec("alter table t add index idx(b);")
}
6 changes: 3 additions & 3 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,14 @@ func TestAddIndexIngestFailures(t *testing.T) {
tk.MustExec("insert into t values (1, 1, 1);")

// Test precheck failed.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed", "1*return"))
tk.MustGetErrMsg("alter table t add index idx(b);", "[ddl:8256]Check ingest environment failed: mock error")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed"))

tk.MustExec(`set global tidb_enable_dist_task=on;`)
// Test reset engine failed.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed", "return"))
tk.MustGetErrMsg("alter table t add index idx(b);", "[0]mock reset engine failed")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed", "1*return"))
tk.MustExec("alter table t add index idx(b);")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed"))
tk.MustExec(`set global tidb_enable_dist_task=off;`)
}
Expand Down

0 comments on commit 1352db6

Please sign in to comment.