executor: fix tidb crash when shuffleExec quit unexpectedly (#48828) (#…
ti-chi-bot authored Apr 2, 2024
1 parent 69894ec commit 1851c9e
Showing 2 changed files with 38 additions and 5 deletions.
23 changes: 23 additions & 0 deletions executor/executor_failpoint_test.go
@@ -585,3 +585,26 @@ func TestGetMvccByEncodedKeyRegionError(t *testing.T) {
 	require.Equal(t, 1, len(resp.Info.Writes))
 	require.Equal(t, commitTs, resp.Info.Writes[0].CommitTs)
 }
+
+func TestShuffleExit(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1;")
+	tk.MustExec("create table t1(i int, j int, k int);")
+	tk.MustExec("insert into t1 VALUES (1,1,1),(2,2,2),(3,3,3),(4,4,4);")
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/shuffleError", "return(true)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/shuffleError"))
+	}()
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/shuffleExecFetchDataAndSplit", "return(true)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/shuffleExecFetchDataAndSplit"))
+	}()
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/shuffleWorkerRun", "panic(\"ShufflePanic\")"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/shuffleWorkerRun"))
+	}()
+	err := tk.QueryToErr("SELECT SUM(i) OVER W FROM t1 WINDOW w AS (PARTITION BY j ORDER BY i) ORDER BY 1+SUM(i) OVER w;")
+	require.ErrorContains(t, err, "ShuffleExec.Next error")
+}
20 changes: 15 additions & 5 deletions executor/shuffle.go
@@ -17,6 +17,7 @@ package executor
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -113,7 +114,7 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
 
 	e.prepared = false
 	e.finishCh = make(chan struct{}, 1)
-	e.outputCh = make(chan *shuffleOutput, e.concurrency)
+	e.outputCh = make(chan *shuffleOutput, e.concurrency+len(e.dataSources))
 
 	for _, w := range e.workers {
 		w.finishCh = e.finishCh
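Note on the buffer change above: recoveryShuffleExec takes outputCh so that a panicking goroutine can report its error there, and after this commit both the shuffle workers and the per-dataSource fetch goroutines can hit that path. Sizing the channel as e.concurrency+len(e.dataSources) therefore appears intended to give every goroutine that might report a panic its own slot, so a recovery-time send can never block even if the reader has already stopped draining the channel. The following is a minimal, self-contained sketch of that sizing rule, not TiDB code; the names (senders, out) are illustrative.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const senders = 4
	// One buffered slot per goroutine that might report a panic, so a send from a
	// recover handler never blocks, even when nobody is draining the channel.
	out := make(chan error, senders)

	var wg sync.WaitGroup
	wg.Add(senders)
	for i := 0; i < senders; i++ {
		go func(i int) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					out <- fmt.Errorf("goroutine %d panicked: %v", i, r)
				}
			}()
			panic("boom")
		}(i)
	}
	wg.Wait() // every goroutine exits cleanly although out has not been read yet
	fmt.Println(<-out)
}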
@@ -199,13 +200,13 @@ func (e *ShuffleExec) Close() error {
 }
 
 func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
+	waitGroup := &sync.WaitGroup{}
+	waitGroup.Add(len(e.workers) + len(e.dataSources))
 	// create a goroutine for each dataSource to fetch and split data
 	for i := range e.dataSources {
-		go e.fetchDataAndSplit(ctx, i)
+		go e.fetchDataAndSplit(ctx, i, waitGroup)
 	}
 
-	waitGroup := &sync.WaitGroup{}
-	waitGroup.Add(len(e.workers))
 	for _, w := range e.workers {
 		go w.run(ctx, waitGroup)
 	}
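prepare4ParallelExec now sets up the WaitGroup before spawning anything and counts the fetch goroutines as well as the workers; the matching waitGroup.Done() is added to fetchDataAndSplit's deferred cleanup further down. Whoever waits on this group (the shutdown path is not visible in this hunk) can therefore no longer treat the shuffle as finished, and tear down shared state such as outputCh, while a fetcher is still running. A generic sketch of that discipline, with purely illustrative names and an assumed close-after-Wait shutdown shape:

package main

import (
	"fmt"
	"sync"
)

func main() {
	out := make(chan int, 8)

	var wg sync.WaitGroup
	producers := 3
	wg.Add(producers) // count every goroutine that may still send on out
	for i := 0; i < producers; i++ {
		go func(i int) {
			defer wg.Done()
			out <- i
		}(i)
	}

	// Close the shared channel only after all producers are accounted for;
	// forgetting one in the WaitGroup is how "send on closed channel" panics sneak in.
	go func() {
		wg.Wait()
		close(out)
	}()

	for v := range out {
		fmt.Println("got", v)
	}
}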
@@ -256,7 +257,7 @@ func recoveryShuffleExec(output chan *shuffleOutput, r interface{}) {
 	logutil.BgLogger().Error("shuffle panicked", zap.Error(err), zap.Stack("stack"))
 }
 
-func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int) {
+func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int, waitGroup *sync.WaitGroup) {
 	var (
 		err           error
 		workerIndices []int
@@ -271,8 +272,16 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int
 		for _, w := range e.workers {
 			close(w.receivers[dataSourceIndex].inputCh)
 		}
+		waitGroup.Done()
 	}()
 
+	failpoint.Inject("shuffleExecFetchDataAndSplit", func(val failpoint.Value) {
+		if val.(bool) {
+			time.Sleep(100 * time.Millisecond)
+			panic("shuffleExecFetchDataAndSplitPanic")
+		}
+	})
+
 	for {
 		err = Next(ctx, e.dataSources[dataSourceIndex], chk)
 		if err != nil {
@@ -386,6 +395,7 @@ func (e *shuffleWorker) run(ctx context.Context, waitGroup *sync.WaitGroup) {
 		waitGroup.Done()
 	}()
 
+	failpoint.Inject("shuffleWorkerRun", nil)
 	for {
 		select {
 		case <-e.finishCh:
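The two Inject markers added above are what TestShuffleExit drives: shuffleExecFetchDataAndSplit sleeps briefly and then panics inside the fetch goroutine, while shuffleWorkerRun has a nil body and simply executes whatever term the test enables, here panic("ShufflePanic"), at the top of each worker. An Inject call is a no-op in a regular build; in a failpoint-enabled build it evaluates the term registered by a matching failpoint.Enable. A minimal sketch of that lifecycle, using a made-up failpoint name (examplePoint) and package path:

package example

import "github.com/pingcap/failpoint"

// doStep carries a failpoint hook. In a regular build the Inject marker does nothing;
// in a failpoint-enabled build it evaluates whatever term has been registered for
// this name (return, sleep, panic, ...).
func doStep() {
	failpoint.Inject("examplePoint", nil)
	// ... real work ...
}

// A test arms and disarms the hook around the scenario, e.g.:
//
//	failpoint.Enable("example.com/pkg/example/examplePoint", `panic("boom")`)
//	defer failpoint.Disable("example.com/pkg/example/examplePoint")
//
// after which calling doStep() panics at the marker, just as shuffleWorkerRun does
// in TestShuffleExit.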
