Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix panic issue when handle HashAggExec.Close() #46662

Merged
merged 5 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions executor/aggregate/agg_hash_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ type HashAggExec struct {
IsChildReturnEmpty bool
// After we support parallel execution for aggregation functions with distinct,
// we can remove this attribute.
IsUnparallelExec bool
parallelExecInitialized bool
prepared bool
executed bool
IsUnparallelExec bool
parallelExecValid bool
prepared bool
executed bool

memTracker *memory.Tracker // track memory usage.
diskTracker *disk.Tracker
Expand Down Expand Up @@ -168,7 +168,7 @@ func (e *HashAggExec) Close() error {
}
return firstErr
}
if e.parallelExecInitialized {
if e.parallelExecValid {
// `Close` may be called after `Open` without calling `Next` in test.
if !e.prepared {
close(e.inputCh)
Expand All @@ -192,6 +192,7 @@ func (e *HashAggExec) Close() error {
if e.memTracker != nil {
e.memTracker.ReplaceBytesUsed(0)
}
e.parallelExecValid = false
}
return e.BaseExecutor.Close()
}
Expand Down Expand Up @@ -327,7 +328,7 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
e.finalWorkers[i].finalResultHolderCh <- exec.NewFirstChunk(e)
}

e.parallelExecInitialized = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a test case for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

e.parallelExecValid = true
}

// Next implements the Executor Next interface.
Expand Down
32 changes: 32 additions & 0 deletions executor/test/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,3 +1426,35 @@ func TestIssue42662(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/issue42662_1"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/servermemorylimit/issue42662_2"))
}

func TestIssue41778(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`
CREATE TABLE ta (
a1 json DEFAULT NULL,
a2 decimal(31, 1) DEFAULT '0'
);
CREATE TABLE tb (
b1 smallint(6) DEFAULT '-11385',
b2 decimal(63, 14) DEFAULT '-6197127648752447138876497216172307937317445669286.98661563645110'
);
CREATE TABLE tc (
c1 text DEFAULT NULL,
c2 float NOT NULL DEFAULT '1.8132474',
PRIMARY KEY (c2)
/*T![clustered_index] CLUSTERED */
);
`)
tk.MustExec(`
insert into ta
values (NULL, 1228.0);
insert into ta
values ('"json string1"', 623.8);
insert into ta
values (NULL, 1337.0);
`)
err := tk.QueryToErr("select count(*)from ta where not ( ta.a1 in ( select b2 from tb where not ( ta.a1 in ( select c1 from tc where ta.a2 in ( select b2 from tb where IsNull(ta.a1) ) ) ) ) )")
require.Equal(t, "[planner:1815]expression isnull(cast(test.ta.a1, var_string(4294967295))) cannot be pushed down", err.Error())
}