diff --git a/executor/executor.go b/executor/executor.go index 51fe15fecbf7e..fe9cc2ff4e6bf 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -112,6 +112,7 @@ const ( // globalPanicOnExceed panics when GlobalDisTracker storage usage exceeds storage quota. type globalPanicOnExceed struct { + memory.BaseOOMAction mutex sync.Mutex // For synchronization. } @@ -142,8 +143,10 @@ func (a *globalPanicOnExceed) Action(t *memory.Tracker) { panic(msg) } -// SetFallback sets a fallback action. -func (a *globalPanicOnExceed) SetFallback(memory.ActionOnExceed) {} +// GetPriority get the priority of the Action +func (a *globalPanicOnExceed) GetPriority() int64 { + return memory.DefPanicPriority +} // base returns the baseExecutor of an executor, don't override this method! func (e *baseExecutor) base() *baseExecutor { diff --git a/executor/executor_test.go b/executor/executor_test.go index 546238b94d1fb..88467a86efddc 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -68,6 +68,7 @@ import ( "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/testkit" @@ -7004,3 +7005,35 @@ func (s *testSuite) TestIssue20305(c *C) { tk.MustExec("INSERT INTO `t3` VALUES (2069, 70), (2010, 11), (2155, 2156), (2069, 69)") tk.MustQuery("SELECT * FROM `t3` where y <= a").Check(testkit.Rows("2155 2156")) } + +func (s *testSuite) TestOOMActionPriority(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t0") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("drop table if exists t3") + tk.MustExec("drop table if exists t4") + tk.MustExec("create table t0(a int)") + tk.MustExec("insert into t0 values(1)") + tk.MustExec("create table t1(a int)") + tk.MustExec("insert into t1 values(1)") + tk.MustExec("create table t2(a int)") + tk.MustExec("insert into t2 values(1)") + tk.MustExec("create table t3(a int)") + tk.MustExec("insert into t3 values(1)") + tk.MustExec("create table t4(a int)") + tk.MustExec("insert into t4 values(1)") + tk.MustQuery("select * from t0 join t1 join t2 join t3 join t4 order by t0.a").Check(testkit.Rows("1 1 1 1 1")) + action := tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest() + // check the first 5 actions is rate limit. + for i := 0; i < 5; i++ { + c.Assert(action.GetPriority(), Equals, int64(memory.DefRateLimitPriority)) + action = action.GetFallback() + } + for action.GetFallback() != nil { + c.Assert(action.GetPriority(), Equals, int64(memory.DefSpillPriority)) + action = action.GetFallback() + } + c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority)) +} diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 702f4198e829f..68bd938db9ca7 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -1308,9 +1308,9 @@ func (it copErrorResponse) Close() error { // set on initial. Each time the Action is triggered, one token would be destroyed. If the count of the token is less // than 2, the action would be delegated to the fallback action. type rateLimitAction struct { + memory.BaseOOMAction // enabled indicates whether the rateLimitAction is permitted to Action. 1 means permitted, 0 denied. - enabled uint32 - fallbackAction memory.ActionOnExceed + enabled uint32 // totalTokenNum indicates the total token at initial totalTokenNum uint cond struct { @@ -1352,8 +1352,8 @@ func (e *rateLimitAction) Action(t *memory.Tracker) { }) if !e.isEnabled() { - if e.fallbackAction != nil { - e.fallbackAction.Action(t) + if fallback := e.GetFallback(); fallback != nil { + fallback.Action(t) } return } @@ -1364,8 +1364,8 @@ func (e *rateLimitAction) Action(t *memory.Tracker) { e.setEnabled(false) logutil.BgLogger().Info("memory exceed quota, rateLimitAction delegate to fallback action", zap.Uint("total token count", e.totalTokenNum)) - if e.fallbackAction != nil { - e.fallbackAction.Action(t) + if fallback := e.GetFallback(); fallback != nil { + fallback.Action(t) } return } @@ -1391,9 +1391,9 @@ func (e *rateLimitAction) SetLogHook(hook func(uint64)) { } -// SetFallback implements ActionOnExceed.SetFallback -func (e *rateLimitAction) SetFallback(a memory.ActionOnExceed) { - e.fallbackAction = a +// GetPriority get the priority of the Action. +func (e *rateLimitAction) GetPriority() int64 { + return memory.DefRateLimitPriority } // destroyTokenIfNeeded will check the `exceed` flag after copWorker finished one task. diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 6ff4d4300ff99..475e3110949ed 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -259,11 +259,11 @@ func (c *RowContainer) ActionSpillForTest() *SpillDiskAction { // the memory quota of a query is exceeded, SpillDiskAction.Action is // triggered. type SpillDiskAction struct { - c *RowContainer - fallbackAction memory.ActionOnExceed - m sync.Mutex - once sync.Once - cond spillStatusCond + memory.BaseOOMAction + c *RowContainer + m sync.Mutex + once sync.Once + cond spillStatusCond // test function only used for test sync. testSyncInputFunc func() @@ -333,8 +333,8 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) { if !t.CheckExceed() { return } - if a.fallbackAction != nil { - a.fallbackAction.Action(t) + if fallback := a.GetFallback(); fallback != nil { + fallback.Action(t) } } @@ -346,14 +346,14 @@ func (a *SpillDiskAction) Reset() { a.once = sync.Once{} } -// SetFallback sets the fallback action. -func (a *SpillDiskAction) SetFallback(fallback memory.ActionOnExceed) { - a.fallbackAction = fallback -} - // SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. func (a *SpillDiskAction) SetLogHook(hook func(uint64)) {} +// GetPriority get the priority of the Action. +func (a *SpillDiskAction) GetPriority() int64 { + return memory.DefSpillPriority +} + // WaitForTest waits all goroutine have gone. func (a *SpillDiskAction) WaitForTest() { a.testWg.Wait() @@ -528,16 +528,11 @@ func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) { if !t.CheckExceed() { return } - if a.fallbackAction != nil { - a.fallbackAction.Action(t) + if fallback := a.GetFallback(); fallback != nil { + fallback.Action(t) } } -// SetFallback sets the fallback action. -func (a *SortAndSpillDiskAction) SetFallback(fallback memory.ActionOnExceed) { - a.fallbackAction = fallback -} - // SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. func (a *SortAndSpillDiskAction) SetLogHook(hook func(uint64)) {} diff --git a/util/memory/action.go b/util/memory/action.go index d78d4994272a9..4f12a5d90f139 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -35,10 +35,39 @@ type ActionOnExceed interface { // SetFallback sets a fallback action which will be triggered if itself has // already been triggered. SetFallback(a ActionOnExceed) + // GetFallback get the fallback action of the Action. + GetFallback() ActionOnExceed + // GetPriority get the priority of the Action. + GetPriority() int64 } +// BaseOOMAction manages the fallback action for all Action. +type BaseOOMAction struct { + fallbackAction ActionOnExceed +} + +// SetFallback sets a fallback action which will be triggered if itself has +// already been triggered. +func (b *BaseOOMAction) SetFallback(a ActionOnExceed) { + b.fallbackAction = a +} + +// GetFallback get the fallback action of the Action. +func (b *BaseOOMAction) GetFallback() ActionOnExceed { + return b.fallbackAction +} + +// Default OOM Action priority. +const ( + DefPanicPriority = iota + DefLogPriority + DefSpillPriority + DefRateLimitPriority +) + // LogOnExceed logs a warning only once when memory usage exceeds memory quota. type LogOnExceed struct { + BaseOOMAction mutex sync.Mutex // For synchronization. acted bool ConnID uint64 @@ -65,11 +94,14 @@ func (a *LogOnExceed) Action(t *Tracker) { } } -// SetFallback sets a fallback action. -func (a *LogOnExceed) SetFallback(ActionOnExceed) {} +// GetPriority get the priority of the Action +func (a *LogOnExceed) GetPriority() int64 { + return DefLogPriority +} // PanicOnExceed panics when memory usage exceeds memory quota. type PanicOnExceed struct { + BaseOOMAction mutex sync.Mutex // For synchronization. acted bool ConnID uint64 @@ -96,8 +128,10 @@ func (a *PanicOnExceed) Action(t *Tracker) { panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID)) } -// SetFallback sets a fallback action. -func (a *PanicOnExceed) SetFallback(ActionOnExceed) {} +// GetPriority get the priority of the Action +func (a *PanicOnExceed) GetPriority() int64 { + return DefPanicPriority +} var ( errMemExceedThreshold = dbterror.ClassUtil.NewStd(errno.ErrMemExceedThreshold) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index a6bfcbd3b2809..ebfdd93aae03f 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -121,8 +121,31 @@ func (t *Tracker) SetActionOnExceed(a ActionOnExceed) { func (t *Tracker) FallbackOldAndSetNewAction(a ActionOnExceed) { t.actionMu.Lock() defer t.actionMu.Unlock() - a.SetFallback(t.actionMu.actionOnExceed) - t.actionMu.actionOnExceed = a + t.actionMu.actionOnExceed = reArrangeFallback(t.actionMu.actionOnExceed, a) +} + +// GetFallbackForTest get the oom action used by test. +func (t *Tracker) GetFallbackForTest() ActionOnExceed { + t.actionMu.Lock() + defer t.actionMu.Unlock() + return t.actionMu.actionOnExceed +} + +// reArrangeFallback merge two action chains and rearrange them by priority in descending order. +func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed { + if a == nil { + return b + } + if b == nil { + return a + } + if a.GetPriority() < b.GetPriority() { + a, b = b, a + a.SetFallback(b) + } else { + a.SetFallback(reArrangeFallback(a.GetFallback(), b)) + } + return a } // SetLabel sets the label of a Tracker. diff --git a/util/memory/tracker_test.go b/util/memory/tracker_test.go index 697fc6bb48ffa..1e5f19ab7cd3a 100644 --- a/util/memory/tracker_test.go +++ b/util/memory/tracker_test.go @@ -106,31 +106,32 @@ func (s *testSuite) TestOOMAction(c *C) { c.Assert(action1.called, IsFalse) c.Assert(action2.called, IsFalse) tracker.Consume(10000) - c.Assert(action1.called, IsFalse) - c.Assert(action2.called, IsTrue) + c.Assert(action1.called, IsTrue) + c.Assert(action2.called, IsFalse) tracker.Consume(10000) c.Assert(action1.called, IsTrue) c.Assert(action2.called, IsTrue) } type mockAction struct { + BaseOOMAction called bool - fallback ActionOnExceed + priority int64 } func (a *mockAction) SetLogHook(hook func(uint64)) { } func (a *mockAction) Action(t *Tracker) { - if a.called && a.fallback != nil { - a.fallback.Action(t) + if a.called && a.fallbackAction != nil { + a.fallbackAction.Action(t) return } a.called = true } -func (a *mockAction) SetFallback(fallback ActionOnExceed) { - a.fallback = fallback +func (a *mockAction) GetPriority() int64 { + return a.priority } func (s *testSuite) TestAttachTo(c *C) { @@ -341,3 +342,38 @@ func BenchmarkConsume(b *testing.B) { func (s *testSuite) TestErrorCode(c *C) { c.Assert(int(terror.ToSQLError(errMemExceedThreshold).Code), Equals, errno.ErrMemExceedThreshold) } + +func (s *testSuite) TestOOMActionPriority(c *C) { + tracker := NewTracker(1, 100) + // make sure no panic here. + tracker.Consume(10000) + + tracker = NewTracker(1, 1) + tracker.actionMu.actionOnExceed = nil + n := 100 + actions := make([]*mockAction, n) + for i := 0; i < n; i++ { + actions[i] = &mockAction{priority: int64(i)} + } + + randomSuffle := make([]int, n) + for i := 0; i < n; i++ { + randomSuffle[i] = i + pos := rand.Int() % (i + 1) + randomSuffle[i], randomSuffle[pos] = randomSuffle[pos], randomSuffle[i] + } + + for i := 0; i < n; i++ { + tracker.FallbackOldAndSetNewAction(actions[randomSuffle[i]]) + } + for i := n - 1; i >= 0; i-- { + tracker.Consume(100) + for j := n - 1; j >= 0; j-- { + if j >= i { + c.Assert(actions[j].called, IsTrue) + } else { + c.Assert(actions[j].called, IsFalse) + } + } + } +}