diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel index 4c521b3f57a8c..feabc6c0ad484 100644 --- a/statistics/handle/BUILD.bazel +++ b/statistics/handle/BUILD.bazel @@ -82,6 +82,7 @@ go_test( "//statistics", "//statistics/handle/globalstats", "//statistics/handle/internal", + "//statistics/handle/usage", "//testkit", "//testkit/testsetup", "//types", diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 9c73982133839..be5edc47ded83 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -86,10 +86,10 @@ type Handle struct { statsCache *cache.StatsCachePointer // tableDelta contains all the delta map from collectors when we dump them to KV. - tableDelta *tableDelta + tableDelta *usage.TableDelta // statsUsage contains all the column stats usage information from collectors when we dump them to KV. - statsUsage *statsUsage + statsUsage *usage.StatsUsage // StatsLoad is used to load stats concurrently StatsLoad StatsLoad @@ -146,8 +146,8 @@ func (h *Handle) Clear() { <-h.ddlEventCh } h.listHead.ClearForTest() - h.tableDelta.reset() - h.statsUsage.reset() + h.tableDelta.Reset() + h.statsUsage.Reset() } type sessionPool interface { @@ -176,8 +176,8 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool ses return nil, err } handle.statsCache = statsCache - handle.tableDelta = newTableDelta() - handle.statsUsage = newStatsUsage() + handle.tableDelta = usage.NewTableDelta() + handle.statsUsage = usage.NewStatsUsage() handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 02c69dc543d22..3bd44accaba8e 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -37,114 +37,15 @@ import ( "github.com/pingcap/tidb/util/sqlexec" ) -// tableDelta is used to collect tables' change information. -// All methods of it are thread-safe. -type tableDelta struct { - delta map[int64]variable.TableDelta // map[tableID]delta - lock sync.Mutex -} - -func newTableDelta() *tableDelta { - return &tableDelta{ - delta: make(map[int64]variable.TableDelta), - } -} - -func (m *tableDelta) reset() { - m.lock.Lock() - defer m.lock.Unlock() - m.delta = make(map[int64]variable.TableDelta) -} - -func (m *tableDelta) getDeltaAndReset() map[int64]variable.TableDelta { - m.lock.Lock() - defer m.lock.Unlock() - ret := m.delta - m.delta = make(map[int64]variable.TableDelta) - return ret -} - -func (m *tableDelta) update(id int64, delta int64, count int64, colSize *map[int64]int64) { - m.lock.Lock() - defer m.lock.Unlock() - updateTableDeltaMap(m.delta, id, delta, count, colSize) -} - -func updateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { - item := m[id] - item.Delta += delta - item.Count += count - if item.ColSize == nil { - item.ColSize = make(map[int64]int64) - } - if colSize != nil { - for key, val := range *colSize { - item.ColSize[key] += val - } - } - m[id] = item -} - -func (m *tableDelta) merge(deltaMap map[int64]variable.TableDelta) { - if len(deltaMap) == 0 { - return - } - m.lock.Lock() - defer m.lock.Unlock() - for id, item := range deltaMap { - updateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) - } -} - -// statsUsage maps (tableID, columnID) to the last time when the column stats are used(needed). -// All methods of it are thread-safe. -type statsUsage struct { - usage map[model.TableItemID]time.Time - lock sync.RWMutex -} - -func newStatsUsage() *statsUsage { - return &statsUsage{ - usage: make(map[model.TableItemID]time.Time), - } -} - -func (m *statsUsage) reset() { - m.lock.Lock() - defer m.lock.Unlock() - m.usage = make(map[model.TableItemID]time.Time) -} - -func (m *statsUsage) getUsageAndReset() map[model.TableItemID]time.Time { - m.lock.Lock() - defer m.lock.Unlock() - ret := m.usage - m.usage = make(map[model.TableItemID]time.Time) - return ret -} - -func (m *statsUsage) merge(other map[model.TableItemID]time.Time) { - if len(other) == 0 { - return - } - m.lock.Lock() - defer m.lock.Unlock() - for id, t := range other { - if mt, ok := m.usage[id]; !ok || mt.Before(t) { - m.usage[id] = t - } - } -} - -func merge(s *SessionStatsCollector, deltaMap *tableDelta, colMap *statsUsage) { - deltaMap.merge(s.mapper.getDeltaAndReset()) - colMap.merge(s.statsUsage.getUsageAndReset()) +func merge(s *SessionStatsCollector, deltaMap *usage.TableDelta, colMap *usage.StatsUsage) { + deltaMap.Merge(s.mapper.GetDeltaAndReset()) + colMap.Merge(s.statsUsage.GetUsageAndReset()) } // SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it. type SessionStatsCollector struct { - mapper *tableDelta - statsUsage *statsUsage + mapper *usage.TableDelta + statsUsage *usage.StatsUsage next *SessionStatsCollector sync.Mutex @@ -155,8 +56,8 @@ type SessionStatsCollector struct { // NewSessionStatsCollector initializes a new SessionStatsCollector. func NewSessionStatsCollector() *SessionStatsCollector { return &SessionStatsCollector{ - mapper: newTableDelta(), - statsUsage: newStatsUsage(), + mapper: usage.NewTableDelta(), + statsUsage: usage.NewStatsUsage(), } } @@ -171,15 +72,15 @@ func (s *SessionStatsCollector) Delete() { func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { s.Lock() defer s.Unlock() - s.mapper.update(id, delta, count, colSize) + s.mapper.Update(id, delta, count, colSize) } // ClearForTest clears the mapper for test. func (s *SessionStatsCollector) ClearForTest() { s.Lock() defer s.Unlock() - s.mapper = newTableDelta() - s.statsUsage = newStatsUsage() + s.mapper = usage.NewTableDelta() + s.statsUsage = usage.NewStatsUsage() s.next = nil s.deleted = false } @@ -188,7 +89,7 @@ func (s *SessionStatsCollector) ClearForTest() { func (s *SessionStatsCollector) UpdateColStatsUsage(colMap map[model.TableItemID]time.Time) { s.Lock() defer s.Unlock() - s.statsUsage.merge(colMap) + s.statsUsage.Merge(colMap) } // NewSessionStatsCollector allocates a stats collector for a session. @@ -196,9 +97,9 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { h.listHead.Lock() defer h.listHead.Unlock() newCollector := &SessionStatsCollector{ - mapper: newTableDelta(), + mapper: usage.NewTableDelta(), next: h.listHead.next, - statsUsage: newStatsUsage(), + statsUsage: usage.NewStatsUsage(), } h.listHead.next = newCollector return newCollector @@ -302,8 +203,8 @@ const ( // sweepList will loop over the list, merge each session's local stats into handle // and remove closed session's collector. func (h *Handle) sweepList() { - deltaMap := newTableDelta() - colMap := newStatsUsage() + deltaMap := usage.NewTableDelta() + colMap := usage.NewStatsUsage() prev := h.listHead prev.Lock() for curr := prev.next; curr != nil; curr = curr.next { @@ -321,17 +222,17 @@ func (h *Handle) sweepList() { } } prev.Unlock() - h.tableDelta.merge(deltaMap.getDeltaAndReset()) - h.statsUsage.merge(colMap.getUsageAndReset()) + h.tableDelta.Merge(deltaMap.GetDeltaAndReset()) + h.statsUsage.Merge(colMap.GetUsageAndReset()) } // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { h.sweepList() - deltaMap := h.tableDelta.getDeltaAndReset() + deltaMap := h.tableDelta.GetDeltaAndReset() defer func() { - h.tableDelta.merge(deltaMap) + h.tableDelta.Merge(deltaMap) }() se, err := h.pool.Get() @@ -351,7 +252,7 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { return errors.Trace(err) } if updated { - updateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) + usage.UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) } if err = h.dumpTableStatColSizeToKV(id, item); err != nil { delete(deltaMap, id) @@ -516,9 +417,9 @@ func (h *Handle) DumpColStatsUsageToKV() error { return nil } h.sweepList() - colMap := h.statsUsage.getUsageAndReset() + colMap := h.statsUsage.GetUsageAndReset() defer func() { - h.statsUsage.merge(colMap) + h.statsUsage.Merge(colMap) }() type pair struct { lastUsedAt string diff --git a/statistics/handle/update_list_test.go b/statistics/handle/update_list_test.go index dea9519672ad8..c4f4431e57bf4 100644 --- a/statistics/handle/update_list_test.go +++ b/statistics/handle/update_list_test.go @@ -17,12 +17,13 @@ package handle import ( "testing" + "github.com/pingcap/tidb/statistics/handle/usage" "github.com/stretchr/testify/require" ) func TestInsertAndDelete(t *testing.T) { h := Handle{ - listHead: &SessionStatsCollector{mapper: newTableDelta()}, + listHead: &SessionStatsCollector{mapper: usage.NewTableDelta()}, } var items []*SessionStatsCollector for i := 0; i < 5; i++ { diff --git a/statistics/handle/usage/BUILD.bazel b/statistics/handle/usage/BUILD.bazel index ffe77b944a120..5d1db40d4936f 100644 --- a/statistics/handle/usage/BUILD.bazel +++ b/statistics/handle/usage/BUILD.bazel @@ -2,12 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "usage", - srcs = ["index_usage.go"], + srcs = [ + "index_usage.go", + "stats_usage.go", + "table_delta.go", + ], importpath = "github.com/pingcap/tidb/statistics/handle/usage", visibility = ["//visibility:public"], deps = [ "//kv", + "//parser/model", "//sessionctx", + "//sessionctx/variable", "//types", "//util/sqlexec", "@com_github_pingcap_errors//:errors", diff --git a/statistics/handle/usage/stats_usage.go b/statistics/handle/usage/stats_usage.go new file mode 100644 index 0000000000000..2f0c14a50687f --- /dev/null +++ b/statistics/handle/usage/stats_usage.go @@ -0,0 +1,66 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usage + +import ( + "sync" + "time" + + "github.com/pingcap/tidb/parser/model" +) + +// StatsUsage maps (tableID, columnID) to the last time when the column stats are used(needed). +// All methods of it are thread-safe. +type StatsUsage struct { + usage map[model.TableItemID]time.Time + lock sync.RWMutex +} + +// NewStatsUsage creates a new StatsUsage. +func NewStatsUsage() *StatsUsage { + return &StatsUsage{ + usage: make(map[model.TableItemID]time.Time), + } +} + +// Reset resets the StatsUsage. +func (m *StatsUsage) Reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.usage = make(map[model.TableItemID]time.Time) +} + +// GetUsageAndReset gets the usage and resets the StatsUsage. +func (m *StatsUsage) GetUsageAndReset() map[model.TableItemID]time.Time { + m.lock.Lock() + defer m.lock.Unlock() + ret := m.usage + m.usage = make(map[model.TableItemID]time.Time) + return ret +} + +// Merge merges the usageMap into the StatsUsage. +func (m *StatsUsage) Merge(other map[model.TableItemID]time.Time) { + if len(other) == 0 { + return + } + m.lock.Lock() + defer m.lock.Unlock() + for id, t := range other { + if mt, ok := m.usage[id]; !ok || mt.Before(t) { + m.usage[id] = t + } + } +} diff --git a/statistics/handle/usage/table_delta.go b/statistics/handle/usage/table_delta.go new file mode 100644 index 0000000000000..4b46c1f859b1b --- /dev/null +++ b/statistics/handle/usage/table_delta.go @@ -0,0 +1,86 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usage + +import ( + "sync" + + "github.com/pingcap/tidb/sessionctx/variable" +) + +// TableDelta is used to collect tables' change information. +// All methods of it are thread-safe. +type TableDelta struct { + delta map[int64]variable.TableDelta // map[tableID]delta + lock sync.Mutex +} + +// NewTableDelta creates a new TableDelta. +func NewTableDelta() *TableDelta { + return &TableDelta{ + delta: make(map[int64]variable.TableDelta), + } +} + +// Reset resets the TableDelta. +func (m *TableDelta) Reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.delta = make(map[int64]variable.TableDelta) +} + +// GetDeltaAndReset gets the delta and resets the TableDelta. +func (m *TableDelta) GetDeltaAndReset() map[int64]variable.TableDelta { + m.lock.Lock() + defer m.lock.Unlock() + ret := m.delta + m.delta = make(map[int64]variable.TableDelta) + return ret +} + +// Update updates the delta of the table. +func (m *TableDelta) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { + m.lock.Lock() + defer m.lock.Unlock() + UpdateTableDeltaMap(m.delta, id, delta, count, colSize) +} + +// Merge merges the deltaMap into the TableDelta. +func (m *TableDelta) Merge(deltaMap map[int64]variable.TableDelta) { + if len(deltaMap) == 0 { + return + } + m.lock.Lock() + defer m.lock.Unlock() + for id, item := range deltaMap { + UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) + } +} + +// UpdateTableDeltaMap updates the delta of the table. +func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { + item := m[id] + item.Delta += delta + item.Count += count + if item.ColSize == nil { + item.ColSize = make(map[int64]int64) + } + if colSize != nil { + for key, val := range *colSize { + item.ColSize[key] += val + } + } + m[id] = item +}