From dc9b37da2b2714c0713df0f2d484451e607b2940 Mon Sep 17 00:00:00 2001 From: "TONG, Zhigao" Date: Fri, 8 Sep 2023 18:49:37 +0800 Subject: [PATCH] This is an automated cherry-pick of #46662 Signed-off-by: ti-chi-bot --- executor/aggregate/agg_hash_executor.go | 690 ++++++++++++++++++++++ executor/issuetest/executor_issue_test.go | 33 ++ 2 files changed, 723 insertions(+) create mode 100644 executor/aggregate/agg_hash_executor.go diff --git a/executor/aggregate/agg_hash_executor.go b/executor/aggregate/agg_hash_executor.go new file mode 100644 index 0000000000000..83d38c50519f8 --- /dev/null +++ b/executor/aggregate/agg_hash_executor.go @@ -0,0 +1,690 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/executor/aggfuncs" + "github.com/pingcap/tidb/executor/internal/exec" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/channel" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/hack" + "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/set" +) + +// HashAggInput indicates the input of hash agg exec. +type HashAggInput struct { + chk *chunk.Chunk + // giveBackCh is bound with specific partial worker, + // it's used to reuse the `chk`, + // and tell the data-fetcher which partial worker it should send data to. + giveBackCh chan<- *chunk.Chunk +} + +// HashAggExec deals with all the aggregate functions. +// It is built from the Aggregate Plan. When Next() is called, it reads all the data from Src +// and updates all the items in PartialAggFuncs. +// The parallel execution flow is as the following graph shows: +/* + +-------------+ + | Main Thread | + +------+------+ + ^ + | + + + +-+- +-+ + | | ...... | | finalOutputCh + +++- +-+ + ^ + | + +---------------+ + | | + +--------------+ +--------------+ + | final worker | ...... | final worker | + +------------+-+ +-+------------+ + ^ ^ + | | + +-+ +-+ ...... +-+ + | | | | | | + ... ... ... partialOutputChs + | | | | | | + +++ +++ +++ + ^ ^ ^ + +-+ | | | + | | +--------o----+ | + inputCh +-+ | +-----------------+---+ + | | | | + ... +---+------------+ +----+-----------+ + | | | partial worker | ...... | partial worker | + +++ +--------------+-+ +-+--------------+ + | ^ ^ + | | | + +----v---------+ +++ +-+ +++ + | data fetcher | +------> | | | | ...... 
| | partialInputChs + +--------------+ +-+ +-+ +-+ +*/ +type HashAggExec struct { + exec.BaseExecutor + + Sc *stmtctx.StatementContext + PartialAggFuncs []aggfuncs.AggFunc + FinalAggFuncs []aggfuncs.AggFunc + partialResultMap AggPartialResultMapper + bInMap int64 // indicate there are 2^bInMap buckets in partialResultMap + groupSet set.StringSetWithMemoryUsage + groupKeys []string + cursor4GroupKey int + GroupByItems []expression.Expression + groupKeyBuffer [][]byte + + finishCh chan struct{} + finalOutputCh chan *AfFinalResult + partialOutputChs []chan *HashAggIntermData + inputCh chan *HashAggInput + partialInputChs []chan *chunk.Chunk + partialWorkers []HashAggPartialWorker + finalWorkers []HashAggFinalWorker + DefaultVal *chunk.Chunk + childResult *chunk.Chunk + + // IsChildReturnEmpty indicates whether the child executor only returns an empty input. + IsChildReturnEmpty bool + // After we support parallel execution for aggregation functions with distinct, + // we can remove this attribute. + IsUnparallelExec bool + parallelExecValid bool + prepared bool + executed bool + + memTracker *memory.Tracker // track memory usage. + diskTracker *disk.Tracker + + stats *HashAggRuntimeStats + + // listInDisk is the chunks to store row values for spilled data. + // The HashAggExec may be set to `spill mode` multiple times, and all spilled data will be appended to ListInDisk. + listInDisk *chunk.ListInDisk + // numOfSpilledChks indicates the number of all the spilled chunks. + numOfSpilledChks int + // offsetOfSpilledChks indicates the offset of the chunk be read from the disk. + // In each round of processing, we need to re-fetch all the chunks spilled in the last one. + offsetOfSpilledChks int + // inSpillMode indicates whether HashAgg is in `spill mode`. + // When HashAgg is in `spill mode`, the size of `partialResultMap` is no longer growing and all the data fetched + // from the child executor is spilled to the disk. + inSpillMode uint32 + // tmpChkForSpill is the temp chunk for spilling. + tmpChkForSpill *chunk.Chunk + // spillAction save the Action for spilling. + spillAction *AggSpillDiskAction + // isChildDrained indicates whether the all data from child has been taken out. + isChildDrained bool +} + +// Close implements the Executor Close interface. +func (e *HashAggExec) Close() error { + if e.stats != nil { + defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) + } + if e.IsUnparallelExec { + var firstErr error + e.childResult = nil + e.groupSet, _ = set.NewStringSetWithMemoryUsage() + e.partialResultMap = nil + if e.memTracker != nil { + e.memTracker.ReplaceBytesUsed(0) + } + if e.listInDisk != nil { + firstErr = e.listInDisk.Close() + } + e.spillAction, e.tmpChkForSpill = nil, nil + if err := e.BaseExecutor.Close(); firstErr == nil { + firstErr = err + } + return firstErr + } + if e.parallelExecValid { + // `Close` may be called after `Open` without calling `Next` in test. + if !e.prepared { + close(e.inputCh) + for _, ch := range e.partialOutputChs { + close(ch) + } + for _, ch := range e.partialInputChs { + close(ch) + } + close(e.finalOutputCh) + } + close(e.finishCh) + for _, ch := range e.partialOutputChs { + channel.Clear(ch) + } + for _, ch := range e.partialInputChs { + channel.Clear(ch) + } + channel.Clear(e.finalOutputCh) + e.executed = false + if e.memTracker != nil { + e.memTracker.ReplaceBytesUsed(0) + } + e.parallelExecValid = false + } + return e.BaseExecutor.Close() +} + +// Open implements the Executor Open interface. 
+func (e *HashAggExec) Open(ctx context.Context) error { + failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { + if val, _ := val.(bool); val { + failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error")) + } + }) + + if err := e.BaseExecutor.Open(ctx); err != nil { + return err + } + e.prepared = false + + if e.memTracker != nil { + e.memTracker.Reset() + } else { + e.memTracker = memory.NewTracker(e.ID(), -1) + } + if e.Ctx().GetSessionVars().TrackAggregateMemoryUsage { + e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + } + + if e.IsUnparallelExec { + e.initForUnparallelExec() + return nil + } + e.initForParallelExec(e.Ctx()) + return nil +} + +func (e *HashAggExec) initForUnparallelExec() { + var setSize int64 + e.groupSet, setSize = set.NewStringSetWithMemoryUsage() + e.partialResultMap = make(AggPartialResultMapper) + e.bInMap = 0 + failpoint.Inject("ConsumeRandomPanic", nil) + e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice*(1< 0 { + e.IsChildReturnEmpty = false + return nil + } + } +} + +// unparallelExec executes hash aggregation algorithm in single thread. +func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) error { + chk.Reset() + for { + if e.prepared { + // Since we return e.MaxChunkSize() rows every time, so we should not traverse + // `groupSet` because of its randomness. + for ; e.cursor4GroupKey < len(e.groupKeys); e.cursor4GroupKey++ { + partialResults := e.getPartialResults(e.groupKeys[e.cursor4GroupKey]) + if len(e.PartialAggFuncs) == 0 { + chk.SetNumVirtualRows(chk.NumRows() + 1) + } + for i, af := range e.PartialAggFuncs { + if err := af.AppendFinalResult2Chunk(e.Ctx(), partialResults[i], chk); err != nil { + return err + } + } + if chk.IsFull() { + e.cursor4GroupKey++ + return nil + } + } + e.resetSpillMode() + } + if e.executed { + return nil + } + if err := e.execute(ctx); err != nil { + return err + } + if (len(e.groupSet.StringSet) == 0) && len(e.GroupByItems) == 0 { + // If no groupby and no data, we should add an empty group. + // For example: + // "select count(c) from t;" should return one row [0] + // "select count(c) from t group by c1;" should return empty result set. + e.memTracker.Consume(e.groupSet.Insert("")) + e.groupKeys = append(e.groupKeys, "") + } + e.prepared = true + } +} + +func (e *HashAggExec) resetSpillMode() { + e.cursor4GroupKey, e.groupKeys = 0, e.groupKeys[:0] + var setSize int64 + e.groupSet, setSize = set.NewStringSetWithMemoryUsage() + e.partialResultMap = make(AggPartialResultMapper) + e.bInMap = 0 + e.prepared = false + e.executed = e.numOfSpilledChks == e.listInDisk.NumChunks() // No data is spilling again, all data have been processed. + e.numOfSpilledChks = e.listInDisk.NumChunks() + e.memTracker.ReplaceBytesUsed(setSize) + atomic.StoreUint32(&e.inSpillMode, 0) +} + +// execute fetches Chunks from src and update each aggregate function for each row in Chunk. 
+func (e *HashAggExec) execute(ctx context.Context) (err error) { + defer func() { + if e.tmpChkForSpill.NumRows() > 0 && err == nil { + err = e.listInDisk.Add(e.tmpChkForSpill) + e.tmpChkForSpill.Reset() + } + }() + for { + mSize := e.childResult.MemoryUsage() + if err := e.getNextChunk(ctx); err != nil { + return err + } + failpoint.Inject("ConsumeRandomPanic", nil) + e.memTracker.Consume(e.childResult.MemoryUsage() - mSize) + if err != nil { + return err + } + + failpoint.Inject("unparallelHashAggError", func(val failpoint.Value) { + if val, _ := val.(bool); val { + failpoint.Return(errors.New("HashAggExec.unparallelExec error")) + } + }) + + // no more data. + if e.childResult.NumRows() == 0 { + return nil + } + e.groupKeyBuffer, err = GetGroupKey(e.Ctx(), e.childResult, e.groupKeyBuffer, e.GroupByItems) + if err != nil { + return err + } + + allMemDelta := int64(0) + sel := make([]int, 0, e.childResult.NumRows()) + var tmpBuf [1]chunk.Row + for j := 0; j < e.childResult.NumRows(); j++ { + groupKey := string(e.groupKeyBuffer[j]) // do memory copy here, because e.groupKeyBuffer may be reused. + if !e.groupSet.Exist(groupKey) { + if atomic.LoadUint32(&e.inSpillMode) == 1 && e.groupSet.Count() > 0 { + sel = append(sel, j) + continue + } + allMemDelta += e.groupSet.Insert(groupKey) + e.groupKeys = append(e.groupKeys, groupKey) + } + partialResults := e.getPartialResults(groupKey) + for i, af := range e.PartialAggFuncs { + tmpBuf[0] = e.childResult.GetRow(j) + memDelta, err := af.UpdatePartialResult(e.Ctx(), tmpBuf[:], partialResults[i]) + if err != nil { + return err + } + allMemDelta += memDelta + } + } + + // spill unprocessed data when exceeded. + if len(sel) > 0 { + e.childResult.SetSel(sel) + err = e.spillUnprocessedData(len(sel) == cap(sel)) + if err != nil { + return err + } + } + + failpoint.Inject("ConsumeRandomPanic", nil) + e.memTracker.Consume(allMemDelta) + } +} + +func (e *HashAggExec) spillUnprocessedData(isFullChk bool) (err error) { + if isFullChk { + return e.listInDisk.Add(e.childResult) + } + for i := 0; i < e.childResult.NumRows(); i++ { + e.tmpChkForSpill.AppendRow(e.childResult.GetRow(i)) + if e.tmpChkForSpill.IsFull() { + err = e.listInDisk.Add(e.tmpChkForSpill) + if err != nil { + return err + } + e.tmpChkForSpill.Reset() + } + } + return nil +} + +func (e *HashAggExec) getNextChunk(ctx context.Context) (err error) { + e.childResult.Reset() + if !e.isChildDrained { + if err := exec.Next(ctx, e.Children(0), e.childResult); err != nil { + return err + } + if e.childResult.NumRows() != 0 { + return nil + } + e.isChildDrained = true + } + if e.offsetOfSpilledChks < e.numOfSpilledChks { + e.childResult, err = e.listInDisk.GetChunk(e.offsetOfSpilledChks) + if err != nil { + return err + } + e.offsetOfSpilledChks++ + } + return nil +} + +func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResult { + partialResults, ok := e.partialResultMap[groupKey] + allMemDelta := int64(0) + if !ok { + partialResults = make([]aggfuncs.PartialResult, 0, len(e.PartialAggFuncs)) + for _, af := range e.PartialAggFuncs { + partialResult, memDelta := af.AllocPartialResult() + partialResults = append(partialResults, partialResult) + allMemDelta += memDelta + } + // Map will expand when count > bucketNum * loadFactor. The memory usage will doubled. + if len(e.partialResultMap)+1 > (1< ")) // Again, a simpler reproduce. 
tk.MustQuery("select /*+ inl_join (t1, t2) */ t2.c5 from t1 right join t2 on t1.c2 = t2.c5 where not( t1.c2 between '4s7ht' and 'mj' );").Check(testkit.Rows()) +======= +func TestIssue41778(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(` + CREATE TABLE ta ( + a1 json DEFAULT NULL, + a2 decimal(31, 1) DEFAULT '0' + ); + CREATE TABLE tb ( + b1 smallint(6) DEFAULT '-11385', + b2 decimal(63, 14) DEFAULT '-6197127648752447138876497216172307937317445669286.98661563645110' + ); + CREATE TABLE tc ( + c1 text DEFAULT NULL, + c2 float NOT NULL DEFAULT '1.8132474', + PRIMARY KEY (c2) + /*T![clustered_index] CLUSTERED */ + ); + `) + tk.MustExec(` + insert into ta + values (NULL, 1228.0); + insert into ta + values ('"json string1"', 623.8); + insert into ta + values (NULL, 1337.0); + `) + err := tk.QueryToErr("select count(*)from ta where not ( ta.a1 in ( select b2 from tb where not ( ta.a1 in ( select c1 from tc where ta.a2 in ( select b2 from tb where IsNull(ta.a1) ) ) ) ) )") + require.Equal(t, "[planner:1815]expression isnull(cast(test.ta.a1, var_string(4294967295))) cannot be pushed down", err.Error()) +>>>>>>> 1d55a3c68d2 (executor: fix panic issue when handle `HashAggExec.Close()` (#46662)):executor/test/issuetest/executor_issue_test.go }