ttl, domain: setup a customized session pool with stats collector (#4…
ti-chi-bot authored Aug 17, 2023
1 parent eaabea6 commit b606a97
Showing 11 changed files with 198 additions and 29 deletions.
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
@@ -153,6 +153,7 @@ go_test(
         "//tablecodec",
         "//testkit",
         "//testkit/testsetup",
+        "//ttl/ttlworker",
         "//types",
         "//util/codec",
         "@com_github_fsouza_fake_gcs_server//fakestorage",
4 changes: 4 additions & 0 deletions br/pkg/restore/db_test.go
@@ -24,6 +24,7 @@ import (
     "github.com/pingcap/tidb/parser/mysql"
     "github.com/pingcap/tidb/parser/types"
     "github.com/pingcap/tidb/testkit"
+    "github.com/pingcap/tidb/ttl/ttlworker"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "github.com/tikv/client-go/v2/oracle"
@@ -368,9 +369,12 @@ func TestFilterDDLJobByRules(t *testing.T) {
 }

 func TestGetExistedUserDBs(t *testing.T) {
+    ttlworker.SkipTTLJobManager4Test = true
+
     m, err := mock.NewCluster()
     require.Nil(t, err)
     defer m.Stop()
+
     dom := m.Domain

     dbs := restore.GetExistedUserDBs(dom)
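
The SkipTTLJobManager4Test flag set here is introduced later in this commit (ttl/ttlworker/job_manager.go) to keep the TTL background loop out of unit tests. A minimal sketch of the pattern, with illustrative names (SkipLoop4Test and Manager stand in for the real identifiers):

package worker

// SkipLoop4Test is an illustrative stand-in for ttlworker.SkipTTLJobManager4Test:
// tests set it before constructing a manager so the real background loop is
// replaced by a no-op, avoiding data races with test goroutines.
var SkipLoop4Test bool

// Manager is a simplified stand-in for ttlworker.JobManager.
type Manager struct {
    loop func() error
}

// NewManager wires a no-op loop when the test flag is set.
func NewManager() *Manager {
    if SkipLoop4Test {
        return &Manager{loop: func() error { return nil }}
    }
    return &Manager{loop: realLoop}
}

func realLoop() error {
    // tick, schedule TTL jobs, update heartbeats...
    return nil
}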
27 changes: 13 additions & 14 deletions domain/domain.go
@@ -1068,10 +1068,6 @@ func (do *Domain) Init(
         return err
     }

-    do.wg.Run(func() {
-        do.runTTLJobManager(ctx)
-    })
-
     return nil
 }

@@ -2403,18 +2399,21 @@ func (do *Domain) serverIDKeeper() {
     }
 }

-func (do *Domain) runTTLJobManager(ctx context.Context) {
-    ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
-    ttlJobManager.Start()
-    do.ttlJobManager = ttlJobManager
+// StartTTLJobManager creates and starts the ttl job manager
+func (do *Domain) StartTTLJobManager() {
+    do.wg.Run(func() {
+        ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
+        do.ttlJobManager = ttlJobManager
+        ttlJobManager.Start()

-    <-do.exit
+        <-do.exit

-    ttlJobManager.Stop()
-    err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
-    if err != nil {
-        logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
-    }
+        ttlJobManager.Stop()
+        err := ttlJobManager.WaitStopped(context.Background(), 30*time.Second)
+        if err != nil {
+            logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
+        }
+    })
 }

 // TTLJobManager returns the ttl job manager on this domain
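
StartTTLJobManager now owns the whole goroutine lifecycle: run under the domain's WaitGroup, block on the exit channel, then stop and wait with a bounded timeout. A self-contained sketch of that pattern, with simplified stand-ins for the Domain fields:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type manager struct{ done chan struct{} }

func (m *manager) Start() { /* spawn workers */ }
func (m *manager) Stop()  { close(m.done) }

// WaitStopped blocks until the manager has stopped or the timeout elapses.
func (m *manager) WaitStopped(ctx context.Context, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    select {
    case <-m.done:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    exit := make(chan struct{}) // stand-in for do.exit
    var wg sync.WaitGroup       // stand-in for do.wg

    m := &manager{done: make(chan struct{})}
    wg.Add(1)
    go func() {
        defer wg.Done()
        m.Start()
        <-exit // block until the domain shuts down
        m.Stop()
        if err := m.WaitStopped(context.Background(), 30*time.Second); err != nil {
            fmt.Println("ttl job manager did not stop in time:", err)
        }
    }()

    close(exit) // simulate domain shutdown
    wg.Wait()
}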
1 change: 1 addition & 0 deletions session/BUILD.bazel
@@ -65,6 +65,7 @@ go_library(
         "//table/temptable",
         "//tablecodec",
         "//telemetry",
+        "//ttl/ttlworker",
         "//types",
         "//types/parser_driver",
         "//util",
37 changes: 37 additions & 0 deletions session/session.go
@@ -86,6 +86,7 @@ import (
     "github.com/pingcap/tidb/table/temptable"
     "github.com/pingcap/tidb/tablecodec"
     "github.com/pingcap/tidb/telemetry"
+    "github.com/pingcap/tidb/ttl/ttlworker"
     "github.com/pingcap/tidb/util"
     "github.com/pingcap/tidb/util/chunk"
     "github.com/pingcap/tidb/util/collate"
@@ -3390,6 +3391,22 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
         return nil, err
     }

+    // start TTL job manager after setup stats collector
+    // because TTL could modify a lot of columns, and need to trigger auto analyze
+    ttlworker.AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
+        if s, ok := s.(*session); ok {
+            return attachStatsCollector(s, dom)
+        }
+        return s
+    }
+    ttlworker.DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
+        if s, ok := s.(*session); ok {
+            return detachStatsCollector(s)
+        }
+        return s
+    }
+    dom.StartTTLJobManager()
+
     analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
     if err != nil {
         return nil, err
@@ -3495,6 +3512,26 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
     return s, nil
 }

+// attachStatsCollector attaches the stats collector in the dom for the session
+func attachStatsCollector(s *session, dom *domain.Domain) *session {
+    if dom.StatsHandle() != nil && dom.StatsUpdating() {
+        s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
+        if GetIndexUsageSyncLease() > 0 {
+            s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
+        }
+    }
+
+    return s
+}
+
+// detachStatsCollector removes the stats collector in the session
+func detachStatsCollector(s *session) *session {
+    s.statsCollector = nil
+    s.idxUsageCollector = nil
+
+    return s
+}
+
 // CreateSessionWithDomain creates a new Session and binds it with a Domain.
 // We need this because when we start DDL in Domain, the DDL need a session
 // to change some system tables. But at that time, we have been already in
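
The two hook variables are assigned here, in the session package, because ttl/ttlworker cannot import session directly without creating an import cycle. A sketch of this function-variable hook pattern, under illustrative names (the real hooks operate on sqlexec.SQLExecutor):

package main

import "fmt"

// Executor is an illustrative stand-in for sqlexec.SQLExecutor.
type Executor interface{ Exec(sql string) }

// AttachStatsCollector is the hook: the low-level package declares it with an
// identity default, and the high-level package replaces it at bootstrap.
var AttachStatsCollector = func(s Executor) Executor { return s }

type plainExec struct{}

func (plainExec) Exec(sql string) { fmt.Println("exec:", sql) }

// statsExec wraps an Executor and records stats deltas after each statement.
type statsExec struct{ inner Executor }

func (e statsExec) Exec(sql string) {
    e.inner.Exec(sql)
    fmt.Println("recorded stats delta for:", sql)
}

func main() {
    // At bootstrap, the session-level package installs the real hook.
    AttachStatsCollector = func(s Executor) Executor { return statsExec{inner: s} }

    s := AttachStatsCollector(plainExec{})
    s.Exec("DELETE FROM t WHERE created_at < NOW() - INTERVAL 1 DAY")
}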
1 change: 1 addition & 0 deletions ttl/ttlworker/BUILD.bazel
@@ -61,6 +61,7 @@ go_test(
         "//session",
         "//sessionctx",
         "//sessionctx/variable",
+        "//statistics/handle",
         "//testkit",
         "//ttl/cache",
         "//ttl/session",
23 changes: 23 additions & 0 deletions ttl/ttlworker/config.go
@@ -16,6 +16,8 @@ package ttlworker

 import (
     "time"
+
+    "github.com/pingcap/failpoint"
 )

 const jobManagerLoopTickerInterval = 10 * time.Second
@@ -27,3 +29,24 @@ const ttlInternalSQLTimeout = 30 * time.Second
 const resizeWorkersInterval = 30 * time.Second
 const splitScanCount = 64
 const ttlJobTimeout = 6 * time.Hour
+
+func getUpdateInfoSchemaCacheInterval() time.Duration {
+    failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
+        return time.Duration(val.(int))
+    })
+    return updateInfoSchemaCacheInterval
+}
+
+func getUpdateTTLTableStatusCacheInterval() time.Duration {
+    failpoint.Inject("update-status-table-cache-interval", func(val failpoint.Value) time.Duration {
+        return time.Duration(val.(int))
+    })
+    return updateTTLTableStatusCacheInterval
+}
+
+func getResizeWorkersInterval() time.Duration {
+    failpoint.Inject("resize-workers-interval", func(val failpoint.Value) time.Duration {
+        return time.Duration(val.(int))
+    })
+    return resizeWorkersInterval
+}
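
Each getter returns the compile-time constant unless its failpoint is enabled, which is what lets the integration test below compress the TTL timing. A sketch of enabling one from a test (the test name is hypothetical; the injected value is an int of nanoseconds because it is cast directly to time.Duration):

package ttlworker_test

import (
    "fmt"
    "testing"
    "time"

    "github.com/pingcap/failpoint"
    "github.com/stretchr/testify/require"
)

func TestResizeWorkersIntervalFailpoint(t *testing.T) {
    // time.Second prints as its nanosecond count with %d, so the worker
    // pool is resized every second instead of every 30 seconds.
    require.NoError(t, failpoint.Enable(
        "github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval",
        fmt.Sprintf("return(%d)", time.Second)))
    defer func() {
        require.NoError(t, failpoint.Disable(
            "github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval"))
    }()

    // ... construct a JobManager here and exercise its resize behavior.
}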
18 changes: 14 additions & 4 deletions ttl/ttlworker/job_manager.go
@@ -46,6 +46,9 @@ const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_

 const timeFormat = "2006-01-02 15:04:05"

+// SkipTTLJobManager4Test skips the bootstrap of TTLJobManager if it's true. It's used to avoid data race in test.
+var SkipTTLJobManager4Test bool
+
 func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) {
     return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
 }
@@ -101,11 +104,18 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *JobManager) {
     manager.delCh = make(chan *ttlDeleteTask)
     manager.notifyStateCh = make(chan interface{}, 1)

+    if SkipTTLJobManager4Test {
+        manager.init(func() error {
+            return nil
+        })
+        return
+    }
+
     manager.init(manager.jobLoop)
     manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager")

-    manager.infoSchemaCache = cache.NewInfoSchemaCache(updateInfoSchemaCacheInterval)
-    manager.tableStatusCache = cache.NewTableStatusCache(updateTTLTableStatusCacheInterval)
+    manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
+    manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())

     return
 }
@@ -127,7 +137,7 @@ func (m *JobManager) jobLoop() error {
     updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval)
     infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
     tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
-    resizeWorkersTicker := time.Tick(resizeWorkersInterval)
+    resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
     for {
         m.reportMetrics()
         now := se.Now()
@@ -489,7 +499,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) bool {
     hbTime := table.CurrentJobOwnerHBTime
     // a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the
     // `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases.
-    if hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now) {
+    if hbTime.Add(2 * getUpdateTTLTableStatusCacheInterval()).Before(now) {
         logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.TableID), zap.Time("hbTime", hbTime), zap.Time("now", now))
         return true
     }
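
couldTrySchedule still applies the same takeover rule, only reading the interval through the failpoint-aware getter: a job owner whose heartbeat is older than twice the status-cache refresh interval is treated as gone. The rule in isolation, with an assumed interval value:

package main

import (
    "fmt"
    "time"
)

// Assumed value for illustration; the real constant lives in ttlworker.
const updateTTLTableStatusCacheInterval = 10 * time.Minute

// heartbeatExpired reports whether another node's job may be taken over.
func heartbeatExpired(hbTime, now time.Time) bool {
    return hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now)
}

func main() {
    now := time.Now()
    fmt.Println(heartbeatExpired(now.Add(-25*time.Minute), now)) // true: owner considered gone
    fmt.Println(heartbeatExpired(now.Add(-5*time.Minute), now))  // false: owner still alive
}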
57 changes: 57 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
@@ -22,11 +22,13 @@ import (
     "time"

     "github.com/pingcap/errors"
+    "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/parser/ast"
     "github.com/pingcap/tidb/parser/model"
     dbsession "github.com/pingcap/tidb/session"
     "github.com/pingcap/tidb/sessionctx/variable"
+    "github.com/pingcap/tidb/statistics/handle"
     "github.com/pingcap/tidb/testkit"
     "github.com/pingcap/tidb/ttl/cache"
     "github.com/pingcap/tidb/ttl/session"
@@ -123,3 +125,58 @@ func TestFinishJob(t *testing.T) {

     tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}"))
 }
+
+func TestTTLAutoAnalyze(t *testing.T) {
+    failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second))
+    failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", time.Second))
+    failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second))
+    originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt
+    handle.AutoAnalyzeMinCnt = 0
+    defer func() {
+        handle.AutoAnalyzeMinCnt = originAutoAnalyzeMinCnt
+    }()
+
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    tk := testkit.NewTestKit(t, store)
+
+    tk.MustExec("use test")
+    tk.MustExec("create table t (id int, created_at datetime) ttl = `created_at` + interval 1 day")
+
+    // insert ten rows; rows 2, 3, 4, 6, 8, 9 and 10 are expired
+    for i := 1; i <= 10; i++ {
+        t := time.Now()
+        if i%2 == 0 || i%3 == 0 {
+            t = t.Add(-time.Hour * 48)
+        }
+
+        tk.MustExec("insert into t values(?, ?)", i, t.Format(time.RFC3339))
+    }
+    // TODO: use a better way to pause and restart the ttl worker after analyzing the table, to make this more stable;
+    // but since the ttl worker takes several seconds to start, it's not too serious.
+    tk.MustExec("analyze table t")
+    rows := tk.MustQuery("show stats_meta").Rows()
+    require.Equal(t, rows[0][4], "0")
+    require.Equal(t, rows[0][5], "10")
+
+    retryTime := 15
+    retryInterval := time.Second * 2
+    deleted := false
+    for retryTime >= 0 {
+        retryTime--
+        time.Sleep(retryInterval)
+
+        rows := tk.MustQuery("select count(*) from t").Rows()
+        count := rows[0][0].(string)
+        if count == "3" {
+            deleted = true
+            break
+        }
+    }
+    require.True(t, deleted, "ttl should remove expired rows")
+
+    h := dom.StatsHandle()
+    is := dom.InfoSchema()
+    require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+    require.NoError(t, h.Update(is))
+    require.True(t, h.HandleAutoAnalyze(is))
+}