Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#54095
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
XuHuaiyu authored and ti-chi-bot committed Jun 27, 2024
1 parent 4a6f3e5 commit 92545e8
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 8 deletions.
39 changes: 38 additions & 1 deletion pkg/executor/aggregate/agg_hash_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ func (e *HashAggExec) initForUnparallelExec() {

e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0))
if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.ID(), -1)
if e.diskTracker != nil {
e.diskTracker.Reset()
} else {
e.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker)
vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
Expand All @@ -266,7 +270,40 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
}
e.partialOutputChs = make([]chan *HashAggIntermData, finalConcurrency)
for i := range e.partialOutputChs {
<<<<<<< HEAD
e.partialOutputChs[i] = make(chan *HashAggIntermData, partialConcurrency)
=======
e.partialOutputChs[i] = make(chan *aggfuncs.AggPartialResultMapper, partialConcurrency)
}

e.inflightChunkSync = &sync.WaitGroup{}

isTrackerEnabled := e.Ctx().GetSessionVars().TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load()
isParallelHashAggSpillEnabled := e.Ctx().GetSessionVars().EnableParallelHashaggSpill

baseRetTypeNum := len(e.RetFieldTypes())

// Intermediate result for aggregate function also need to be spilled,
// so the number of spillChunkFieldTypes should be added 1.
spillChunkFieldTypes := make([]*types.FieldType, baseRetTypeNum+1)
for i := 0; i < baseRetTypeNum; i++ {
spillChunkFieldTypes[i] = types.NewFieldType(mysql.TypeVarString)
}
spillChunkFieldTypes[baseRetTypeNum] = types.NewFieldType(mysql.TypeString)
e.spillHelper = newSpillHelper(e.memTracker, e.PartialAggFuncs, func() *chunk.Chunk {
return chunk.New(spillChunkFieldTypes, e.InitCap(), e.MaxChunkSize())
}, spillChunkFieldTypes)

if isTrackerEnabled && isParallelHashAggSpillEnabled {
if e.diskTracker != nil {
e.diskTracker.Reset()
} else {
e.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.diskTracker.AttachTo(sessionVars.StmtCtx.DiskTracker)
e.spillHelper.diskTracker = e.diskTracker
sessionVars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095))
}

e.partialWorkers = make([]HashAggPartialWorker, partialConcurrency)
Expand Down
7 changes: 5 additions & 2 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,11 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
// situation.
e.initRuntimeStats()
e.memTracker = memory.NewTracker(e.ID(), -1)
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.finished = make(chan struct{})
Expand Down Expand Up @@ -846,7 +850,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker = nil
e.resultCurr = nil
return nil
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,14 @@ func (e *HashJoinExec) Close() error {
close(e.probeWorkers[i].joinChkResourceCh)
channel.Clear(e.probeWorkers[i].joinChkResourceCh)
}
<<<<<<< HEAD:pkg/executor/join.go
e.probeSideTupleFetcher.probeChkResourceCh = nil
terror.Call(e.rowContainer.Close)
=======
e.ProbeSideTupleFetcher.probeChkResourceCh = nil
terror.Call(e.RowContainer.Close)
e.HashJoinCtxV1.SessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.RowContainer.ActionSpill())
>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go
e.waiterWg.Wait()
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]
Expand Down Expand Up @@ -214,8 +220,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
}
e.hashJoinCtx.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.diskTracker = disk.NewTracker(e.ID(), -1)
e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)
if e.HashJoinCtxV1.diskTracker != nil {
e.HashJoinCtxV1.diskTracker.Reset()
} else {
e.HashJoinCtxV1.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.HashJoinCtxV1.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)

e.workerWg = util.WaitGroupWrapper{}
e.waiterWg = util.WaitGroupWrapper{}
Expand Down Expand Up @@ -1423,7 +1433,11 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

if e.canUseCache {
// create a new one since it may be in the cache
<<<<<<< HEAD:pkg/executor/join.go
e.innerList = chunk.NewList(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize())
=======
e.InnerList = chunk.NewListWithMemTracker(exec.RetTypes(e.InnerExec), e.InitCap(), e.MaxChunkSize(), e.InnerList.GetMemTracker())
>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go
} else {
e.innerList.Reset()
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/util/chunk/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,23 @@ type RowPtr struct {
RowIdx uint32
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker.
func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List {
l := &List{
fieldTypes: fieldTypes,
initChunkSize: initChunkSize,
maxChunkSize: maxChunkSize,
memTracker: memory.NewTracker(memory.LabelForChunkList, -1),
memTracker: tracker,
consumedIdx: -1,
}
return l
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1))
}

// GetMemTracker returns the memory tracker of this List.
func (l *List) GetMemTracker() *memory.Tracker {
return l.memTracker
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,14 @@ func (c *RowContainer) Close() (err error) {
c.actionSpill.cond.Broadcast()
c.actionSpill.SetFinished()
}
c.memTracker.Detach()
c.diskTracker.Detach()
if c.alreadySpilled() {
err = c.m.records.inDisk.Close()
c.m.records.inDisk = nil
}
c.m.records.inMemory.Clear()
c.m.records.inMemory = nil
return
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,27 @@ func (t *Tracker) UnbindActions() {
t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{}
}

// UnbindActionFromHardLimit unbinds action from hardLimit.
func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) {
t.actionMuForHardLimit.Lock()
defer t.actionMuForHardLimit.Unlock()

var prev ActionOnExceed
for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() {
if current == actionToUnbind {
if prev == nil {
// actionToUnbind is the first element
t.actionMuForHardLimit.actionOnExceed = current.GetFallback()
} else {
// actionToUnbind is not the first element
prev.SetFallback(current.GetFallback())
}
break
}
prev = current
}
}

// reArrangeFallback merge two action chains and rearrange them by priority in descending order.
func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed {
if a == nil {
Expand Down

0 comments on commit 92545e8

Please sign in to comment.