diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index 69d2faca949fa..d261a22418943 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -244,7 +244,11 @@ func (e *HashAggExec) initForUnparallelExec() { e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0)) if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() { - e.diskTracker = disk.NewTracker(e.ID(), -1) + if e.diskTracker != nil { + e.diskTracker.Reset() + } else { + e.diskTracker = disk.NewTracker(e.ID(), -1) + } e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker) e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker) vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) @@ -266,7 +270,40 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) { } e.partialOutputChs = make([]chan *HashAggIntermData, finalConcurrency) for i := range e.partialOutputChs { +<<<<<<< HEAD e.partialOutputChs[i] = make(chan *HashAggIntermData, partialConcurrency) +======= + e.partialOutputChs[i] = make(chan *aggfuncs.AggPartialResultMapper, partialConcurrency) + } + + e.inflightChunkSync = &sync.WaitGroup{} + + isTrackerEnabled := e.Ctx().GetSessionVars().TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() + isParallelHashAggSpillEnabled := e.Ctx().GetSessionVars().EnableParallelHashaggSpill + + baseRetTypeNum := len(e.RetFieldTypes()) + + // Intermediate result for aggregate function also need to be spilled, + // so the number of spillChunkFieldTypes should be added 1. + spillChunkFieldTypes := make([]*types.FieldType, baseRetTypeNum+1) + for i := 0; i < baseRetTypeNum; i++ { + spillChunkFieldTypes[i] = types.NewFieldType(mysql.TypeVarString) + } + spillChunkFieldTypes[baseRetTypeNum] = types.NewFieldType(mysql.TypeString) + e.spillHelper = newSpillHelper(e.memTracker, e.PartialAggFuncs, func() *chunk.Chunk { + return chunk.New(spillChunkFieldTypes, e.InitCap(), e.MaxChunkSize()) + }, spillChunkFieldTypes) + + if isTrackerEnabled && isParallelHashAggSpillEnabled { + if e.diskTracker != nil { + e.diskTracker.Reset() + } else { + e.diskTracker = disk.NewTracker(e.ID(), -1) + } + e.diskTracker.AttachTo(sessionVars.StmtCtx.DiskTracker) + e.spillHelper.diskTracker = e.diskTracker + sessionVars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) +>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)) } e.partialWorkers = make([]HashAggPartialWorker, partialConcurrency) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 90c2566880d9f..671a73df1b48c 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -569,7 +569,11 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. e.initRuntimeStats() - e.memTracker = memory.NewTracker(e.ID(), -1) + if e.memTracker != nil { + e.memTracker.Reset() + } else { + e.memTracker = memory.NewTracker(e.ID(), -1) + } e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) e.finished = make(chan struct{}) @@ -846,7 +850,6 @@ func (e *IndexLookUpExecutor) Close() error { e.tblWorkerWg.Wait() e.finished = nil e.workerStarted = false - e.memTracker = nil e.resultCurr = nil return nil } diff --git a/pkg/executor/join.go b/pkg/executor/join.go index efaa228b74168..cfcc7db41fd2b 100644 --- a/pkg/executor/join.go +++ b/pkg/executor/join.go @@ -174,8 +174,14 @@ func (e *HashJoinExec) Close() error { close(e.probeWorkers[i].joinChkResourceCh) channel.Clear(e.probeWorkers[i].joinChkResourceCh) } +<<<<<<< HEAD:pkg/executor/join.go e.probeSideTupleFetcher.probeChkResourceCh = nil terror.Call(e.rowContainer.Close) +======= + e.ProbeSideTupleFetcher.probeChkResourceCh = nil + terror.Call(e.RowContainer.Close) + e.HashJoinCtxV1.SessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.RowContainer.ActionSpill()) +>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go e.waiterWg.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] @@ -214,8 +220,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error { } e.hashJoinCtx.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) - e.diskTracker = disk.NewTracker(e.ID(), -1) - e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) + if e.HashJoinCtxV1.diskTracker != nil { + e.HashJoinCtxV1.diskTracker.Reset() + } else { + e.HashJoinCtxV1.diskTracker = disk.NewTracker(e.ID(), -1) + } + e.HashJoinCtxV1.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) e.workerWg = util.WaitGroupWrapper{} e.waiterWg = util.WaitGroupWrapper{} @@ -1423,7 +1433,11 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error { if e.canUseCache { // create a new one since it may be in the cache +<<<<<<< HEAD:pkg/executor/join.go e.innerList = chunk.NewList(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize()) +======= + e.InnerList = chunk.NewListWithMemTracker(exec.RetTypes(e.InnerExec), e.InitCap(), e.MaxChunkSize(), e.InnerList.GetMemTracker()) +>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go } else { e.innerList.Reset() } diff --git a/pkg/util/chunk/list.go b/pkg/util/chunk/list.go index 53a52581e74cb..18df528e324f0 100644 --- a/pkg/util/chunk/list.go +++ b/pkg/util/chunk/list.go @@ -40,18 +40,23 @@ type RowPtr struct { RowIdx uint32 } -// NewList creates a new List with field types, init chunk size and max chunk size. -func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List { +// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker. +func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List { l := &List{ fieldTypes: fieldTypes, initChunkSize: initChunkSize, maxChunkSize: maxChunkSize, - memTracker: memory.NewTracker(memory.LabelForChunkList, -1), + memTracker: tracker, consumedIdx: -1, } return l } +// NewList creates a new List with field types, init chunk size and max chunk size. +func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List { + return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1)) +} + // GetMemTracker returns the memory tracker of this List. func (l *List) GetMemTracker() *memory.Tracker { return l.memTracker diff --git a/pkg/util/chunk/row_container.go b/pkg/util/chunk/row_container.go index 97c268fccecfa..872c38735a411 100644 --- a/pkg/util/chunk/row_container.go +++ b/pkg/util/chunk/row_container.go @@ -336,11 +336,14 @@ func (c *RowContainer) Close() (err error) { c.actionSpill.cond.Broadcast() c.actionSpill.SetFinished() } + c.memTracker.Detach() + c.diskTracker.Detach() if c.alreadySpilled() { err = c.m.records.inDisk.Close() c.m.records.inDisk = nil } c.m.records.inMemory.Clear() + c.m.records.inMemory = nil return } diff --git a/pkg/util/memory/tracker.go b/pkg/util/memory/tracker.go index a85c7185a2b0f..35ddd57cc081c 100644 --- a/pkg/util/memory/tracker.go +++ b/pkg/util/memory/tracker.go @@ -247,6 +247,27 @@ func (t *Tracker) UnbindActions() { t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{} } +// UnbindActionFromHardLimit unbinds action from hardLimit. +func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) { + t.actionMuForHardLimit.Lock() + defer t.actionMuForHardLimit.Unlock() + + var prev ActionOnExceed + for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() { + if current == actionToUnbind { + if prev == nil { + // actionToUnbind is the first element + t.actionMuForHardLimit.actionOnExceed = current.GetFallback() + } else { + // actionToUnbind is not the first element + prev.SetFallback(current.GetFallback()) + } + break + } + prev = current + } +} + // reArrangeFallback merge two action chains and rearrange them by priority in descending order. func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed { if a == nil {