Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: use another way to merge topn #47765

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e0e2588
statistics: use another way to merge topn
winoros Oct 18, 2023
99327a9
Merge branch 'master' into merge-topn-one-pass
winoros Oct 18, 2023
72adddd
fantastic skipping
winoros Oct 18, 2023
0086a8c
use Fix instead of Pop then Push
winoros Oct 18, 2023
abf47b8
fix topn num and enlarge the log's step
winoros Oct 19, 2023
547f9c6
save some cpu for calling NextClear
winoros Oct 19, 2023
b697bbf
change log pos and fix deadloop
winoros Oct 19, 2023
26fd8da
Merge branch 'master' into merge-topn-one-pass
winoros Oct 19, 2023
8fa85f9
Merge branch 'master' into merge-topn-one-pass
winoros Jan 26, 2024
6c01400
fix the bug of codes
winoros Feb 5, 2024
78f35ea
redo the num change to reduce the diff
winoros Feb 5, 2024
6a61462
skip two test
winoros Feb 5, 2024
fd09e6f
fix linting
winoros Feb 5, 2024
1dc2a28
remove dead codes
winoros Feb 5, 2024
06f4d41
Merge branch 'master' into merge-topn-one-pass
winoros Feb 5, 2024
6dcbcc0
Merge remote-tracking branch 'origin/master' into merge-topn-one-pass
winoros Feb 21, 2024
6f0c669
address comments
winoros Feb 21, 2024
8038ee3
use a wrapper
winoros Feb 25, 2024
b819a09
Merge branch 'master' into merge-topn-one-pass
winoros Mar 7, 2024
606519c
Merge branch 'master' into merge-topn-one-pass
winoros Mar 27, 2024
3d4997e
Merge branch 'master' into merge-topn-one-pass
winoros Jan 22, 2025
63a02bb
Merge remote-tracking branch 'origin/master' into merge-topn-one-pass
winoros Jan 22, 2025
78a3d37
Merge remote-tracking branch 'origin/master' into merge-topn-one-pass
winoros Jan 23, 2025
4efdb94
unskip tests
winoros Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix the bug of codes
  • Loading branch information
