Skip to content

Commit

Permalink
util, executor: use UnionRanges build index kv range for INLJ… (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Apr 15, 2020
1 parent 9f1d049 commit ba735e9
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 45 deletions.
64 changes: 26 additions & 38 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2184,65 +2184,53 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]kv.KeyRange, error) {
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (_ []kv.KeyRange, err error) {
kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
sc := ctx.GetSessionVars().StmtCtx
tmpDatumRanges := make([]*ranger.Range, 0, len(lookUpContents))
for _, content := range lookUpContents {
for _, ran := range ranges {
for keyOff, idxOff := range keyOff2IdxOff {
ran.LowVal[idxOff] = content.keys[keyOff]
ran.HighVal[idxOff] = content.keys[keyOff]
}
}
if cwc != nil {
nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row)
if cwc == nil {
tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
if err != nil {
return nil, err
}
for _, nextColRan := range nextColRanges {
for _, ran := range ranges {
ran.LowVal[lastPos] = nextColRan.LowVal[0]
ran.HighVal[lastPos] = nextColRan.HighVal[0]
ran.LowExclude = nextColRan.LowExclude
ran.HighExclude = nextColRan.HighExclude
}
tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
kvRanges = append(kvRanges, tmpKvRanges...)
}
kvRanges = append(kvRanges, tmpKvRanges...)
continue
}

tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row)
if err != nil {
return nil, err
}
kvRanges = append(kvRanges, tmpKvRanges...)
}
// Sort and merge the overlapped ranges.
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
if cwc != nil {
// If cwc is not nil, we need to merge the overlapped ranges here.
mergedKeyRanges := make([]kv.KeyRange, 0, len(kvRanges))
for i := range kvRanges {
if len(mergedKeyRanges) == 0 {
mergedKeyRanges = append(mergedKeyRanges, kvRanges[i])
continue
}
if bytes.Compare(kvRanges[i].StartKey, mergedKeyRanges[len(mergedKeyRanges)-1].EndKey) <= 0 {
mergedKeyRanges[len(mergedKeyRanges)-1].EndKey = kvRanges[i].EndKey
} else {
mergedKeyRanges = append(mergedKeyRanges, kvRanges[i])
for _, nextColRan := range nextColRanges {
for _, ran := range ranges {
ran.LowVal[lastPos] = nextColRan.LowVal[0]
ran.HighVal[lastPos] = nextColRan.HighVal[0]
ran.LowExclude = nextColRan.LowExclude
ran.HighExclude = nextColRan.HighExclude
tmpDatumRanges = append(tmpDatumRanges, ran.Clone())
}
}
return mergedKeyRanges, nil
}
return kvRanges, nil

if cwc == nil {
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
return kvRanges, nil
}

tmpDatumRanges, err = ranger.UnionRanges(ctx.GetSessionVars().StmtCtx, tmpDatumRanges)
if err != nil {
return nil, err
}
return distsql.IndexRangesToKVRanges(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil)
}

func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec {
Expand Down
13 changes: 13 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,19 @@ func (s *testSuite2) TestIndexLookupJoin(c *C) {
tk.MustQuery("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t2.b=t1.id and t2.a=t1.id;").Check(testkit.Rows("1 1 1"))
}

func (s *testSuite2) TestIssue15686(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t, k;")
tk.MustExec("create table k (a int, pk int primary key, index(a));")
tk.MustExec("create table t (a int, pk int primary key, index(a));")
tk.MustExec("insert into k values(0,8),(0,23),(1,21),(1,33),(1,52),(2,17),(2,34),(2,39),(2,40),(2,66),(2,67),(3,9),(3,25),(3,41),(3,48),(4,4),(4,11),(4,15),(4,26),(4,27),(4,31),(4,35),(4,45),(4,47),(4,49);")
tk.MustExec("insert into t values(3,4),(3,5),(3,27),(3,29),(3,57),(3,58),(3,79),(3,84),(3,92),(3,95);")
tk.MustQuery("select /*+ inl_join(t) */ count(*) from k left join t on k.a = t.a and k.pk > t.pk;").Check(testkit.Rows("33"))
tk.MustQuery("select /*+ inl_hash_join(t) */ count(*) from k left join t on k.a = t.a and k.pk > t.pk;").Check(testkit.Rows("33"))
tk.MustQuery("select /*+ inl_merge_join(t) */ count(*) from k left join t on k.a = t.a and k.pk > t.pk;").Check(testkit.Rows("33"))
}

func (s *testSuite2) TestMergejoinOrder(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion util/ranger/detacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func detachDNFCondAndBuildRangeForIndex(sctx sessionctx.Context, condition *expr
}
}

totalRanges, err := unionRanges(sc, totalRanges)
totalRanges, err := UnionRanges(sc, totalRanges)
if err != nil {
return nil, nil, false, errors.Trace(err)
}
Expand Down
13 changes: 7 additions & 6 deletions util/ranger/ranger.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func buildColumnRange(accessConditions []expression.Expression, sc *stmtctx.Stat
ran.HighExclude = false
}
}
ranges, err = unionRanges(sc, ranges)
ranges, err = UnionRanges(sc, ranges)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func buildCNFIndexRange(sc *stmtctx.StatementContext, cols []*expression.Column,
// Take prefix index into consideration.
if hasPrefix(lengths) {
if fixPrefixColRange(ranges, lengths, newTp) {
ranges, err = unionRanges(sc, ranges)
ranges, err = UnionRanges(sc, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -356,7 +356,11 @@ type sortRange struct {
encodedEnd []byte
}

func unionRanges(sc *stmtctx.StatementContext, ranges []*Range) ([]*Range, error) {
// UnionRanges sorts `ranges`, union adjacent ones if possible.
// For two intervals [a, b], [c, d], we have guaranteed that a <= c. If b >= c. Then two intervals are overlapped.
// And this two can be merged as [a, max(b, d)].
// Otherwise they aren't overlapped.
func UnionRanges(sc *stmtctx.StatementContext, ranges []*Range) ([]*Range, error) {
if len(ranges) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -384,9 +388,6 @@ func unionRanges(sc *stmtctx.StatementContext, ranges []*Range) ([]*Range, error
ranges = ranges[:0]
lastRange := objects[0]
for i := 1; i < len(objects); i++ {
// For two intervals [a, b], [c, d], we have guaranteed that a >= c. If b >= c. Then two intervals are overlapped.
// And this two can be merged as [a, max(b, d)].
// Otherwise they aren't overlapped.
if bytes.Compare(lastRange.encodedEnd, objects[i].encodedStart) >= 0 {
if bytes.Compare(lastRange.encodedEnd, objects[i].encodedEnd) < 0 {
lastRange.encodedEnd = objects[i].encodedEnd
Expand Down

0 comments on commit ba735e9

Please sign in to comment.