Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix cte may hang because register OOMAction repeatedly (#43758) #43774

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func (e *CTEExec) Close() (err error) {
}

func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil && err == nil {
err = errors.Errorf("%v", r)
}
}()
failpoint.Inject("testCTESeedPanic", nil)
e.curIter = 0
e.iterInTbl.SetIter(e.curIter)
chks := make([]*chunk.Chunk, 0, 10)
Expand All @@ -240,50 +246,56 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
}
chk := tryNewCacheChunk(e.seedExec)
if err = Next(ctx, e.seedExec, chk); err != nil {
return err
return
}
if chk.NumRows() == 0 {
break
}
if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil {
return err
return
}
chks = append(chks, chk)
}
// Initial resTbl is empty, so no need to deduplicate chk using resTbl.
// Just adding is ok.
for _, chk := range chks {
if err = e.resTbl.Add(chk); err != nil {
return err
return
}
}
e.curIter++
e.iterInTbl.SetIter(e.curIter)

return nil
return
}

func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil && err == nil {
err = errors.Errorf("%v", r)
}
}()
failpoint.Inject("testCTERecursivePanic", nil)
if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 {
return nil
return
}

if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth {
return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter)
}

if e.limitDone(e.resTbl) {
return nil
return
}

for {
chk := tryNewCacheChunk(e.recursiveExec)
if err = Next(ctx, e.recursiveExec, chk); err != nil {
return err
return
}
if chk.NumRows() == 0 {
if err = e.setupTblsForNewIteration(); err != nil {
return err
return
}
if e.limitDone(e.resTbl) {
break
Expand All @@ -300,18 +312,18 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
// Make sure iterInTbl is setup before Close/Open,
// because some executors will read iterInTbl in Open() (like IndexLookupJoin).
if err = e.recursiveExec.Close(); err != nil {
return err
return
}
if err = e.recursiveExec.Open(ctx); err != nil {
return err
return
}
} else {
if err = e.iterOutTbl.Add(chk); err != nil {
return err
return
}
}
}
return nil
return
}

// Get next chunk from resTbl for limit.
Expand Down
21 changes: 21 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,24 @@ func TestCTEsInView(t *testing.T) {
tk.MustExec("use test1;")
tk.MustQuery("select * from test.v;").Check(testkit.Rows("1"))
}

func TestCTEPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("create table t1(c1 int)")
tk.MustExec("insert into t1 values(1), (2), (3)")

fpPathPrefix := "github.com/pingcap/tidb/executor/"
fp := "testCTESeedPanic"
require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp)))
err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1")
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))

fp = "testCTERecursivePanic"
require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp)))
err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1")
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
}
23 changes: 6 additions & 17 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ func (s *StorageRC) DerefAndClose() (err error) {
if s.refCnt < 0 {
return errors.New("Storage ref count is less than zero")
} else if s.refCnt == 0 {
// TODO: unreg memtracker
s.refCnt = -1
s.done = false
s.err = nil
s.iter = 0
if err = s.rc.Close(); err != nil {
return err
}
if err = s.resetAll(); err != nil {
return err
}
s.rc = nil
}
return nil
}
Expand All @@ -155,7 +156,7 @@ func (s *StorageRC) SwapData(other Storage) (err error) {

// Reopen impls Storage Reopen interface.
func (s *StorageRC) Reopen() (err error) {
if err = s.rc.Reset(); err != nil {
if err = s.rc.Close(); err != nil {
return err
}
s.iter = 0
Expand Down Expand Up @@ -265,18 +266,6 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {
return s.rc.ActionSpillForTest()
}

func (s *StorageRC) resetAll() error {
s.refCnt = -1
s.done = false
s.err = nil
s.iter = 0
if err := s.rc.Reset(); err != nil {
return err
}
s.rc = nil
return nil
}

func (s *StorageRC) valid() bool {
return s.refCnt > 0 && s.rc != nil
}