Skip to content

Commit

Permalink
Merge pull request grafana#123 from pyroscope-io/segment-bug-fixes
Browse files Browse the repository at this point in the history
Segment Tree bug fixes
  • Loading branch information
petethepig authored Mar 12, 2021
2 parents edabcca + 466cd8b commit fae6ed5
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 38 deletions.
9 changes: 7 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,17 @@ func NewForTests(path string) *Config {
StoragePath: path,
APIBindAddr: ":4040",

CacheSegmentSize: 10,
CacheTreeSize: 10,
CacheSegmentSize: 10,
CacheTreeSize: 10,
CacheDictionarySize: 10,
CacheDimensionSize: 10,

Multiplier: 10,
MinResolution: 10 * time.Second,
MaxResolution: time.Hour * 24 * 365 * 5,

MaxNodesSerialization: 2048,
MaxNodesRender: 2048,
},
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/dbmanager/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
)

func Cli(cfg *config.Config, args []string) error {
// spew.Dump(cfg.DbManager)
// spew.Dump(args)

if len(args) == 0 {
return fmt.Errorf("please provide a command")
}
Expand Down
123 changes: 123 additions & 0 deletions pkg/storage/segment/fuzz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package segment

import (
"log"
"math/big"
"math/rand"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)

type datapoint struct {
t time.Time
samples uint64
r *big.Rat
}

type storageMock struct {
resolution time.Duration
data []datapoint
}

func newMock(resolution time.Duration) *storageMock {
return &storageMock{
resolution: resolution,
data: []datapoint{},
}
}

func (sm *storageMock) Put(st, et time.Time, samples uint64) {
st, et = normalize(st, et)
fullDur := et.Sub(st) / sm.resolution
for t := st; t.Before(et); t = t.Add(sm.resolution) {
d := datapoint{
t: t,
samples: samples,
r: big.NewRat(int64(samples), int64(fullDur)),
}

sm.data = append(sm.data, d)
}
}

func (sm *storageMock) Get(st, et time.Time, cb func(depth int, samples uint64, t time.Time, r *big.Rat)) {
st, et = normalize(st, et)
for _, d := range sm.data {
if !d.t.Before(st) && !d.t.Add(sm.resolution).After(et) {
cb(0, 1, d.t, d.r)
}
}
}

func fuzzTest(writeSize func() int) {
s := New(10*time.Second, 10)
m := newMock(10 * time.Second)

r := rand.New(rand.NewSource(123))

for k := 0; k < 20; k++ {
maxStartTime := r.Intn(5000)
for i := 0; i < r.Intn(200); i++ {
sti := r.Intn(maxStartTime) * 10
st := testing.SimpleTime(sti)
et := testing.SimpleTime(sti + writeSize())
dur := et.Sub(st)
// this is not exactly a fair game, but we have to adjust for
samples := uint64(r.Intn(10)) * uint64(dur/(10*time.Second))

m.Put(st, et, samples)
s.Put(st, et, samples, func(depth int, t time.Time, r *big.Rat, addons []Addon) {
log.Println(depth, r, dur)
})
}
mSum := big.NewRat(0, 1)
sSum := big.NewRat(0, 1)
for i := 0; i < r.Intn(100); i++ {
sti := r.Intn(100) * 10
st := testing.SimpleTime(sti)
et := testing.SimpleTime(sti + r.Intn(100)*10)

m.Get(st, et, func(depth int, samples uint64, t time.Time, r *big.Rat) {
mSum.Add(mSum, r.Mul(r, big.NewRat(int64(samples), 1)))
})

s.Get(st, et, func(depth int, samples uint64, t time.Time, r *big.Rat) {
sSum.Add(sSum, r.Mul(r, big.NewRat(int64(samples), 1)))
})
}
mSumF, _ := mSum.Float64()
log.Println("m:", mSum, mSumF)

sSumF, _ := sSum.Float64()
log.Println("s:", sSum, sSumF)

Expect(mSum.Cmp(sSum)).To(Equal(0))
}
// Expect(1).To(Equal(0))
}

// See https://github.com/pyroscope-io/pyroscope/issues/28 for more context
var _ = Describe("segment", func() {
Context("fuzz tests", func() {
Context("writes are 10 second long", func() {
It("works as expected", func(done Done) {
fuzzTest(func() int {
return 10
})
close(done)
}, 5)
})
Context("writes are different lengths", func() {
It("works as expected", func(done Done) {
fuzzTest(func() int {
return 20
// return 1 + rand.Intn(10)*10
})
close(done)
}, 5)
})
})
})
26 changes: 18 additions & 8 deletions pkg/storage/segment/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (sn *streeNode) relationship(st, et time.Time) rel {
return relationship(sn.time, t2, st, et)
}

func (sn *streeNode) endTime() time.Time {
return sn.time.Add(durations[sn.depth])
}

func (sn *streeNode) overlapRead(st, et time.Time) *big.Rat {
t2 := sn.time.Add(durations[sn.depth])
return overlapRead(sn.time, t2, st, et, durations[0])
Expand Down Expand Up @@ -66,7 +70,7 @@ func (sn *streeNode) findAddons() []Addon {
return res
}

func (sn *streeNode) put(st, et time.Time, _ uint64, cb func(n *streeNode, depth int, dt time.Time, r *big.Rat, addons []Addon)) {
func (sn *streeNode) put(st, et time.Time, samples uint64, cb func(n *streeNode, depth int, dt time.Time, r *big.Rat, addons []Addon)) {
nodes := []*streeNode{sn}

for len(nodes) > 0 {
Expand Down Expand Up @@ -94,18 +98,22 @@ func (sn *streeNode) put(st, et time.Time, _ uint64, cb func(n *streeNode, depth
}
var addons []Addon

r := sn.overlapWrite(st, et)
fv, _ := r.Float64()
sn.samples += uint64(float64(samples) * fv)

// relationship overlap read overlap write
// inside rel = iota // | S E | <1 1/1
// match // matching ranges 1/1 1/1
// outside // | | S E 0/1 0/1
// overlap // | S | E <1 <1
// contain // S | | E 1/1 <1

if rel == match || rel == contain || childrenCount > 1 || sn.present {
if !sn.present {
addons = sn.findAddons()
}
// TODO: calculate proper r
r := sn.overlapWrite(st, et)

cb(sn, sn.depth, sn.time, r, addons)
sn.present = true
}
Expand All @@ -119,6 +127,9 @@ func normalize(st, et time.Time) (time.Time, time.Time) {
if et2.Equal(et) {
return st, et
}
// if st.Equal(et2) {
// et2.Add(durations[0])
// }
return st, et2.Add(durations[0])
}

Expand Down Expand Up @@ -213,7 +224,7 @@ func (s *Segment) growTree(st, et time.Time) {
var prevVal *streeNode
if s.root != nil {
st = minTime(st, s.root.time)
et = maxTime(et, s.root.time)
et = maxTime(et, s.root.endTime())
} else {
st = st.Truncate(s.durations[0])
s.root = newNode(st, 0, s.multiplier)
Expand All @@ -228,7 +239,7 @@ func (s *Segment) growTree(st, et time.Time) {

prevVal = s.root
newDepth := prevVal.depth + 1
s.root = newNode(st.Truncate(s.durations[newDepth]), newDepth, s.multiplier)
s.root = newNode(prevVal.time.Truncate(s.durations[newDepth]), newDepth, s.multiplier)
if prevVal != nil {
s.root.samples = prevVal.samples
s.root.replace(prevVal)
Expand All @@ -250,14 +261,13 @@ func (s *Segment) Put(st, et time.Time, samples uint64, cb func(depth int, t tim
s.growTree(st, et)
v := newVis()
s.root.put(st, et, samples, func(sn *streeNode, depth int, tm time.Time, r *big.Rat, addons []Addon) {
sn.samples += samples * uint64(r.Num().Int64()) / uint64(r.Denom().Int64())
v.add(sn, r, true)
cb(depth, tm, r, addons)
})
v.print(fmt.Sprintf("/tmp/0-put-%s-%s.html", st.String(), et.String()))
}

func (s *Segment) Get(st, et time.Time, cb func(depth int, t time.Time, r *big.Rat)) {
func (s *Segment) Get(st, et time.Time, cb func(depth int, samples uint64, t time.Time, r *big.Rat)) {
s.m.RLock()
defer s.m.RUnlock()

Expand All @@ -270,7 +280,7 @@ func (s *Segment) Get(st, et time.Time, cb func(depth int, t time.Time, r *big.R
s.root.get(st, et, func(sn *streeNode, depth int, t time.Time, r *big.Rat) {
// TODO: pass m / d from .get() ?
v.add(sn, r, true)
cb(depth, t, r)
cb(depth, sn.samples, t, r)
})
v.print(fmt.Sprintf("/tmp/0-get-%s-%s.html", st.String(), et.String()))
}
Expand Down
Loading

0 comments on commit fae6ed5

Please sign in to comment.