diff --git a/executor/join.go b/executor/join.go index 03ea7fb5b9f46..ec3456a393a35 100644 --- a/executor/join.go +++ b/executor/join.go @@ -576,6 +576,16 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx } for i := range selected { + killed := atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 + failpoint.Inject("killedInJoin2Chunk", func(val failpoint.Value) { + if val.(bool) { + killed = true + } + }) + if killed { + joinResult.err = ErrQueryInterrupted + return false, joinResult + } if !selected[i] || hCtx.hasNull[i] { // process unmatched probe side rows e.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) } else { // process matched probe side rows @@ -607,6 +617,16 @@ func (e *HashJoinExec) join2ChunkForOuterHashJoin(workerID uint, probeSideChk *c } } for i := 0; i < probeSideChk.NumRows(); i++ { + killed := atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 + failpoint.Inject("killedInJoin2ChunkForOuterHashJoin", func(val failpoint.Value) { + if val.(bool) { + killed = true + } + }) + if killed { + joinResult.err = ErrQueryInterrupted + return false, joinResult + } probeKey, probeRow := hCtx.hashVals[i].Sum64(), probeSideChk.GetRow(i) ok, joinResult = e.joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID, probeKey, probeRow, hCtx, joinResult) if !ok { diff --git a/executor/join_test.go b/executor/join_test.go index 84a920a3ec3cb..3dcb125a88a34 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -23,6 +23,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/executor" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util" @@ -2309,3 +2310,26 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { c.Assert(rows[0][0], Matches, "HashJoin.*") c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}") } + +func (s *testSuiteJoinSerial) TestIssue20270(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + failpoint.Enable("github.com/pingcap/tidb/executor/killedInJoin2Chunk", "return(true)") + tk.MustExec("drop table if exists t;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t(c1 int, c2 int)") + tk.MustExec("create table t1(c1 int, c2 int)") + tk.MustExec("insert into t values(1,1),(2,2)") + tk.MustExec("insert into t1 values(2,3),(4,4)") + err := tk.QueryToErr("select /*+ TIDB_HJ(t, t1) */ * from t left join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20") + c.Assert(err, Equals, executor.ErrQueryInterrupted) + failpoint.Disable("github.com/pingcap/tidb/executor/killedInJoin2Chunk") + plannercore.ForceUseOuterBuild4Test = true + defer func() { + plannercore.ForceUseOuterBuild4Test = false + }() + failpoint.Enable("github.com/pingcap/tidb/executor/killedInJoin2ChunkForOuterHashJoin", "return(true)") + tk.MustExec("insert into t1 values(1,30),(2,40)") + err = tk.QueryToErr("select /*+ TIDB_HJ(t, t1) */ * from t left outer join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20") + c.Assert(err, Equals, executor.ErrQueryInterrupted) + failpoint.Disable("github.com/pingcap/tidb/executor/killedInJoin2ChunkForOuterHashJoin") +}