Skip to content

Commit

Permalink
executor: fix hang when the analyze worker unexpectedly exits (#48365)
Browse files Browse the repository at this point in the history
close #48356
  • Loading branch information
hawkingrei authored Nov 8, 2023
1 parent 632cd84 commit 92f071a
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 19 deletions.
13 changes: 10 additions & 3 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
g, _ := errgroup.WithContext(ctx)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
})
Expand All @@ -134,9 +134,15 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})

TASKLOOP:
for _, task := range tasks {
taskCh <- task
select {
case taskCh <- task:
case <-e.errExitCh:
break TASKLOOP
case <-gctx.Done():
break TASKLOOP
}
}
close(taskCh)
defer func() {
Expand Down Expand Up @@ -525,6 +531,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
if !ok {
break
}
failpoint.Inject("handleAnalyzeWorkerPanic", nil)
StartAnalyzeJob(e.Ctx(), task.job)
switch task.taskType {
case colTask:
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 49,
shard_count = 48,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
15 changes: 0 additions & 15 deletions pkg/executor/test/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3118,18 +3118,3 @@ func TestAnalyzePartitionVerify(t *testing.T) {
}
}
}

func TestPanicInHandleResultErrorWithSingleGoroutine(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table tbl_2 ( col_20 decimal default 84232 , col_21 tinyint not null , col_22 int default 80814394 , col_23 mediumint default -8036687 not null , col_24 smallint default 9185 not null , col_25 tinyint unsigned default 65 , col_26 char(115) default 'ZyfroRODMbNDRZnPNRW' not null , col_27 bigint not null , col_28 tinyint not null , col_29 char(130) default 'UMApsVgzHblmY' , primary key idx_14 ( col_28,col_22 ) , unique key idx_15 ( col_24,col_22 ) , key idx_16 ( col_21,col_20,col_24,col_25,col_27,col_28,col_26,col_29 ) , key idx_17 ( col_24,col_25 ) , unique key idx_18 ( col_25,col_23,col_29,col_27,col_26,col_22 ) , key idx_19 ( col_25,col_22,col_26,col_23 ) , unique key idx_20 ( col_22,col_24,col_28,col_29,col_26,col_20 ) , key idx_21 ( col_25,col_24,col_26,col_29,col_27,col_22,col_28 ) );")
tk.MustExec("insert ignore into tbl_2 values ( 942,33,-1915007317,3408149,-3699,193,'Trywdis',1876334369465184864,115,null );")
fp := "github.com/pingcap/tidb/pkg/executor/handleResultsErrorSingleThreadPanic"
require.NoError(t, failpoint.Enable(fp, `panic("TestPanicInHandleResultErrorWithSingleGoroutine")`))
defer func() {
require.NoError(t, failpoint.Disable(fp))
}()
tk.MustExecToErr("analyze table tbl_2;")
}
18 changes: 18 additions & 0 deletions pkg/executor/test/analyzetest/panictest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "panictest_test",
timeout = "short",
srcs = [
"main_test.go",
"panic_test.go",
],
flaky = True,
deps = [
"//pkg/config",
"//pkg/testkit",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
34 changes: 34 additions & 0 deletions pkg/executor/test/analyzetest/panictest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package panictest

import (
"testing"

"github.com/pingcap/tidb/pkg/config"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
61 changes: 61 additions & 0 deletions pkg/executor/test/analyzetest/panictest/panic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package panictest

import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func TestPanicInHandleResultErrorWithSingleGoroutine(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table tbl_2 ( col_20 decimal default 84232 , col_21 tinyint not null , col_22 int default 80814394 , col_23 mediumint default -8036687 not null , col_24 smallint default 9185 not null , col_25 tinyint unsigned default 65 , col_26 char(115) default 'ZyfroRODMbNDRZnPNRW' not null , col_27 bigint not null , col_28 tinyint not null , col_29 char(130) default 'UMApsVgzHblmY' , primary key idx_14 ( col_28,col_22 ) , unique key idx_15 ( col_24,col_22 ) , key idx_16 ( col_21,col_20,col_24,col_25,col_27,col_28,col_26,col_29 ) , key idx_17 ( col_24,col_25 ) , unique key idx_18 ( col_25,col_23,col_29,col_27,col_26,col_22 ) , key idx_19 ( col_25,col_22,col_26,col_23 ) , unique key idx_20 ( col_22,col_24,col_28,col_29,col_26,col_20 ) , key idx_21 ( col_25,col_24,col_26,col_29,col_27,col_22,col_28 ) );")
tk.MustExec("insert ignore into tbl_2 values ( 942,33,-1915007317,3408149,-3699,193,'Trywdis',1876334369465184864,115,null );")
fp := "github.com/pingcap/tidb/pkg/executor/handleResultsErrorSingleThreadPanic"
require.NoError(t, failpoint.Enable(fp, `panic("TestPanicInHandleResultErrorWithSingleGoroutine")`))
defer func() {
require.NoError(t, failpoint.Disable(fp))
}()
tk.MustExecToErr("analyze table tbl_2;")
}

func TestPanicInHandleAnalyzeWorkerPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table tbl_2 ( col_20 decimal default 84232 , col_21 tinyint not null , col_22 int default 80814394 , col_23 mediumint default -8036687 not null , col_24 smallint default 9185 not null , col_25 tinyint unsigned default 65 , col_26 char(115) default 'ZyfroRODMbNDRZnPNRW' not null , col_27 bigint not null , col_28 tinyint not null , col_29 char(130) default 'UMApsVgzHblmY' , primary key idx_14 ( col_28,col_22 ) , unique key idx_15 ( col_24,col_22 ) , key idx_16 ( col_21,col_20,col_24,col_25,col_27,col_28,col_26,col_29 ) , key idx_17 ( col_24,col_25 ) , unique key idx_18 ( col_25,col_23,col_29,col_27,col_26,col_22 ) , key idx_19 ( col_25,col_22,col_26,col_23 ) , unique key idx_20 ( col_22,col_24,col_28,col_29,col_26,col_20 ) , key idx_21 ( col_25,col_24,col_26,col_29,col_27,col_22,col_28 ) ) partition by range ( col_22 ) ( partition p0 values less than (-1938341588), partition p1 values less than (-1727506184), partition p2 values less than (-1700184882), partition p3 values less than (-1596142809), partition p4 values less than (445165686) );")
tk.MustExec("insert ignore into tbl_2 values ( 942,33,-1915007317,3408149,-3699,193,'Trywdis',1876334369465184864,115,null );")
tk.MustExec("insert ignore into tbl_2 values ( null,55,-388460319,-2292918,10130,162,'UqjDlYvdcNY',4872802276956896607,-51,'ORBQjnumcXP' );")
tk.MustExec("insert ignore into tbl_2 values ( 42,-19,-9677826,-1168338,16904,79,'TzOqH',8173610791128879419,65,'lNLcvOZDcRzWvDO' );")
tk.MustExec("insert ignore into tbl_2 values ( 2,26,369867543,-6773303,-24953,41,'BvbdrKTNtvBgsjjnxt',5996954963897924308,-95,'wRJYPBahkIGDfz' );")
tk.MustExec("insert ignore into tbl_2 values ( 6896,3,444460824,-2070971,-13095,167,'MvWNKbaOcnVuIrtbT',6968339995987739471,-5,'zWipNBxGeVmso' );")
tk.MustExec("insert ignore into tbl_2 values ( 58761,112,-1535034546,-5837390,-14204,157,'',-8319786912755096816,15,'WBjsozfBfrPPHmKv' );")
tk.MustExec("insert ignore into tbl_2 values ( 84923,113,-973946646,406140,25040,51,'THQdwkQvppWZnULm',5469507709881346105,94,'oGNmoxLLgHkdyDCT' );")
tk.MustExec("insert ignore into tbl_2 values ( 0,-104,-488745187,-1941015,-2646,39,'jyKxfs',-5307175470406648836,46,'KZpfjFounVgFeRPa' );")
tk.MustExec("insert ignore into tbl_2 values ( 4,97,2105289255,1034363,28385,192,'',4429378142102752351,8,'jOk' );")
fp := "github.com/pingcap/tidb/pkg/executor/handleAnalyzeWorkerPanic"
require.NoError(t, failpoint.Enable(fp, `panic("TestPanicInHandleAnalyzeWorkerPanic")`))
defer func() {
require.NoError(t, failpoint.Disable(fp))
}()
tk.MustExecToErr("analyze table tbl_2;")
}

0 comments on commit 92f071a

Please sign in to comment.