From d606b0d55fdfe1997857ea7b4bb4d85e6a0f6b09 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Thu, 25 Oct 2018 11:38:35 +0800 Subject: [PATCH] executor: fix wrong result when index join on union scan. (#8031) Do not modify Plan of dataReaderBuilder directly, because it would impact next batch of outer rows, as well as other concurrent inner workers. Instead, build a local child builder to store the child plan. --- executor/builder.go | 47 ++++++++++++++++++++---------- executor/index_lookup_join_test.go | 26 +++++++++++++++++ 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index f2d6dda2f1113..fed4e0771654f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -679,10 +679,19 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E b.err = errors.Trace(b.err) return nil } - return b.buildUnionScanFromReader(reader, v) + us, err := b.buildUnionScanFromReader(reader, v) + if err != nil { + b.err = err + return nil + } + return us } -func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor { +// buildUnionScanFromReader builds union scan executor from child executor. +// Note that this function may be called by inner workers of index lookup join concurrently. +// Be careful to avoid data race. +func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) (Executor, error) { + var err error us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)} // Get the handle column index of the below plannercore. // We can guarantee that there must be only one col in the map. 
@@ -692,10 +701,16 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco switch x := reader.(type) { case *TableReaderExecutor: us.desc = x.desc + // Union scan can only be in a write transaction, so DirtyDB should have a non-nil value now, thus + // GetDirtyDB() is safe here. If this table has been modified in the transaction, non-nil DirtyTable + // can be found in DirtyDB now, so GetDirtyTable is safe; if this table has not been modified in the + // transaction, empty DirtyTable would be inserted into DirtyDB, it does not matter when multiple + // goroutines write empty DirtyTable to DirtyDB for this table concurrently. Thus we don't use lock + // to synchronize here. us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) us.conditions = v.Conditions us.columns = x.columns - b.err = us.buildAndSortAddedRows() + err = us.buildAndSortAddedRows() case *IndexReaderExecutor: us.desc = x.desc for _, ic := range x.index.Columns { @@ -709,7 +724,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) us.conditions = v.Conditions us.columns = x.columns - b.err = us.buildAndSortAddedRows() + err = us.buildAndSortAddedRows() case *IndexLookUpExecutor: us.desc = x.desc for _, ic := range x.index.Columns { @@ -723,16 +738,16 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) us.conditions = v.Conditions us.columns = x.columns - b.err = us.buildAndSortAddedRows() + err = us.buildAndSortAddedRows() default: // The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting. - return reader + return reader, nil } - if b.err != nil { - b.err = errors.Trace(b.err) - return nil + if err != nil { + err = errors.Trace(err) + return nil, err } - return us + return us, nil } // buildMergeJoin builds MergeJoinExec executor. 
@@ -1830,14 +1845,14 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { - builder.Plan = v.Children()[0] - reader, err := builder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) + childBuilder := &dataReaderBuilder{v.Children()[0], builder.executorBuilder} + reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) if err != nil { - return nil, errors.Trace(err) + return nil, err } - e := builder.buildUnionScanFromReader(reader, v) - if e == nil { - return nil, builder.err + e, err := builder.buildUnionScanFromReader(reader, v) + if err != nil { + return nil, err } us := e.(*UnionScanExec) us.snapshotChunkBuffer = us.newFirstChunk() diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go index 203c6aa79b103..319e6e60e56c8 100644 --- a/executor/index_lookup_join_test.go +++ b/executor/index_lookup_join_test.go @@ -91,3 +91,29 @@ func (s *testSuite) TestIndexJoinUnionScan(c *C) { )) tk.MustExec("rollback") } + +func (s *testSuite) TestBatchIndexJoinUnionScan(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t1(id int primary key, a int)") + tk.MustExec("create table t2(id int primary key, a int, key idx_a(a))") + tk.MustExec("set @@session.tidb_max_chunk_size=1") + tk.MustExec("set @@session.tidb_index_join_batch_size=1") + tk.MustExec("set @@session.tidb_index_lookup_join_concurrency=4") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(1,1),(2,1),(3,1),(4,1)") + tk.MustExec("insert into t2 values(1,1)") + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "StreamAgg_13 1.00 root funcs:count(1)", + "└─IndexJoin_24 12500.00 root inner 
join, inner:UnionScan_23, outer key:test.t1.a, inner key:test.t2.a", + " ├─UnionScan_25 10000.00 root ", + " │ └─TableReader_27 10000.00 root data:TableScan_26", + " │ └─TableScan_26 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + " └─UnionScan_23 10.00 root ", + " └─IndexReader_22 10.00 root index:IndexScan_21", + " └─IndexScan_21 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "4", + )) + tk.MustExec("rollback") +}