Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: optimize (left outer) (anti) semi join which has no other condition #47764

Merged
merged 7 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,14 @@ func (p *cteProducer) checkHasDup(probeKey uint64,
curChk *chunk.Chunk,
storage cteutil.Storage,
hashTbl baseHashTable) (hasDup bool, err error) {
ptrs := hashTbl.Get(probeKey)
entry := hashTbl.Get(probeKey)

if len(ptrs) == 0 {
if entry == nil {
return false, nil
}

for _, ptr := range ptrs {
for ; entry != nil; entry = entry.next {
ptr := entry.ptr
var matchedRow chunk.Row
if curChk != nil {
matchedRow = curChk.GetRow(int(ptr.RowIdx))
Expand Down
66 changes: 52 additions & 14 deletions pkg/executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,47 @@ func (c *hashRowContainer) GetMatchedRows(probeKey uint64, probeRow chunk.Row, h
return matchedRows, err
}

// GetOneMatchedRow walks the entry chain stored under probeKey and returns a
// pointer to the first row whose join key matches probeRow. It returns
// (nil, nil) when there is no entry for probeKey or when no row in the chain
// matches. Every non-matching entry that is visited is counted in
// c.stat.probeCollision.
func (c *hashRowContainer) GetOneMatchedRow(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (*chunk.Row, error) {
	head := c.hashTable.Get(probeKey)
	if head == nil {
		return nil, nil
	}

	// Start every lookup from an empty scratch chunk.
	if c.chkBuf != nil {
		c.chkBuf.Reset()
	}

	var (
		candidate   chunk.Row
		err         error
		resetPeriod int
	)
	visited := 0
	for cur := head; cur != nil; cur = cur.next {
		candidate, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(cur.ptr, c.chkBuf)
		if err != nil {
			return nil, err
		}
		var keysEqual bool
		keysEqual, err = c.matchJoinKey(candidate, probeRow, hCtx)
		if err != nil {
			return nil, err
		}
		if keysEqual {
			// One match is all the caller asked for; stop scanning the chain.
			return &candidate, nil
		}
		atomic.AddInt64(&c.stat.probeCollision, 1)

		if visited == 0 {
			// The reset period is fixed after the first miss: the scratch
			// chunk's capacity, but never less than 128.
			resetPeriod = c.chkBuf.Capacity()
			if resetPeriod < 128 {
				resetPeriod = 128
			}
		} else if (visited+1)%resetPeriod == 0 {
			// Periodically empty the scratch chunk (presumably to bound the
			// memory held by rows fetched from disk; confirm against
			// GetRowAndAppendToChunkIfInDisk).
			c.chkBuf.Reset()
		}
		visited++
	}
	return nil, nil
}

func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRow chunk.Row,
probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildColPos, needCheckProbeColPos []int, needCheckBuildTypes, needCheckProbeTypes []*types.FieldType) ([]chunk.Row, error) {
// for NAAJ probe row with null, we should match them with all build rows.
Expand Down Expand Up @@ -232,7 +273,12 @@ const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))
// h and buf.
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr, needPtr bool) ([]chunk.Row, []chunk.RowPtr, error) {
var err error
innerPtrs := c.hashTable.Get(probeKey)
innerEntry := c.hashTable.Get(probeKey)
var innerPtrs []chunk.RowPtr
for innerEntry != nil {
innerPtrs = append(innerPtrs, innerEntry.ptr)
innerEntry = innerEntry.next
}
if len(innerPtrs) == 0 {
return nil, nil, err
}
Expand Down Expand Up @@ -565,7 +611,7 @@ func (es *entryStore) GetStore() (e *entry, memDelta int64) {

type baseHashTable interface {
Put(hashKey uint64, rowPtr chunk.RowPtr)
Get(hashKey uint64) (rowPtrs []chunk.RowPtr)
Get(hashKey uint64) *entry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to add a comment here with an example of how to iterate over the stored ptrs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Nice suggestion.

Len() uint64
// GetAndCleanMemoryDelta gets and cleans the memDelta of the baseHashTable. Memory delta will be cleared after each fetch.
// It indicates the memory delta of the baseHashTable since the last calling GetAndCleanMemoryDelta().
Expand Down Expand Up @@ -611,13 +657,9 @@ func (ht *unsafeHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) {
}

// Get gets the values of the "key" and appends them to "values".
func (ht *unsafeHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) {
func (ht *unsafeHashTable) Get(hashKey uint64) *entry {
entryAddr := ht.hashMap[hashKey]
for entryAddr != nil {
rowPtrs = append(rowPtrs, entryAddr.ptr)
entryAddr = entryAddr.next
}
return
return entryAddr
}

// Len returns the number of rowPtrs in the unsafeHashTable, the number of keys may be less than Len
Expand Down Expand Up @@ -674,13 +716,9 @@ func (ht *concurrentMapHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) {
}

// Get gets the values of the "key" and appends them to "values".
func (ht *concurrentMapHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) {
func (ht *concurrentMapHashTable) Get(hashKey uint64) *entry {
entryAddr, _ := ht.hashMap.Get(hashKey)
for entryAddr != nil {
rowPtrs = append(rowPtrs, entryAddr.ptr)
entryAddr = entryAddr.next
}
return
return entryAddr
}

// Iter gets the every value of the hash table.
Expand Down
11 changes: 4 additions & 7 deletions pkg/executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ type indexHashJoinOuterWorker struct {

type indexHashJoinInnerWorker struct {
innerWorker
matchedOuterPtrs []chunk.RowPtr
joiner joiner
joinChkResourceCh chan *chunk.Chunk
// resultCh is valid only when indexNestedLoopHashJoin do not need to keep
Expand Down Expand Up @@ -436,7 +435,6 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
joiner: e.joiners[workerID],
joinChkResourceCh: e.joinChkResourceCh[workerID],
resultCh: e.resultCh,
matchedOuterPtrs: make([]chunk.RowPtr, 0, e.MaxChunkSize()),
joinKeyBuf: make([]byte, 1),
outerRowStatus: make([]outerRowStatusFlag, 0, e.MaxChunkSize()),
rowIter: chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice),
Expand Down Expand Up @@ -712,15 +710,14 @@ func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task
if err != nil {
return nil, nil, err
}
iw.matchedOuterPtrs = task.lookupMap.Get(h.Sum64())
if len(iw.matchedOuterPtrs) == 0 {
matchedOuterEntry := task.lookupMap.Get(h.Sum64())
if matchedOuterEntry == nil {
return nil, nil, nil
}
joinType := JoinerType(iw.joiner)
isSemiJoin := joinType == plannercore.SemiJoin || joinType == plannercore.LeftOuterSemiJoin
matchedRows = make([]chunk.Row, 0, len(iw.matchedOuterPtrs))
matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs))
for _, ptr := range iw.matchedOuterPtrs {
for ; matchedOuterEntry != nil; matchedOuterEntry = matchedOuterEntry.next {
ptr := matchedOuterEntry.ptr
outerRow := task.outerResult.GetRow(ptr)
ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.hashTypes, iw.hashCols, outerRow, iw.outerCtx.hashTypes, iw.outerCtx.hashCols)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,18 @@ func (w *probeWorker) joinNAAJMatchProbeSideRow2Chunk(probeKey uint64, probeKeyN
func (w *probeWorker) joinMatchedProbeSideRow2Chunk(probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext,
joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
var err error
w.buildSideRows, err = w.rowContainerForProbe.GetMatchedRows(probeKey, probeSideRow, hCtx, w.buildSideRows)
buildSideRows := w.buildSideRows
var buildSideRows []chunk.Row
if w.joiner.isSemiJoinWithoutCondition() {
var rowPtr *chunk.Row
rowPtr, err = w.rowContainerForProbe.GetOneMatchedRow(probeKey, probeSideRow, hCtx)
if rowPtr != nil {
buildSideRows = append(buildSideRows, *rowPtr)
}
} else {
w.buildSideRows, err = w.rowContainerForProbe.GetMatchedRows(probeKey, probeSideRow, hCtx, w.buildSideRows)
buildSideRows = w.buildSideRows
}

if err != nil {
joinResult.err = err
return false, joinResult
Expand Down
43 changes: 42 additions & 1 deletion pkg/executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ type joiner interface {
// parameter passed to `onMissMatch`.
onMissMatch(hasNull bool, outer chunk.Row, chk *chunk.Chunk)

// isSemiJoinWithoutCondition returns if it's a semi join and has no condition.
// If true, at most one matched row is needed to match inners, which can optimize a lot when
// there are a lot of matched rows.
isSemiJoinWithoutCondition() bool

// Clone deep copies a joiner.
Clone() joiner
}
Expand Down Expand Up @@ -426,6 +431,10 @@ func (j *semiJoiner) tryToMatchOuters(outers chunk.Iterator, inner chunk.Row, ch

// onMissMatch implements joiner interface. Its body is empty, so nothing is done for an unmatched row.
func (*semiJoiner) onMissMatch(bool, chunk.Row, *chunk.Chunk) {}

// isSemiJoinWithoutCondition implements joiner interface.
// It reports whether j.conditions is empty.
func (j *semiJoiner) isSemiJoinWithoutCondition() bool {
	condCount := len(j.conditions)
	return condCount == 0
}

// Clone implements joiner interface.
func (j *semiJoiner) Clone() joiner {
return &semiJoiner{baseJoiner: j.baseJoiner.Clone()}
Expand Down Expand Up @@ -490,6 +499,10 @@ func (naaj *nullAwareAntiSemiJoiner) onMissMatch(_ bool, outer chunk.Row, chk *c
chk.AppendRowByColIdxs(outer, naaj.lUsed)
}

func (naaj *nullAwareAntiSemiJoiner) isSemiJoinWithoutCondition() bool {
return len(naaj.conditions) == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this optimization work for null-aware joins? Since only joinMatchedProbeSideRow2Chunk checks isSemiJoinWithoutCondition, a null-aware join is not optimized even if it returns true here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This function is useless for null-aware semi joins.

}

// Clone implements joiner interface.
// It returns a new nullAwareAntiSemiJoiner built around a clone of naaj's baseJoiner.
func (naaj *nullAwareAntiSemiJoiner) Clone() joiner {
	base := naaj.baseJoiner.Clone()
	return &nullAwareAntiSemiJoiner{baseJoiner: base}
}
Expand Down Expand Up @@ -559,6 +572,10 @@ func (j *antiSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk *chunk.C
}
}

// isSemiJoinWithoutCondition implements joiner interface.
// It reports whether j.conditions is empty.
func (j *antiSemiJoiner) isSemiJoinWithoutCondition() bool {
	condCount := len(j.conditions)
	return condCount == 0
}

// Clone implements joiner interface.
// It returns a new antiSemiJoiner built around a clone of j's baseJoiner.
func (j *antiSemiJoiner) Clone() joiner {
	base := j.baseJoiner.Clone()
	return &antiSemiJoiner{baseJoiner: base}
}
Expand Down Expand Up @@ -641,6 +658,10 @@ func (j *leftOuterSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk *ch
}
}

// isSemiJoinWithoutCondition implements joiner interface.
// It reports whether j.conditions is empty.
func (j *leftOuterSemiJoiner) isSemiJoinWithoutCondition() bool {
	condCount := len(j.conditions)
	return condCount == 0
}

// Clone implements joiner interface.
// It returns a new leftOuterSemiJoiner built around a clone of j's baseJoiner.
func (j *leftOuterSemiJoiner) Clone() joiner {
	base := j.baseJoiner.Clone()
	return &leftOuterSemiJoiner{baseJoiner: base}
}
Expand Down Expand Up @@ -713,8 +734,12 @@ func (*nullAwareAntiLeftOuterSemiJoiner) tryToMatchOuters(chunk.Iterator, chunk.
return nil, err
}

// isSemiJoinWithoutCondition implements joiner interface.
// It reports whether naal.conditions is empty.
func (naal *nullAwareAntiLeftOuterSemiJoiner) isSemiJoinWithoutCondition() bool {
	condCount := len(naal.conditions)
	return condCount == 0
}

func (naal *nullAwareAntiLeftOuterSemiJoiner) Clone() joiner {
return &antiLeftOuterSemiJoiner{baseJoiner: naal.baseJoiner.Clone()}
return &nullAwareAntiLeftOuterSemiJoiner{baseJoiner: naal.baseJoiner.Clone()}
}

type antiLeftOuterSemiJoiner struct {
Expand Down Expand Up @@ -798,6 +823,10 @@ func (j *antiLeftOuterSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk
}
}

// isSemiJoinWithoutCondition implements joiner interface.
// It reports whether j.conditions is empty.
func (j *antiLeftOuterSemiJoiner) isSemiJoinWithoutCondition() bool {
	condCount := len(j.conditions)
	return condCount == 0
}

// Clone implements joiner interface.
// It returns a new antiLeftOuterSemiJoiner built around a clone of j's baseJoiner.
func (j *antiLeftOuterSemiJoiner) Clone() joiner {
	base := j.baseJoiner.Clone()
	return &antiLeftOuterSemiJoiner{baseJoiner: base}
}
Expand Down Expand Up @@ -877,6 +906,10 @@ func (j *leftOuterJoiner) onMissMatch(_ bool, outer chunk.Row, chk *chunk.Chunk)
chk.AppendPartialRowByColIdxs(lWide, j.defaultInner, j.rUsed)
}

// isSemiJoinWithoutCondition implements joiner interface.
// It always returns false for leftOuterJoiner.
func (*leftOuterJoiner) isSemiJoinWithoutCondition() bool {
return false
}

// Clone implements joiner interface.
// It returns a new leftOuterJoiner built around a clone of j's baseJoiner.
func (j *leftOuterJoiner) Clone() joiner {
	base := j.baseJoiner.Clone()
	return &leftOuterJoiner{baseJoiner: base}
}
Expand Down Expand Up @@ -952,6 +985,10 @@ func (j *rightOuterJoiner) onMissMatch(_ bool, outer chunk.Row, chk *chunk.Chunk
chk.AppendPartialRowByColIdxs(lWide, outer, j.rUsed)
}

// isSemiJoinWithoutCondition implements joiner interface.
// It always returns false for rightOuterJoiner.
func (*rightOuterJoiner) isSemiJoinWithoutCondition() bool {
return false
}

// Clone implements joiner interface.
// It returns a new rightOuterJoiner built around a clone of j's baseJoiner.
func (j *rightOuterJoiner) Clone() joiner {
	base := j.baseJoiner.Clone()
	return &rightOuterJoiner{baseJoiner: base}
}
Expand Down Expand Up @@ -1035,6 +1072,10 @@ func (j *innerJoiner) tryToMatchOuters(outers chunk.Iterator, inner chunk.Row, c

// onMissMatch implements joiner interface. Its body is empty, so nothing is done for an unmatched row.
func (*innerJoiner) onMissMatch(bool, chunk.Row, *chunk.Chunk) {}

// isSemiJoinWithoutCondition implements joiner interface.
// It always returns false for innerJoiner.
func (*innerJoiner) isSemiJoinWithoutCondition() bool {
return false
}

// Clone implements joiner interface.
// It returns a new innerJoiner built around a clone of j's baseJoiner.
func (j *innerJoiner) Clone() joiner {
	base := j.baseJoiner.Clone()
	return &innerJoiner{baseJoiner: base}
}