winoros committed Feb 5, 2024
commit 6c01400f4cd8606f0b3f9429acdb29d9628be2ba
151 changes: 83 additions & 68 deletions pkg/statistics/handle/globalstats/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package globalstats
import (
"bytes"
"container/heap"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -150,7 +151,11 @@ func (hi *histIter) remove(v *types.Datum) int64 {
cmp = chunk.Compare(hi.hist.Bounds.GetRow(hi.curBucketPos*2+1), 0, v)
if cmp < 0 {
// This value is bigger than current bucket's upper bound, goto next bucket.
hi.hist.Buckets[hi.curBucketPos].Count -= hi.totalSubstracted
if hi.hist.Buckets[hi.curBucketPos].Count < hi.totalSubstracted {
hi.hist.Buckets[hi.curBucketPos].Count = 0
} else {
hi.hist.Buckets[hi.curBucketPos].Count -= hi.totalSubstracted
}
hi.curBucketPos++
continue
}
Expand All @@ -163,7 +168,8 @@ func (hi *histIter) remove(v *types.Datum) int64 {
hi.curBucketPos++
return ret
}
ret := int64(hi.hist.NotNullCount() / float64(hi.hist.NDV))
// The value falls in the current bucket.
ret := int64(math.Max(hi.hist.NotNullCount()-float64(hi.totalSubstracted), 0) / float64(hi.hist.NDV))
hi.totalSubstracted += ret
return ret
}
Expand All @@ -172,6 +178,11 @@ func (hi *histIter) remove(v *types.Datum) int64 {
// finish cleans the unfinished subtraction.
func (hi *histIter) finish() {
for i := hi.curBucketPos; i < len(hi.hist.Buckets); i++ {
// Avoid the negative.
if hi.hist.Buckets[i].Count < hi.totalSubstracted {
hi.hist.Buckets[i].Count = 0
continue
}
hi.hist.Buckets[i].Count -= hi.totalSubstracted
}
}
Expand Down Expand Up @@ -215,7 +226,7 @@ func (h topNMeataHeap) Len() int {
}

func (h topNMeataHeap) Less(i, j int) bool {
return bytes.Compare(h[i].Encoded, h[j].Encoded) < 0
return h[i].Count < h[j].Count
}

func (h topNMeataHeap) Swap(i, j int) {
Expand Down Expand Up @@ -288,18 +299,76 @@ func MergePartTopN2GlobalTopN(
type maintaining struct {
affectedTopNs *bitset.BitSet
item statistics.TopNMeta
cleared bool
}
cur := maintaining{
affectedTopNs: bitset.New(uint(len(hists))),
cleared: true,
}
step := int64(0)
histRemoveCnt := int64(0)
var finalTopNs topNMeataHeap = make([]statistics.TopNMeta, 0, n+1)
remainedTopNs := make([]statistics.TopNMeta, 0, n)
skipCount := 0
affectedHist := make([]int, 0, len(hists))
firstTime := true
checkTheCurAndMoveForward := func(nextVal *statistics.TopNMeta, position uint) error {
winoros marked this conversation as resolved.
Show resolved Hide resolved
// It's perf-sensitive path. Don't use defer.
// Initializing the datum.
d, err := statistics.TopNMetaValToDatum(cur.item.Encoded, hists[0].Tp.GetType(), isIndex, loc)
if err != nil {
return err
}
affectedHist = affectedHist[:0]
// The following codes might access the NextClear loop twice. Record it here for saving CPU.
for histPos, found := cur.affectedTopNs.NextClear(0); found; histPos, found = cur.affectedTopNs.NextClear(histPos + 1) {
affectedHist = append(affectedHist, int(histPos))
}
// Hacking skip.
if uint32(len(finalTopNs)) >= n {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure which one we prefer. len(finalTopNs) vs. finalTopNs.Len(). But I guess it doesn't matter.

maxPossible := int64(0)
for _, histPos := range affectedHist {
maxPossible += maxPossibleAdded[histPos]
}
// The maximum possible added value still cannot make it replace the smallest topn.
if maxPossible+int64(cur.item.Count) < int64(finalTopNs[0].Count) {
skipCount++
remainedTopNs = append(remainedTopNs, statistics.TopNMeta{Encoded: cur.item.Encoded, Count: cur.item.Count})
// Set the cur maintained to the next value.
cur.item.Encoded = nextVal.Encoded
cur.item.Count = nextVal.Count
cur.affectedTopNs.ClearAll()
cur.affectedTopNs.Set(position)
return nil
}
}
for _, histPos := range affectedHist {
histRemoveCnt++
// Remove the value from the hist and add it into the current maintained value.
cur.item.Count += uint64(histIters[histPos].remove(&d))
}
// Size reaches the n, maintaining the heap.
if finalTopNs.Len() == int(n) {
if finalTopNs[0].Count < cur.item.Count {
remainedTopNs = append(remainedTopNs, finalTopNs[0])
finalTopNs[0].Encoded = cur.item.Encoded
finalTopNs[0].Count = cur.item.Count
heap.Fix(&finalTopNs, 0)
} else {
remainedTopNs = append(remainedTopNs, cur.item)
}
} else {
// Otherwise the heap is not fulfilled.
finalTopNs = append(finalTopNs, statistics.TopNMeta{Encoded: cur.item.Encoded, Count: cur.item.Count})
if finalTopNs.Len() == int(n) {
heap.Init(&finalTopNs)
}
}
// Set the cur maintained to the next value.
cur.item.Encoded = nextVal.Encoded
cur.item.Count = nextVal.Count
cur.affectedTopNs.ClearAll()
cur.affectedTopNs.Set(position)
return nil
}
for {
if err := killer.HandleSignal(); err != nil {
return nil, nil, nil, err
Expand All @@ -317,85 +386,31 @@ func MergePartTopN2GlobalTopN(
} else {
heap.Pop(&mergingHeap)
}
// The maintained one is cleared before. Set it and goto next round.
if cur.cleared {
// Init the cur when we first enter the heap.
if firstTime {
cur.item.Encoded = headTopN.Encoded
cur.item.Count = headTopN.Count
cur.affectedTopNs.Set(uint(head.idx))
cur.cleared = false
firstTime = false
continue
}
cmp := bytes.Compare(cur.item.Encoded, headTopN.Encoded)
cnt := headTopN.Count
metaHeapInit := false
// The heap's head move forward.
if cmp < 0 {
// Initializing the datum.
d, err := statistics.TopNMetaValToDatum(cur.item.Encoded, hists[0].Tp.GetType(), isIndex, loc)
err := checkTheCurAndMoveForward(headTopN, uint(head.idx))
if err != nil {
return nil, nil, hists, err
}
affectedHist = affectedHist[:0]
// The following codes might access the NextClear loop twice. Record it here for saving CPU.
for histPos, found := cur.affectedTopNs.NextClear(0); found; histPos, found = cur.affectedTopNs.NextClear(histPos + 1) {
affectedHist = append(affectedHist, int(histPos))
}
// Hacking skip.
if uint32(len(finalTopNs)) >= n {
maxPossible := int64(0)
for _, histPos := range affectedHist {
maxPossible += maxPossibleAdded[histPos]
}
// The maximum possible added value still cannot make it replace the smallest topn.
if maxPossible+int64(cur.item.Count) < int64(finalTopNs[0].Count) {
skipCount++
remainedTopNs = append(remainedTopNs, statistics.TopNMeta{Encoded: cur.item.Encoded, Count: cur.item.Count})
cur.cleared = true
cur.affectedTopNs.ClearAll()
continue
}
}
for _, histPos := range affectedHist {
histRemoveCnt++
// Remove the value from the hist and add it into the current maintained value.
cur.item.Count += uint64(histIters[histPos].remove(&d))
}
cur.item.Count += cnt
if metaHeapInit {
if finalTopNs[0].Count < cur.item.Count {
finalTopNs[0].Encoded = cur.item.Encoded
finalTopNs[0].Count = cur.item.Count
heap.Fix(&finalTopNs, 0)
}
} else if finalTopNs.Len() < int(n) {
finalTopNs = append(finalTopNs, statistics.TopNMeta{Encoded: cur.item.Encoded, Count: cur.item.Count})
} else {
heap.Init(&finalTopNs)
metaHeapInit = true
return nil, nil, nil, err
}
cur.cleared = true
cur.affectedTopNs.ClearAll()
continue
}
// The cmp result cannot be 1 because the value is strictly increasing.
// Here is cmp == 0.
cur.item.Count += cnt
cur.item.Count += headTopN.Count
cur.affectedTopNs.Set(uint(head.idx))
}
if !cur.cleared {
// There's uncleared item. Clear it.
// Initializing the datum.
d, err := statistics.TopNMetaValToDatum(cur.item.Encoded, hists[0].Tp.GetType(), isIndex, loc)
if err != nil {
return nil, nil, hists, err
}
for histPos, found := cur.affectedTopNs.NextClear(0); found; histPos, found = cur.affectedTopNs.NextClear(histPos + 1) {
// Remove the value from the hist and add it into the current maintained value.
cur.item.Count += uint64(histIters[histPos].remove(&d))
}
// This is the last inerstion so we don't need to maintain the heap anymore.
// And the slice will be sorted later, so we can use that sort to deal with the case that its length exceeds the `n`.
finalTopNs = append(finalTopNs, cur.item)
{
winoros marked this conversation as resolved.
Show resolved Hide resolved
// Next val and the position is useless
checkTheCurAndMoveForward(&cur.item, 0)
}
for _, iter := range histIters {
iter.finish()
Expand Down
10 changes: 9 additions & 1 deletion pkg/statistics/handle/globalstats/topn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,16 @@ func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) {
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*statistics.Histogram, 0, 10)
for i := 0; i < 10; i++ {
// Construct Hist
h := statistics.NewHistogram(1, 0, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
hists = append(hists, h)
}

// Test merge 2 topN with nil hists.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, topNs, 2, nil, false, &killer)
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, topNs, 2, hists, false, &killer)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows")
Expand Down