From 408a46654d6e189f754c0d9052b5976bc0b3aca5 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 9 Jan 2023 15:50:22 +0800 Subject: [PATCH 1/7] session: fix deadlock when init domain failed (#40409) close pingcap/tidb#40408 --- domain/domain.go | 8 ++++++-- domain/domain_test.go | 6 +++--- session/tidb.go | 12 ++++++------ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index b3ed976bf7e03..d01b900cdf444 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -898,7 +898,7 @@ func (do *Domain) Close() { const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout // NewDomain creates a new domain. Should not create multiple domains for the same store. -func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, onClose func()) *Domain { +func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain { capacity := 200 // capacity of the sysSessionPool size do := &Domain{ store: store, @@ -909,7 +909,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500), indexUsageSyncLease: idxUsageSyncLease, dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}}, - onClose: onClose, expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp), mdlCheckTableInfo: &mdlCheckTableInfo{ mu: sync.Mutex{}, @@ -1082,6 +1081,11 @@ func (do *Domain) Init( return nil } +// SetOnClose used to set do.onClose func. +func (do *Domain) SetOnClose(onClose func()) { + do.onClose = onClose +} + func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { cfg := config.GetGlobalConfig() if pdClient == nil || do.etcdClient == nil { diff --git a/domain/domain_test.go b/domain/domain_test.go index c117ac2244b2e..bd9287fe730ec 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -68,7 +68,7 @@ func TestInfo(t *testing.T) { Storage: s, pdAddrs: []string{cluster.Members[0].GRPCURL()}} ddlLease := 80 * time.Millisecond - dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory, nil) + dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory) defer func() { dom.Close() err := s.Close() @@ -171,7 +171,7 @@ func TestStatWorkRecoverFromPanic(t *testing.T) { require.NoError(t, err) ddlLease := 80 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil) + dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory) metrics.PanicCounter.Reset() // Since the stats lease is 0 now, so create a new ticker will panic. 
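Note on [PATCH 1/7]: the session/tidb.go hunks below drop the onClose argument from NewDomain and register the callback via SetOnClose only after domain initialization has succeeded. The sketch here is a minimal, self-contained reading of why that ordering matters; the registry/resource names are illustrative and this is not TiDB code.

package main

import "sync"

type registry struct {
	mu    sync.Mutex
	items map[string]*resource
}

type resource struct{ onClose func() }

func (r *resource) Close() {
	if r.onClose != nil {
		r.onClose()
	}
}

func initResource(*resource) error { return nil }

func (reg *registry) get(key string) *resource {
	reg.mu.Lock()
	defer reg.mu.Unlock()

	res := &resource{}
	// Risky ordering: registering the cleanup callback before init succeeds means
	// a failed init that calls res.Close() re-enters reg.mu through reg.delete,
	// which self-deadlocks on a non-reentrant mutex.
	// res.onClose = func() { reg.delete(key) }

	if err := initResource(res); err != nil {
		res.Close() // safe only because onClose has not been set yet
		return nil
	}
	reg.items[key] = res
	// Register the callback only after init succeeded, mirroring SetOnClose.
	res.onClose = func() { reg.delete(key) }
	return res
}

func (reg *registry) delete(key string) {
	reg.mu.Lock()
	defer reg.mu.Unlock()
	delete(reg.items, key)
}

func main() {
	reg := &registry{items: map[string]*resource{}}
	_ = reg.get("k")
}

With the pre-patch ordering, a Domain whose Init failed and was then closed would invoke dm.Delete(store) while domainMap.Get still held dm.mu, which appears to be the deadlock reported in pingcap/tidb#40408.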
@@ -238,7 +238,7 @@ func TestClosestReplicaReadChecker(t *testing.T) { require.NoError(t, err) ddlLease := 80 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil) + dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory) defer func() { dom.Close() require.Nil(t, store.Close()) diff --git a/session/tidb.go b/session/tidb.go index fd4411a18518c..113638958ecab 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -42,11 +42,12 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/syncutil" "go.uber.org/zap" ) type domainMap struct { - mu sync.Mutex + mu syncutil.Mutex domains map[string]*domain.Domain } @@ -81,10 +82,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { zap.Stringer("index usage sync lease", idxUsageSyncLease)) factory := createSessionFunc(store) sysFactory := createSessionWithDomainFunc(store) - onClose := func() { - dm.Delete(store) - } - d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose) + d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory) var ddlInjector func(ddl.DDL) *schematracker.Checker if injector, ok := store.(schematracker.StorageDDLInjector); ok { @@ -102,8 +100,10 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { if err != nil { return nil, err } - dm.domains[key] = d + d.SetOnClose(func() { + dm.Delete(store) + }) return } From 83ec4b03f35830f9d6f3054b6dbf5ee2b411251c Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Mon, 9 Jan 2023 16:30:22 +0800 Subject: [PATCH 2/7] executor: support insert ignore/duplicate replace into with unique multi-valued index (#40369) close pingcap/tidb#40207 --- executor/batch_checker.go | 44 ++++++++++++++++++++------------------- executor/insert_test.go | 28 +++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index c1eb1fda0d8f5..838c6af7bace0 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -190,28 +190,30 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D } // Pass handle = 0 to GenIndexKey, // due to we only care about distinct key. - key, distinct, err1 := v.GenIndexKey(ctx.GetSessionVars().StmtCtx, - colVals, kv.IntHandle(0), nil) - if err1 != nil { - return nil, err1 - } - // Skip the non-distinct keys. - if !distinct { - continue - } - // If index is used ingest ways, then we should check key from temp index. - if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable { - _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) - } - colValStr, err1 := formatDataForDupError(colVals) - if err1 != nil { - return nil, err1 + iter := v.GenIndexKVIter(ctx.GetSessionVars().StmtCtx, colVals, kv.IntHandle(0), nil) + for iter.Valid() { + key, _, distinct, err1 := iter.Next(nil) + if err1 != nil { + return nil, err1 + } + // Skip the non-distinct keys. + if !distinct { + continue + } + // If index is used ingest ways, then we should check key from temp index. 
+ if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable { + _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) + } + colValStr, err1 := formatDataForDupError(colVals) + if err1 != nil { + return nil, err1 + } + uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{ + newKey: key, + dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())), + commonHandle: t.Meta().IsCommonHandle, + }) } - uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{ - newKey: key, - dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())), - commonHandle: t.Meta().IsCommonHandle, - }) } row = row[:len(row)-extraColumns] result = append(result, toBeCheckedRow{ diff --git a/executor/insert_test.go b/executor/insert_test.go index b55c3a63765e3..acd8d6736b219 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -419,6 +419,34 @@ func TestInsertValueForCastDecimalField(t *testing.T) { tk.MustQuery(`select cast(a as decimal) from t1;`).Check(testkit.Rows(`9999999999`)) } +func TestInsertForMultiValuedIndex(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists t1;`) + tk.MustExec(`create table t1(a json, b int, unique index idx((cast(a as signed array))));`) + tk.MustExec(`insert into t1 values ('[1,11]', 1);`) + tk.MustExec(`insert into t1 values ('[2, 22]', 2);`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`)) + tk.MustGetErrMsg(`insert into t1 values ('[2, 222]', 2);`, "[kv:1062]Duplicate entry '2' for key 't1.idx'") + tk.MustExec(`replace into t1 values ('[1, 10]', 10)`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[2, 22] 2`, `[1, 10] 10`)) + tk.MustExec(`replace into t1 values ('[1, 2]', 1)`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 2] 1`)) + tk.MustExec(`replace into t1 values ('[1, 11]', 1)`) + tk.MustExec(`insert into t1 values ('[2, 22]', 2);`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`)) + tk.MustExec(`insert ignore into t1 values ('[1]', 2);`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`)) + tk.MustExec(`insert ignore into t1 values ('[1, 2]', 2);`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 2`)) + tk.MustExec(`insert into t1 values ('[2]', 2) on duplicate key update b = 10;`) + tk.MustQuery(`select * from t1;`).Check(testkit.Rows(`[1, 11] 1`, `[2, 22] 10`)) + tk.MustGetErrMsg(`insert into t1 values ('[2, 1]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'") + tk.MustGetErrMsg(`insert into t1 values ('[1,2]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'") + tk.MustGetErrMsg(`insert into t1 values ('[11, 22]', 2) on duplicate key update a = '[1,2]';`, "[kv:1062]Duplicate entry '[1, 2]' for key 't1.idx'") +} + func TestInsertDateTimeWithTimeZone(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) From 199bb382a19dd41caad2bba49703290baec261d2 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 9 Jan 2023 16:54:23 +0800 Subject: [PATCH 3/7] br: support reset_tiflash after ebs restoration (#40124) close pingcap/tidb#40208 --- br/pkg/restore/client.go | 92 +++++++++++++++++++ br/pkg/restore/tiflashrec/tiflash_recorder.go | 52 
++++++++++- .../tiflashrec/tiflash_recorder_test.go | 29 ++++++ br/pkg/task/restore_data.go | 4 + 4 files changed, 175 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index d7811574915f2..6f91a3b4deffc 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2646,6 +2646,74 @@ func (rc *Client) SetWithSysTable(withSysTable bool) { rc.withSysTable = withSysTable } +func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error { + dom, err := g.GetDomain(storage) + if err != nil { + return errors.Trace(err) + } + info := dom.InfoSchema() + allSchema := info.AllSchemas() + recorder := tiflashrec.New() + + expectTiFlashStoreCount := uint64(0) + needTiFlash := false + for _, s := range allSchema { + for _, t := range s.Tables { + if t.TiFlashReplica != nil { + expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count) + recorder.AddTable(t.ID, *t.TiFlashReplica) + needTiFlash = true + } + } + } + if !needTiFlash { + log.Info("no need to set tiflash replica, since there is no tables enable tiflash replica") + return nil + } + // we wait for ten minutes to wait tiflash starts. + // since tiflash only starts when set unmark recovery mode finished. + timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + err = utils.WithRetry(timeoutCtx, func() error { + tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx) + log.Info("get tiflash store count for resetting TiFlash Replica", + zap.Uint64("count", tiFlashStoreCount)) + if err != nil { + return errors.Trace(err) + } + if tiFlashStoreCount < expectTiFlashStoreCount { + log.Info("still waiting for enough tiflash store start", + zap.Uint64("expect", expectTiFlashStoreCount), + zap.Uint64("actual", tiFlashStoreCount), + ) + return errors.New("tiflash store count is less than expected") + } + return nil + }, &waitTiFlashBackoffer{ + Attempts: 30, + BaseBackoff: 4 * time.Second, + }) + if err != nil { + return err + } + + sqls := recorder.GenerateResetAlterTableDDLs(info) + log.Info("Generating SQLs for resetting tiflash replica", + zap.Strings("sqls", sqls)) + + return g.UseOneShotSession(storage, false, func(se glue.Session) error { + for _, sql := range sqls { + if errExec := se.ExecuteInternal(ctx, sql); errExec != nil { + logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.", + logutil.ShortError(errExec), + zap.String("sql", sql), + ) + } + } + return nil + }) +} + // MockClient create a fake client used to test. 
func MockClient(dbs map[string]*utils.Database) *Client { return &Client{databases: dbs} @@ -2721,3 +2789,27 @@ func CheckNewCollationEnable( log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled)) return nil } + +type waitTiFlashBackoffer struct { + Attempts int + BaseBackoff time.Duration +} + +// NextBackoff returns a duration to wait before retrying again +func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration { + bo := b.BaseBackoff + b.Attempts-- + if b.Attempts == 0 { + return 0 + } + b.BaseBackoff *= 2 + if b.BaseBackoff > 32*time.Second { + b.BaseBackoff = 32 * time.Second + } + return bo +} + +// Attempt returns the remain attempt times +func (b *waitTiFlashBackoffer) Attempt() int { + return b.Attempts +} diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder.go b/br/pkg/restore/tiflashrec/tiflash_recorder.go index 31dde982a7b69..84707f05e1f1b 100644 --- a/br/pkg/restore/tiflashrec/tiflash_recorder.go +++ b/br/pkg/restore/tiflashrec/tiflash_recorder.go @@ -79,6 +79,46 @@ func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) { } } +func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema) []string { + items := make([]string, 0, len(r.items)) + r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) { + table, ok := info.TableByID(id) + if !ok { + log.Warn("Table do not exist, skipping", zap.Int64("id", id)) + return + } + schema, ok := info.SchemaByTable(table.Meta()) + if !ok { + log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name)) + return + } + // Currently, we didn't backup tiflash cluster volume during volume snapshot backup, + // But the table has replica info after volume restoration. + // We should reset it to 0, then set it back. otherwise, it will return error when alter tiflash replica. 
+ altTableSpec, err := alterTableSpecOf(replica, true) + if err != nil { + log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica)) + return + } + items = append(items, fmt.Sprintf( + "ALTER TABLE %s %s", + utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), + altTableSpec), + ) + altTableSpec, err = alterTableSpecOf(replica, false) + if err != nil { + log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica)) + return + } + items = append(items, fmt.Sprintf( + "ALTER TABLE %s %s", + utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), + altTableSpec), + ) + }) + return items +} + func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string { items := make([]string, 0, len(r.items)) r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) { @@ -92,7 +132,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name)) return } - altTableSpec, err := alterTableSpecOf(replica) + altTableSpec, err := alterTableSpecOf(replica, false) if err != nil { log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica)) return @@ -106,7 +146,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s return items } -func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) { +func alterTableSpecOf(replica model.TiFlashReplicaInfo, reset bool) (string, error) { spec := &ast.AlterTableSpec{ Tp: ast.AlterTableSetTiFlashReplica, TiFlashReplica: &ast.TiFlashReplicaSpec{ @@ -114,6 +154,14 @@ func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) { Labels: replica.LocationLabels, }, } + if reset { + spec = &ast.AlterTableSpec{ + Tp: ast.AlterTableSetTiFlashReplica, + TiFlashReplica: &ast.TiFlashReplicaSpec{ + Count: 0, + }, + } + } buf := bytes.NewBuffer(make([]byte, 0, 32)) restoreCx := format.NewRestoreCtx( diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder_test.go b/br/pkg/restore/tiflashrec/tiflash_recorder_test.go index b01272caeddc5..f7316a1ed3133 100644 --- a/br/pkg/restore/tiflashrec/tiflash_recorder_test.go +++ b/br/pkg/restore/tiflashrec/tiflash_recorder_test.go @@ -170,3 +170,32 @@ func TestGenSql(t *testing.T) { "ALTER TABLE `test`.`evils` SET TIFLASH REPLICA 1 LOCATION LABELS 'kIll''; OR DROP DATABASE test --', 'dEaTh with " + `\\"quoting\\"` + "'", }) } + +func TestGenResetSql(t *testing.T) { + tInfo := func(id int, name string) *model.TableInfo { + return &model.TableInfo{ + ID: int64(id), + Name: model.NewCIStr(name), + } + } + fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{ + tInfo(1, "fruits"), + tInfo(2, "whisper"), + }) + rec := tiflashrec.New() + rec.AddTable(1, model.TiFlashReplicaInfo{ + Count: 1, + }) + rec.AddTable(2, model.TiFlashReplicaInfo{ + Count: 2, + LocationLabels: []string{"climate"}, + }) + + sqls := rec.GenerateResetAlterTableDDLs(fakeInfo) + require.ElementsMatch(t, sqls, []string{ + "ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 0", + "ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'", + "ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 0", + "ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 1", + }) +} diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go index f8e286dd0e72b..fc82d011abb0d 100644 --- a/br/pkg/task/restore_data.go +++ 
b/br/pkg/task/restore_data.go @@ -154,6 +154,10 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto //TODO: restore volume type into origin type //ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta + // since we cannot reset tiflash automaticlly. so we should start it manually + if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil { + return errors.Trace(err) + } progress.Close() summary.CollectDuration("restore duration", time.Since(startAll)) summary.SetSuccessStatus(true) From 7dbf1a5033a8af125a082bdc93743a8ccb67bf32 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Mon, 9 Jan 2023 05:12:22 -0500 Subject: [PATCH 4/7] ttl: add ttl goroutine exit log (#40416) close pingcap/tidb#40415 --- domain/domain.go | 4 ++++ ttl/ttlworker/del.go | 5 ++++- ttl/ttlworker/job_manager.go | 4 +++- ttl/ttlworker/scan.go | 5 ++++- ttl/ttlworker/worker.go | 5 +++++ 5 files changed, 20 insertions(+), 3 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index d01b900cdf444..06fd9ff4a62a9 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -2479,6 +2479,10 @@ func (do *Domain) serverIDKeeper() { // StartTTLJobManager creates and starts the ttl job manager func (do *Domain) StartTTLJobManager() { do.wg.Run(func() { + defer func() { + logutil.BgLogger().Info("ttlJobManager exited.") + }() + ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store) do.ttlJobManager = ttlJobManager ttlJobManager.Start() diff --git a/ttl/ttlworker/del.go b/ttl/ttlworker/del.go index 8f66fb7fad246..5236bcc2275e6 100644 --- a/ttl/ttlworker/del.go +++ b/ttl/ttlworker/del.go @@ -255,7 +255,10 @@ func newDeleteWorker(delCh <-chan *ttlDeleteTask, sessPool sessionPool) *ttlDele func (w *ttlDeleteWorker) loop() error { tracer := metrics.NewDeleteWorkerPhaseTracer() - defer tracer.EndPhase() + defer func() { + tracer.EndPhase() + logutil.BgLogger().Info("ttlDeleteWorker loop exited.") + }() tracer.EnterPhase(metrics.PhaseOther) se, err := getSession(w.sessionPool) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index bb52457e484e5..f0d88a6e5668d 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -117,6 +117,7 @@ func (m *JobManager) jobLoop() error { defer func() { err = multierr.Combine(err, multierr.Combine(m.resizeScanWorkers(0), m.resizeDelWorkers(0))) se.Close() + logutil.Logger(m.ctx).Info("ttlJobManager loop exited.") }() scheduleTicker := time.Tick(jobManagerLoopTickerInterval) @@ -247,7 +248,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w } var errs error - ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second) + // don't use `m.ctx` here, because when shutdown the server, `m.ctx` has already been cancelled + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) for _, w := range workers[count:] { err := w.WaitStopped(ctx, 30*time.Second) if err != nil { diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index 38a4fd544535d..48686ef87d2f3 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -284,7 +284,10 @@ func (w *ttlScanWorker) PollTaskResult() *ttlScanTaskExecResult { func (w *ttlScanWorker) loop() error { ctx := w.baseWorker.ctx tracer := metrics.NewScanWorkerPhaseTracer() - defer tracer.EndPhase() + defer func() { + tracer.EndPhase() + logutil.BgLogger().Info("ttlScanWorker loop exited.") + }() ticker := time.Tick(time.Second * 5) for w.Status() == workerStatusRunning { 
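Note on [PATCH 4/7]: the ttl/ttlworker/worker.go hunk below adds a recover() to the deferred cleanup in baseWorker.loop, and the other hunks add exit logs, so a TTL goroutine that panics or returns no longer disappears silently. A minimal, self-contained sketch of that pattern follows; the worker type and channel are illustrative, not TiDB code.

package main

import (
	"log"
	"time"
)

type worker struct{ stopCh chan struct{} }

func (w *worker) loop() {
	// recover() only takes effect when called directly inside a deferred function
	// of the panicking goroutine, so both the panic log and the exit log live here.
	defer func() {
		if r := recover(); r != nil {
			log.Printf("worker panic, recovered: %v", r)
		}
		log.Printf("worker loop exited")
	}()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-w.stopCh:
			return
		case <-ticker.C:
			// one round of work; a panic here is caught by the deferred recover
		}
	}
}

func main() {
	w := &worker{stopCh: make(chan struct{})}
	go w.loop()
	time.Sleep(10 * time.Millisecond)
	close(w.stopCh)
	time.Sleep(10 * time.Millisecond)
}

The same patch also switches resizeWorkers to a context.Background()-based timeout: per its own comment, m.ctx is already cancelled during server shutdown, so deriving the wait timeout from it would not give the workers their 30 seconds to stop.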
diff --git a/ttl/ttlworker/worker.go b/ttl/ttlworker/worker.go index 783384862cacf..68ea0d9a1b952 100644 --- a/ttl/ttlworker/worker.go +++ b/ttl/ttlworker/worker.go @@ -20,6 +20,8 @@ import ( "time" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) type workerStatus int @@ -122,6 +124,9 @@ func (w *baseWorker) Send() chan<- interface{} { func (w *baseWorker) loop() { var err error defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Info("ttl worker panic", zap.Any("recover", r)) + } w.Lock() w.toStopped(err) w.Unlock() From 0f838d9a677ac68f91382ddb041bcf3b34c809c9 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Mon, 9 Jan 2023 18:36:22 +0800 Subject: [PATCH 5/7] planner, executor: split the range for unsigned pk of partition table when limit is used (#40313) close pingcap/tidb#40309 --- executor/partition_table_test.go | 11 +++++++++++ planner/core/task.go | 3 ++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index b00b04f69620b..85b096c28ff0d 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -728,6 +728,17 @@ func TestOrderByAndLimit(t *testing.T) { } } +func TestOrderByOnUnsignedPk(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table tunsigned_hash(a bigint unsigned primary key) partition by hash(a) partitions 6") + tk.MustExec("insert into tunsigned_hash values(25), (9279808998424041135)") + tk.MustQuery("select min(a) from tunsigned_hash").Check(testkit.Rows("25")) + tk.MustQuery("select max(a) from tunsigned_hash").Check(testkit.Rows("9279808998424041135")) +} + func TestBatchGetandPointGetwithHashPartition(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/planner/core/task.go b/planner/core/task.go index d50daae83f21c..d68806c5ef1d8 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1072,7 +1072,6 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo if !propMatched { return nil, false } - idxScan.Desc = isDesc childProfile := copTsk.plan().statsInfo() newCount := p.Offset + p.Count @@ -1100,6 +1099,8 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo } } tblScan.Desc = isDesc + // SplitRangesAcrossInt64Boundary needs the KeepOrder flag. See that func and the struct tableResultHandler for more details. 
+ tblScan.KeepOrder = true childProfile := copTsk.plan().statsInfo() newCount := p.Offset + p.Count stats := deriveLimitStats(childProfile, float64(newCount)) From 362defb112dbfeda99f5a814fc092b39ee8bf882 Mon Sep 17 00:00:00 2001 From: Zhou Kunqin <25057648+time-and-fate@users.noreply.github.com> Date: Mon, 9 Jan 2023 21:10:22 +0800 Subject: [PATCH 6/7] planner: add nil check when handling empty charset in `setUnionFlen` (#40429) close pingcap/tidb#40285 --- planner/core/integration_test.go | 8 ++++++++ planner/core/logical_plan_builder.go | 4 +++- planner/core/plan_cost_ver2_test.go | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 459d113d97f2c..0eb7bea667b9a 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -8219,3 +8219,11 @@ func TestAutoIncrementCheckWithCheckConstraint(t *testing.T) { KEY idx_autoinc_id (id) )`) } + +func TestIssue40285(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("CREATE TABLE t(col1 enum('p5', '9a33x') NOT NULL DEFAULT 'p5',col2 tinyblob DEFAULT NULL) ENGINE = InnoDB DEFAULT CHARSET = latin1 COLLATE = latin1_bin;") + tk.MustQuery("(select last_value(col1) over () as r0 from t) union all (select col2 as r0 from t);") +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index df903670ab010..21c54056c7cd8 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -1576,7 +1576,9 @@ func (*PlanBuilder) setUnionFlen(resultTp *types.FieldType, cols []expression.Ex childTp := cols[i].GetType() childTpCharLen := 1 if isBinary { - childTpCharLen = charset.CharacterSetInfos[childTp.GetCharset()].Maxlen + if charsetInfo, ok := charset.CharacterSetInfos[childTp.GetCharset()]; ok { + childTpCharLen = charsetInfo.Maxlen + } } resultTp.SetFlen(mathutil.Max(resultTp.GetFlen(), childTpCharLen*childTp.GetFlen())) } diff --git a/planner/core/plan_cost_ver2_test.go b/planner/core/plan_cost_ver2_test.go index 38ee51738d81a..c56f41b22cf59 100644 --- a/planner/core/plan_cost_ver2_test.go +++ b/planner/core/plan_cost_ver2_test.go @@ -163,6 +163,7 @@ func TestCostModelVer2ScanRowSize(t *testing.T) { tk.MustExec(`create table t (pk int, a int, b int, c int, d int, primary key(pk), index ab(a, b), index abc(a, b, c))`) tk.MustExec("insert into t values (1, 1, 1, 1, 1)") tk.MustExec(`set @@tidb_cost_model_version=2`) + tk.MustExec("set global tidb_enable_collect_execution_info=1;") cases := []struct { query string From b912237ac78b66057040e7d7d45fb84df6f81548 Mon Sep 17 00:00:00 2001 From: Chengpeng Yan <41809508+Reminiscent@users.noreply.github.com> Date: Mon, 9 Jan 2023 21:42:22 +0800 Subject: [PATCH 7/7] planner: support keep_order and no_keep_order hint (#40089) ref pingcap/tidb#39964 --- parser/ast/dml.go | 6 ++ planner/core/find_best_task.go | 16 +++++ planner/core/integration_test.go | 37 ++++++++++ planner/core/logical_plan_builder.go | 51 ++++++-------- planner/core/planbuilder.go | 6 ++ .../core/testdata/integration_suite_in.json | 11 +++ .../core/testdata/integration_suite_out.json | 68 +++++++++++++++++++ planner/util/path.go | 6 +- 8 files changed, 168 insertions(+), 33 deletions(-) diff --git a/parser/ast/dml.go b/parser/ast/dml.go index 4e97ae8d95882..106480c67edfa 100644 --- a/parser/ast/dml.go +++ b/parser/ast/dml.go @@ -358,6 +358,8 @@ const ( HintUse IndexHintType = iota + 1 HintIgnore 
HintForce + HintKeepOrder + HintNoKeepOrder ) // IndexHintScope is the type for index hint for join, order by or group by. @@ -388,6 +390,10 @@ func (n *IndexHint) Restore(ctx *format.RestoreCtx) error { indexHintType = "IGNORE INDEX" case HintForce: indexHintType = "FORCE INDEX" + case HintKeepOrder: + indexHintType = "KEEP ORDER" + case HintNoKeepOrder: + indexHintType = "NO KEEP ORDER" default: // Prevent accidents return errors.New("IndexHintType has an error while matching") } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 37e58a6e09327..867541b97fb99 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1436,6 +1436,14 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, if !prop.IsSortItemEmpty() && !candidate.isMatchProp { return invalidTask, nil } + // If we need to keep order for the index scan, we should forbid the non-keep-order index scan when we try to generate the path. + if prop.IsSortItemEmpty() && candidate.path.ForceKeepOrder { + return invalidTask, nil + } + // If we don't need to keep order for the index scan, we should forbid the non-keep-order index scan when we try to generate the path. + if !prop.IsSortItemEmpty() && candidate.path.ForceNoKeepOrder { + return invalidTask, nil + } path := candidate.path is := ds.getOriginalPhysicalIndexScan(prop, path, candidate.isMatchProp, candidate.path.IsSingleScan) cop := &copTask{ @@ -1975,6 +1983,14 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid if !prop.IsSortItemEmpty() && !candidate.isMatchProp { return invalidTask, nil } + // If we need to keep order for the index scan, we should forbid the non-keep-order index scan when we try to generate the path. + if prop.IsSortItemEmpty() && candidate.path.ForceKeepOrder { + return invalidTask, nil + } + // If we don't need to keep order for the index scan, we should forbid the non-keep-order index scan when we try to generate the path. 
+ if !prop.IsSortItemEmpty() && candidate.path.ForceNoKeepOrder { + return invalidTask, nil + } ts, _ := ds.getOriginalPhysicalTableScan(prop, candidate.path, candidate.isMatchProp) if ts.KeepOrder && ts.StoreType == kv.TiFlash && (ts.Desc || ds.SCtx().GetSessionVars().TiFlashFastScan) { // TiFlash fast mode(https://github.com/pingcap/tidb/pull/35851) does not keep order in TableScan diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 0eb7bea667b9a..7d97b4cfacbea 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -1324,6 +1324,43 @@ func TestReadFromStorageHint(t *testing.T) { } } +func TestKeepOrderHint(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("set tidb_cost_model_version=2") + tk.MustExec("drop table if exists t, t1, t2") + tk.MustExec("create table t(a int, b int, primary key(a));") + tk.MustExec("create table t1(a int, b int, index idx_a(a), index idx_b(b));") + + // If the optimizer can not generate the keep order plan, it will report error + err := tk.ExecToErr("explain select /*+ keep_order(t1, idx_a) */ * from t1 where a<10 limit 1;") + require.EqualError(t, err, "[planner:1815]Internal : Can't find a proper physical plan for this query") + + err = tk.ExecToErr("explain select /*+ keep_order(t, primary) */ * from t where a<10 limit 1;") + require.EqualError(t, err, "[planner:1815]Internal : Can't find a proper physical plan for this query") + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + integrationSuiteData := core.GetIntegrationSuiteData() + integrationSuiteData.LoadTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())) + } +} + func TestViewHint(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 21c54056c7cd8..702adcdcf5f3a 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -118,6 +118,10 @@ const ( HintIgnoreIndex = "ignore_index" // HintForceIndex make optimizer to use this index even if it thinks a table scan is more efficient. HintForceIndex = "force_index" + // HintKeepOrder is hint enforce using some indexes and keep the index's order. + HintKeepOrder = "keep_order" + // HintNoKeepOrder is hint enforce using some indexes and not keep the index's order. + HintNoKeepOrder = "no_keep_order" // HintAggToCop is hint enforce pushing aggregation to coprocessor. HintAggToCop = "agg_to_cop" // HintReadFromStorage is hint enforce some tables read from specific type of storage. @@ -3610,7 +3614,7 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, currentLev // Set warning for the hint that requires the table name. 
switch hint.HintName.L { case TiDBMergeJoin, HintSMJ, TiDBIndexNestedLoopJoin, HintINLJ, HintINLHJ, HintINLMJ, - TiDBHashJoin, HintHJ, HintUseIndex, HintIgnoreIndex, HintForceIndex, HintIndexMerge, HintLeading: + TiDBHashJoin, HintHJ, HintUseIndex, HintIgnoreIndex, HintForceIndex, HintKeepOrder, HintNoKeepOrder, HintIndexMerge, HintLeading: if len(hint.Tables) == 0 { b.pushHintWithoutTableWarning(hint) continue @@ -3646,40 +3650,23 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, currentLev aggHints.preferAggType |= preferStreamAgg case HintAggToCop: aggHints.preferAggToCop = true - case HintUseIndex: + case HintUseIndex, HintIgnoreIndex, HintForceIndex, HintKeepOrder, HintNoKeepOrder: dbName := hint.Tables[0].DBName if dbName.L == "" { dbName = model.NewCIStr(b.ctx.GetSessionVars().CurrentDB) } - indexHintList = append(indexHintList, indexHintInfo{ - dbName: dbName, - tblName: hint.Tables[0].TableName, - partitions: hint.Tables[0].PartitionList, - indexHint: &ast.IndexHint{ - IndexNames: hint.Indexes, - HintType: ast.HintUse, - HintScope: ast.HintForScan, - }, - }) - case HintIgnoreIndex: - dbName := hint.Tables[0].DBName - if dbName.L == "" { - dbName = model.NewCIStr(b.ctx.GetSessionVars().CurrentDB) - } - indexHintList = append(indexHintList, indexHintInfo{ - dbName: dbName, - tblName: hint.Tables[0].TableName, - partitions: hint.Tables[0].PartitionList, - indexHint: &ast.IndexHint{ - IndexNames: hint.Indexes, - HintType: ast.HintIgnore, - HintScope: ast.HintForScan, - }, - }) - case HintForceIndex: - dbName := hint.Tables[0].DBName - if dbName.L == "" { - dbName = model.NewCIStr(b.ctx.GetSessionVars().CurrentDB) + var hintType ast.IndexHintType + switch hint.HintName.L { + case HintUseIndex: + hintType = ast.HintUse + case HintIgnoreIndex: + hintType = ast.HintIgnore + case HintForceIndex: + hintType = ast.HintForce + case HintKeepOrder: + hintType = ast.HintKeepOrder + case HintNoKeepOrder: + hintType = ast.HintNoKeepOrder } indexHintList = append(indexHintList, indexHintInfo{ dbName: dbName, @@ -3687,7 +3674,7 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, currentLev partitions: hint.Tables[0].PartitionList, indexHint: &ast.IndexHint{ IndexNames: hint.Indexes, - HintType: ast.HintForce, + HintType: hintType, HintScope: ast.HintForScan, }, }) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index e41a523a7671b..649a60415b359 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1421,6 +1421,12 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i // our cost estimation is not reliable. 
hasUseOrForce = true path.Forced = true + if hint.HintType == ast.HintKeepOrder { + path.ForceKeepOrder = true + } + if hint.HintType == ast.HintNoKeepOrder { + path.ForceNoKeepOrder = true + } available = append(available, path) } } diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index 0e668ce3df8ba..71123d8b85469 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -653,6 +653,17 @@ "desc format = 'brief' select /*+ read_from_storage(tiflash[t, ttt], tikv[tt]) */ * from ttt" ] }, + { + "name": "TestKeepOrderHint", + "cases": [ + "explain select /*+ keep_order(t1, idx_a) */ * from t1 where a<10 order by a limit 1;", + "explain select /*+ keep_order(t, primary) */ * from t where a<10 order by a limit 1;", + "explain select /*+ no_keep_order(t1, idx_a) */ * from t1 where a<10 order by a limit 1;", + "explain select /*+ no_keep_order(t, primary) */ * from t where a<10 order by a limit 1;", + "explain select /*+ no_keep_order(t1, idx_a) */ * from t1 where a<10 limit 1;", + "explain select /*+ no_keep_order(t, primary) */ * from t where a<10 limit 1;" + ] + }, { "name": "TestViewHint", "cases": [ diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index 40d8402497569..67fd9d86ba858 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -4321,6 +4321,74 @@ } ] }, + { + "Name": "TestKeepOrderHint", + "Cases": [ + { + "SQL": "explain select /*+ keep_order(t1, idx_a) */ * from t1 where a<10 order by a limit 1;", + "Plan": [ + "Limit_12 1.00 root offset:0, count:1", + "└─Projection_17 1.00 root test.t1.a, test.t1.b", + " └─IndexLookUp_16 1.00 root ", + " ├─Limit_15(Build) 1.00 cop[tikv] offset:0, count:1", + " │ └─IndexRangeScan_13 1.00 cop[tikv] table:t1, index:idx_a(a) range:[-inf,10), keep order:true, stats:pseudo", + " └─TableRowIDScan_14(Probe) 1.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain select /*+ keep_order(t, primary) */ * from t where a<10 order by a limit 1;", + "Plan": [ + "Limit_11 1.00 root offset:0, count:1", + "└─TableReader_15 1.00 root data:Limit_14", + " └─Limit_14 1.00 cop[tikv] offset:0, count:1", + " └─TableRangeScan_13 333.33 cop[tikv] table:t range:[-inf,10), keep order:true, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain select /*+ no_keep_order(t1, idx_a) */ * from t1 where a<10 order by a limit 1;", + "Plan": [ + "TopN_9 1.00 root test.t1.a, offset:0, count:1", + "└─IndexLookUp_16 1.00 root ", + " ├─TopN_15(Build) 1.00 cop[tikv] test.t1.a, offset:0, count:1", + " │ └─IndexRangeScan_13 3323.33 cop[tikv] table:t1, index:idx_a(a) range:[-inf,10), keep order:false, stats:pseudo", + " └─TableRowIDScan_14(Probe) 1.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain select /*+ no_keep_order(t, primary) */ * from t where a<10 order by a limit 1;", + "Plan": [ + "TopN_8 1.00 root test.t.a, offset:0, count:1", + "└─TableReader_15 1.00 root data:TopN_14", + " └─TopN_14 1.00 cop[tikv] test.t.a, offset:0, count:1", + " └─TableRangeScan_13 3333.33 cop[tikv] table:t range:[-inf,10), keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain select /*+ no_keep_order(t1, idx_a) */ * from t1 where a<10 limit 1;", + "Plan": [ + "IndexLookUp_13 1.00 root limit embedded(offset:0, count:1)", + 
"├─Limit_12(Build) 1.00 cop[tikv] offset:0, count:1", + "│ └─IndexRangeScan_10 1.00 cop[tikv] table:t1, index:idx_a(a) range:[-inf,10), keep order:false, stats:pseudo", + "└─TableRowIDScan_11(Probe) 1.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain select /*+ no_keep_order(t, primary) */ * from t where a<10 limit 1;", + "Plan": [ + "Limit_8 1.00 root offset:0, count:1", + "└─TableReader_12 1.00 root data:Limit_11", + " └─Limit_11 1.00 cop[tikv] offset:0, count:1", + " └─TableRangeScan_10 333.33 cop[tikv] table:t range:[-inf,10), keep order:false, stats:pseudo" + ], + "Warn": null + } + ] + }, { "Name": "TestViewHint", "Cases": [ diff --git a/planner/util/path.go b/planner/util/path.go index 23cf19f72871c..2ee67eea8b6aa 100644 --- a/planner/util/path.go +++ b/planner/util/path.go @@ -63,7 +63,9 @@ type AccessPath struct { IsIntHandlePath bool IsCommonHandlePath bool // Forced means this path is generated by `use/force index()`. - Forced bool + Forced bool + ForceKeepOrder bool + ForceNoKeepOrder bool // IsSingleScan indicates whether the path is a single index/table scan or table access after index scan. IsSingleScan bool @@ -97,6 +99,8 @@ func (path *AccessPath) Clone() *AccessPath { IsIntHandlePath: path.IsIntHandlePath, IsCommonHandlePath: path.IsCommonHandlePath, Forced: path.Forced, + ForceKeepOrder: path.ForceKeepOrder, + ForceNoKeepOrder: path.ForceNoKeepOrder, IsSingleScan: path.IsSingleScan, IsUkShardIndexPath: path.IsUkShardIndexPath, }