Skip to content

Commit

Permalink
executor, planner: refine merge join compare methods to avoid some ra…
Browse files Browse the repository at this point in the history
…re cases (#9390) (#9438)
  • Loading branch information
zz-jason authored Feb 25, 2019
1 parent 528fe14 commit 693c9ed
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 113 deletions.
13 changes: 10 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,16 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
e := &MergeJoinExec{
stmtCtx: b.ctx.GetSessionVars().StmtCtx,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
joiner: newJoiner(b.ctx, v.JoinType, v.JoinType == plannercore.RightOuterJoin,
defaultValues, v.OtherConditions,
leftExec.retTypes(), rightExec.retTypes()),
compareFuncs: v.CompareFuncs,
joiner: newJoiner(
b.ctx,
v.JoinType,
v.JoinType == plannercore.RightOuterJoin,
defaultValues,
v.OtherConditions,
leftExec.retTypes(),
rightExec.retTypes(),
),
}

leftKeys := v.LeftKeys
Expand Down
32 changes: 25 additions & 7 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type MergeJoinExec struct {
baseExecutor

stmtCtx *stmtctx.StatementContext
compareFuncs []chunk.CompareFunc
compareFuncs []expression.CompareFunc
joiner joiner

prepared bool
Expand Down Expand Up @@ -75,7 +75,7 @@ type mergeJoinInnerTable struct {

// for chunk executions
sameKeyRows []chunk.Row
compareFuncs []chunk.CompareFunc
keyCmpFuncs []chunk.CompareFunc
firstRow4Key chunk.Row
curRow chunk.Row
curResult *chunk.Chunk
Expand All @@ -99,9 +99,9 @@ func (t *mergeJoinInnerTable) init(ctx context.Context, chk4Reader *chunk.Chunk)
t.resultQueue = append(t.resultQueue, chk4Reader)
t.memTracker.Consume(chk4Reader.MemoryUsage())
t.firstRow4Key, err = t.nextRow()
t.compareFuncs = make([]chunk.CompareFunc, 0, len(t.joinKeys))
t.keyCmpFuncs = make([]chunk.CompareFunc, 0, len(t.joinKeys))
for i := range t.joinKeys {
t.compareFuncs = append(t.compareFuncs, chunk.GetCompareFunc(t.joinKeys[i].RetType))
t.keyCmpFuncs = append(t.keyCmpFuncs, chunk.GetCompareFunc(t.joinKeys[i].RetType))
}
return errors.Trace(err)
}
Expand All @@ -123,7 +123,7 @@ func (t *mergeJoinInnerTable) rowsWithSameKey() ([]chunk.Row, error) {
t.firstRow4Key = t.curIter.End()
return t.sameKeyRows, errors.Trace(err)
}
compareResult := compareChunkRow(t.compareFuncs, selectedRow, t.firstRow4Key, t.joinKeys, t.joinKeys)
compareResult := compareChunkRow(t.keyCmpFuncs, selectedRow, t.firstRow4Key, t.joinKeys, t.joinKeys)
if compareResult == 0 {
t.sameKeyRows = append(t.sameKeyRows, selectedRow)
} else {
Expand Down Expand Up @@ -256,7 +256,6 @@ func (e *MergeJoinExec) prepare(ctx context.Context, chk *chunk.Chunk) error {
return errors.Trace(err)
}

e.compareFuncs = e.innerTable.compareFuncs
e.prepared = true
return nil
}
Expand Down Expand Up @@ -294,7 +293,10 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM

cmpResult := -1
if e.outerTable.selected[e.outerTable.row.Idx()] && len(e.innerRows) > 0 {
cmpResult = compareChunkRow(e.compareFuncs, e.outerTable.row, e.innerRows[0], e.outerTable.keys, e.innerTable.joinKeys)
cmpResult, err = e.compare(e.outerTable.row, e.innerIter4Row.Current())
if err != nil {
return false, err
}
}

if cmpResult > 0 {
Expand Down Expand Up @@ -340,6 +342,22 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM
}
}

func (e *MergeJoinExec) compare(outerRow, innerRow chunk.Row) (int, error) {
outerJoinKeys := e.outerTable.keys
innerJoinKeys := e.innerTable.joinKeys
for i := range outerJoinKeys {
cmp, _, err := e.compareFuncs[i](e.ctx, outerJoinKeys[i], innerJoinKeys[i], outerRow, innerRow)
if err != nil {
return 0, err
}

if cmp != 0 {
return int(cmp), nil
}
}
return 0, nil
}

// fetchNextInnerRows fetches the next join group, within which all the rows
// have the same join key, from the inner table.
func (e *MergeJoinExec) fetchNextInnerRows() (err error) {
Expand Down
33 changes: 33 additions & 0 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,36 @@ func (s *testSuite) Test3WaysMergeJoin(c *C) {
result = checkPlanAndRun(tk, c, plan3, "select /*+ TIDB_SMJ(t1,t2,t3) */ * from t1 right outer join t2 on t1.c1 = t2.c1 join t3 on t1.c1 = t3.c1 order by 1")
result.Check(testkit.Rows("2 2 2 3 2 4", "3 3 3 4 3 10"))
}

func (s *testSuite) TestMergeJoinDifferentTypes(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`drop table if exists t1;`)
tk.MustExec(`drop table if exists t2;`)
tk.MustExec(`create table t1(a bigint, b bit(1), index idx_a(a));`)
tk.MustExec(`create table t2(a bit(1) not null, b bit(1), index idx_a(a));`)
tk.MustExec(`insert into t1 values(1, 1);`)
tk.MustExec(`insert into t2 values(1, 1);`)
tk.MustQuery(`select hex(t1.a), hex(t2.a) from t1 inner join t2 on t1.a=t2.a;`).Check(testkit.Rows(`1 1`))

tk.MustExec(`drop table if exists t1;`)
tk.MustExec(`drop table if exists t2;`)
tk.MustExec(`create table t1(a float, b double, index idx_a(a));`)
tk.MustExec(`create table t2(a double not null, b double, index idx_a(a));`)
tk.MustExec(`insert into t1 values(1, 1);`)
tk.MustExec(`insert into t2 values(1, 1);`)
tk.MustQuery(`select t1.a, t2.a from t1 inner join t2 on t1.a=t2.a;`).Check(testkit.Rows(`1 1`))

tk.MustExec(`drop table if exists t1;`)
tk.MustExec(`drop table if exists t2;`)
tk.MustExec(`create table t1(a bigint signed, b bigint, index idx_a(a));`)
tk.MustExec(`create table t2(a bigint unsigned, b bigint, index idx_a(a));`)
tk.MustExec(`insert into t1 values(-1, 0), (-1, 0), (0, 0), (0, 0), (pow(2, 63), 0), (pow(2, 63), 0);`)
tk.MustExec(`insert into t2 values(18446744073709551615, 0), (18446744073709551615, 0), (0, 0), (0, 0), (pow(2, 63), 0), (pow(2, 63), 0);`)
tk.MustQuery(`select t1.a, t2.a from t1 join t2 on t1.a=t2.a order by t1.a;`).Check(testkit.Rows(
`0 0`,
`0 0`,
`0 0`,
`0 0`,
))
}
Loading

0 comments on commit 693c9ed

Please sign in to comment.