From 31d71abe8c206c24e03ace324eea6c87d3d5dada Mon Sep 17 00:00:00 2001 From: Sergey Grebenshchikov Date: Thu, 29 Aug 2024 20:08:45 +0200 Subject: [PATCH] init --- .github/workflows/go.yml | 28 ++++ README.md | 135 +++++++++++++++++ go.mod | 9 ++ go.sum | 6 + hash.go | 14 ++ heap/heap.go | 150 +++++++++++++++++++ heap/sizeof.go | 10 ++ internal/sizeof/sizeof.go | 11 ++ options.go | 13 ++ renovate.json | 6 + sizeof.go | 10 ++ sketch.go | 220 ++++++++++++++++++++++++++++ sketch_test.go | 33 +++++ sliding/bucket.go | 51 +++++++ sliding/options.go | 17 +++ sliding/sizeof.go | 10 ++ sliding/sketch.go | 299 ++++++++++++++++++++++++++++++++++++++ sliding/sketch_test.go | 141 ++++++++++++++++++ 18 files changed, 1163 insertions(+) create mode 100644 .github/workflows/go.yml create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 hash.go create mode 100644 heap/heap.go create mode 100644 heap/sizeof.go create mode 100644 internal/sizeof/sizeof.go create mode 100644 options.go create mode 100644 renovate.json create mode 100644 sizeof.go create mode 100644 sketch.go create mode 100644 sketch_test.go create mode 100644 sliding/bucket.go create mode 100644 sliding/options.go create mode 100644 sliding/sizeof.go create mode 100644 sliding/sketch.go create mode 100644 sliding/sketch_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..6092f4a --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,28 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.23' + + - name: Build + run: go build -v ./... 
+ + - name: Test + run: go test -v ./... diff --git a/README.md b/README.md new file mode 100644 index 0000000..2366798 --- /dev/null +++ b/README.md @@ -0,0 +1,135 @@ +# topk + +Sliding-window and regular top-K sketches. + +- A fast implementation of the HeavyKeeper top-K sketch inspired by the [segmentio implementation](https://github.com/segmentio/topk) and [RedisBloom implementation](https://github.com/RedisBloom/RedisBloom/blob/b5916e1b9fba17829c3e329c127b99d706eb31f6/src/topk.c). +- A sliding-window top-K sketch, also based on HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf). + +```go +import ( + "github.com/keilerkonzept/topk" // plain sketch + "github.com/keilerkonzept/topk/sliding" // sliding-window sketch +) +``` + +## Contents + +- [Examples](#examples) + - [Top-K Sketch](#top-k-sketch) + - [Sliding-window Top-K Sketch](#sliding-window-top-k-sketch) +- [Benchmarks](#benchmarks) + - [Top-K Sketch](#top-k-sketch-1) + - [Sliding-Window Top-K Sketch](#sliding-window-top-k-sketch-1) + - [Decay LUT impact](#decay-lut-impact) + +## Examples + +### Top-K Sketch + +```go +package main + +import ( + "log" + "github.com/keilerkonzept/topk" +) + +func main() { + // make a new sketch keeping track of k=3 items using 1024x3 = 3072 buckets. 
+ sketch := topk.New(3, topk.WithWidth(1024), topk.WithDepth(3)) + + log.Println("the sketch takes up", sketch.SizeBytes(), "bytes in memory") + + sketch.Incr("an item") // count "an item" 1 time + sketch.Add("an item", 123) // count "an item" 123 times + sketch.Add("another item", 4) // count "another item" 4 times + sketch.Add("an item", 5) // count "an item" 5 more times + sketch.Add("yet another item", 6) // count "yet another item" 6 times + + if sketch.Query("an item") { + // "an item" is in the top K items observed so far + } + + _ = sketch.Count("another item") // return the estimated count for "another item" + + // SortedSlice() returns the current top-K entries as a slice of {Fingerprint,Item,Count} structs. + for _, entry := range sketch.SortedSlice() { + log.Println(entry.Item, "has been counted", entry.Count, "times") + } + + // Iter is an iterator over the (*not* sorted) current top-K entries. + for entry := range sketch.Iter { + log.Println(entry.Item, "has been counted", entry.Count, "times") + } + sketch.Reset() // reset to New() state +} +``` + + +### Sliding-window Top-K Sketch + +```go +package main + +import ( + "log" + "github.com/keilerkonzept/topk/sliding" +) + +func main() { + // make a new sketch keeping track of k=3 items over a window of the last 60 ticks + // use width=1024 x depth=3 = 3072 buckets + sketch := sliding.New(3, 60, sliding.WithWidth(1024), sliding.WithDepth(3)) + + log.Println("the sketch takes up", sketch.SizeBytes(), "bytes in memory") + + sketch.Incr("an item") // count "an item" 1 time + sketch.Add("an item", 123) // count "an item" 123 times + sketch.Tick() // advance time by one tick + sketch.Add("another item", 4) // count "another item" 4 times + sketch.Ticks(2) // advance time by two ticks + sketch.Add("an item", 5) // count "an item" 5 more times + sketch.Add("yet another item", 6) // count "yet another item" 6 times + + if sketch.Query("an item") { + // "an item" is in the top K items observed 
within the last 60 ticks + } + + _ = sketch.Count("another item") // return the estimated count for "another item" + + // SortedSlice() returns the current top-K entries as a slice of {Fingerprint,Item,Count} structs. + for _, entry := range sketch.SortedSlice() { + log.Println(entry.Item, "has been counted", entry.Count, "times") + } + + // Iter is an interator over the (*not* sorted) current top-K entries. + for entry := range sketch.Iter { + log.Println(entry.Item, "has been counted", entry.Count, "times") + } + sketch.Reset() // reset to New() state +} +``` + +## Benchmarks + +### Top-K Sketch + +(TBD) + +### Sliding-Window Top-K Sketch + +(TBD) + +### Decay LUT impact + +The size of the look-up table is configured using the `WithDecayLUTSize` option. If the look-up table covers the actual counts involved, the speedup can be significant: + +``` +goos: darwin +goarch: arm64 +pkg: github.com/keilerkonzept/topk +cpu: Apple M1 Pro + +BenchmarkSketch_1000_3k_3-10 124646 8450 ns/op +BenchmarkSegmentioTopK_1000_3k_3-10 39903 32345 ns/op +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..614829f --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/keilerkonzept/topk + +go 1.23.0 + +require ( + github.com/OneOfOne/xxhash v1.2.8 + github.com/google/go-cmp v0.6.0 + github.com/segmentio/topk v0.1.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..91e3880 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= +github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/segmentio/topk v0.1.1 h1:cBhsKta9OOtqELxTmbeopRUcUS8w/JamRtFtKZsY/k8= +github.com/segmentio/topk v0.1.1/go.mod h1:ngYjeabuYvDMENm7drxGmf8EmD1H9CIckKEIlWNB+MI= diff --git a/hash.go 
b/hash.go new file mode 100644 index 0000000..2ffe87f --- /dev/null +++ b/hash.go @@ -0,0 +1,14 @@ +package topk + +import "github.com/OneOfOne/xxhash" + +const hashSeed = 4848280 + +func Fingerprint(item string) uint32 { + return xxhash.ChecksumString32S(item, hashSeed) +} + +func BucketIndex(item string, row, width int) int { + column := int(xxhash.ChecksumString32S(item, uint32(row))) % width + return row*width + column +} diff --git a/heap/heap.go b/heap/heap.go new file mode 100644 index 0000000..eeb1fbb --- /dev/null +++ b/heap/heap.go @@ -0,0 +1,150 @@ +package heap + +import ( + "container/heap" + + "github.com/keilerkonzept/topk/internal/sizeof" +) + +type Item struct { + Fingerprint uint32 + Item string + Count uint32 +} + +type Min struct { + K int + Items []Item + Index map[string]int + StoredKeysBytes int +} + +func NewMin(k int) *Min { + return &Min{ + K: k, + Items: make([]Item, k), + Index: make(map[string]int, k), + } +} + +var _ heap.Interface = &Min{} + +func (me Min) SizeBytes() int { + structSize := sizeofMinStruct + bucketsSize := len(me.Items)*sizeofItem + me.StoredKeysBytes + indexSize := sizeof.StringIntMap + (sizeof.Int+sizeof.String)*len(me.Index) + return structSize + bucketsSize + indexSize +} + +func (me *Min) Reinit() { + heap.Init(me) + for me.Len() > 0 && me.Items[0].Count == 0 { + item := me.Items[0].Item + heap.Pop(me) + delete(me.Index, item) + } +} + +func (me Min) Full() bool { return len(me.Items) == me.K } + +// Len is container/heap.Interface.Len(). +func (me Min) Len() int { return len(me.Items) } + +// Less is container/heap.Interface.Less(). +func (me Min) Less(i, j int) bool { + ic := me.Items[i].Count + jc := me.Items[j].Count + if ic == jc { + return me.Items[i].Item < me.Items[j].Item + } + return ic < jc +} + +// Swap is container/heap.Interface.Swap(). 
+func (me Min) Swap(i, j int) { + itemi := me.Items[i].Item + itemj := me.Items[j].Item + me.Items[i], me.Items[j] = me.Items[j], me.Items[i] + me.Index[itemi] = j + me.Index[itemj] = i +} + +// Push is container/heap.Interface.Push(). +func (me *Min) Push(x interface{}) { + b := x.(Item) + me.Items = append(me.Items, b) + me.Index[b.Item] = len(me.Items) - 1 +} + +// Pop is container/heap.Interface.Pop(). +func (me *Min) Pop() interface{} { + old := me.Items + n := len(old) + x := old[n-1] + me.Items = old[0 : n-1] + delete(me.Index, x.Item) + return x +} + +// Min returns the minimum count in the heap or 0 if the heap is empty. +func (me Min) Min() uint32 { + if len(me.Items) == 0 { + return 0 + } + return me.Items[0].Count +} + +func (me Min) Find(item string) (i int) { + if i, ok := me.Index[item]; ok { + return i + } + return -1 +} + +func (me Min) Contains(item string) bool { + _, ok := me.Index[item] + return ok +} + +func (me Min) Get(item string) *Item { + if i, ok := me.Index[item]; ok { + return &me.Items[i] + } + return nil +} + +func (me *Min) Update(item string, fingerprint uint32, count uint32) bool { + if count < me.Min() && me.Full() { // not in top k: ignore + return false + } + + if i := me.Find(item); i >= 0 { // already in heap: update count + me.Items[i].Count = count + heap.Fix(me, i) + return true + } + + me.StoredKeysBytes += len(item) + + if !me.Full() { // heap not full: add to heap + me.Push(Item{ + Count: count, + Fingerprint: fingerprint, + Item: item, + }) + return true + } + + // replace min on heap + minItem := me.Items[0].Item + me.StoredKeysBytes -= len(minItem) + delete(me.Index, minItem) + me.Items[0] = Item{ + Count: count, + Fingerprint: fingerprint, + Item: item, + } + me.Index[item] = 0 + heap.Fix(me, 0) + return true +} diff --git a/heap/sizeof.go b/heap/sizeof.go new file mode 100644 index 0000000..4048208 --- /dev/null +++ b/heap/sizeof.go @@ -0,0 +1,10 @@ +package heap + +import ( + "unsafe" +) + +const ( + 
sizeofMinStruct = int(unsafe.Sizeof(Min{})) + sizeofItem = int(unsafe.Sizeof(Item{})) +) diff --git a/internal/sizeof/sizeof.go b/internal/sizeof/sizeof.go new file mode 100644 index 0000000..242efcf --- /dev/null +++ b/internal/sizeof/sizeof.go @@ -0,0 +1,11 @@ +package sizeof + +import "unsafe" + +const ( + StringIntMap = int(unsafe.Sizeof(map[string]int{})) + String = int(unsafe.Sizeof("")) + Int = int(unsafe.Sizeof(int(0))) + UInt32 = int(unsafe.Sizeof(uint32(0))) + Float32 = int(unsafe.Sizeof(float32(0))) +) diff --git a/options.go b/options.go new file mode 100644 index 0000000..319d7fc --- /dev/null +++ b/options.go @@ -0,0 +1,13 @@ +package topk + +type Option func(*Sketch) + +func WithDepth(depth int) Option { return func(s *Sketch) { s.Depth = depth } } + +func WithWidth(width int) Option { return func(s *Sketch) { s.Width = width } } + +func WithDecay(decay float32) Option { return func(s *Sketch) { s.Decay = decay } } + +func WithDecayLUTSize(n int) Option { + return func(s *Sketch) { s.DecayLUT = make([]float32, n) } +} diff --git a/renovate.json b/renovate.json new file mode 100644 index 0000000..5db72dd --- /dev/null +++ b/renovate.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:recommended" + ] +} diff --git a/sizeof.go b/sizeof.go new file mode 100644 index 0000000..f5587ba --- /dev/null +++ b/sizeof.go @@ -0,0 +1,10 @@ +package topk + +import ( + "unsafe" +) + +const ( + sizeofSketchStruct = int(unsafe.Sizeof(Sketch{})) + sizeofBucketStruct = int(unsafe.Sizeof(Bucket{})) +) diff --git a/sketch.go b/sketch.go new file mode 100644 index 0000000..87fe6e2 --- /dev/null +++ b/sketch.go @@ -0,0 +1,220 @@ +package topk + +import ( + "math" + "math/rand/v2" + "slices" + "sort" + + "github.com/keilerkonzept/topk/heap" + "github.com/keilerkonzept/topk/internal/sizeof" +) + +type Bucket struct { + Fingerprint uint32 + Count uint32 +} + +type Sketch struct { + K int // Keep track of top `K` 
items in the min-heap.. + Width int // Number of buckets per hash function. + Depth int // Number of hash functions. + + // `math.Pow(Decay, i)` is the probability that a flow's counter with value `i` is decremented on collision. + Decay float32 + // Look-up table for powers of `Decay`. The value at `i` is `math.Pow(Decay, i)` + DecayLUT []float32 + + Buckets []Bucket // Sketch counters. + Heap *heap.Min // Top-K min-heap. +} + +func New(k int, opts ...Option) *Sketch { + log_k := int(math.Ceil(math.Log(float64(k)))) + + // default settings + out := Sketch{ + K: k, + Width: intMax(256, k*log_k), + Depth: intMax(3, log_k), + Decay: 0.9, + } + + for _, o := range opts { + o(&out) + } + + if len(out.DecayLUT) == 0 { + // if not specified, default to 256 + out.DecayLUT = make([]float32, 256) + } + + out.Heap = heap.NewMin(out.K) + out.initBuckets() + out.initDecayLUT() + + return &out +} + +func (me *Sketch) initDecayLUT() { + for i := range me.DecayLUT { + me.DecayLUT[i] = float32(math.Pow(float64(me.Decay), float64(i))) + } +} + +func (me *Sketch) initBuckets() { + me.Buckets = make([]Bucket, me.Width*me.Depth) +} + +// SizeBytes returns the current size of the sketch in bytes. +func (me *Sketch) SizeBytes() int { + bucketsSize := (sizeofBucketStruct) * len(me.Buckets) + heapSize := me.Heap.SizeBytes() + decayTableSize := len(me.DecayLUT) * sizeof.Float32 + return sizeofSketchStruct + + bucketsSize + + heapSize + + decayTableSize +} + +// Count returns the estimated count of the given item. +func (me *Sketch) Count(item string) uint32 { + if i := me.Heap.Find(item); i >= 0 { + b := me.Heap.Items[i] + if b.Item == item { + return b.Count + } + } + + fingerprint := Fingerprint(item) + var max uint32 + + for i := range me.Depth { + b := &me.Buckets[BucketIndex(item, i, me.Width)] + if b.Fingerprint != fingerprint { + continue + } + max = uint32Max(max, b.Count) + } + + return max +} + +// Incr counts a single instance of the given item. 
+func (me *Sketch) Incr(item string) bool { + return me.Add(item, 1) +} + +// Add increments the given item's count by the given increment. +// Returns whether the item is in the top K. +func (me *Sketch) Add(item string, increment uint32) bool { + var max uint32 + fingerprint := Fingerprint(item) + + width := me.Width + for i := range me.Depth { + k := BucketIndex(item, i, width) + b := &me.Buckets[k] + count := b.Count + switch { + // empty bucket (zero count) + case count == 0: + b.Fingerprint = fingerprint + b.Count = increment + count = increment + + // this flow's bucket (equal fingerprint) + case b.Fingerprint == fingerprint: + b.Count = increment + count += increment + + // another flow's bucket (nonequal fingerprint) + default: + // can't be inlined, so not factored out + var decay float32 + lookupTableSize := uint32(len(me.DecayLUT)) + for incrementRemaining := increment; incrementRemaining > 0; incrementRemaining-- { + if count < lookupTableSize { + decay = me.DecayLUT[count] + } else { + decay = float32(math.Pow( + float64(me.DecayLUT[lookupTableSize-1]), + float64(count/(lookupTableSize-1)))) * me.DecayLUT[count%(lookupTableSize-1)] + } + if rand.Float32() < decay { + count-- + if count == 0 { + b.Fingerprint = fingerprint + count = incrementRemaining + break + } + } + } + } + + b.Count = count + max = uint32Max(max, count) + } + + return me.Heap.Update(item, fingerprint, max) +} + +// Query returns whether the given item is in the top K items by count. +func (me *Sketch) Query(item string) bool { + return me.Heap.Contains(item) +} + +// Iter iterates over the top K items. +func (me *Sketch) Iter(yield func(*heap.Item) bool) { + for i := range me.Heap.Items { + if me.Heap.Items[i].Count == 0 { + continue + } + if !yield(&me.Heap.Items[i]) { + break + } + } +} + +// SortedSlice returns the top K items as a sorted slice. 
+func (me *Sketch) SortedSlice() []heap.Item { + out := slices.Clone(me.Heap.Items) + + sort.SliceStable(out, func(i, j int) bool { + ci, cj := out[i].Count, out[j].Count + if ci == cj { + return out[i].Item < out[j].Item + } + return ci > cj + }) + + end := len(out) + for ; end > 0; end-- { + if out[end-1].Count > 0 { + break + } + } + + return out[:end] +} + +// Reset resets the sketch to an empty state. +func (me *Sketch) Reset() { + clear(me.Buckets) + clear(me.Heap.Items) + clear(me.Heap.Index) +} + +func uint32Max(a, b uint32) uint32 { + if a > b { + return a + } + return b +} + +func intMax(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/sketch_test.go b/sketch_test.go new file mode 100644 index 0000000..f52cd54 --- /dev/null +++ b/sketch_test.go @@ -0,0 +1,33 @@ +package topk_test + +import ( + "fmt" + "testing" + + "github.com/keilerkonzept/topk" + segmentio_topk "github.com/segmentio/topk" +) + +func BenchmarkSketch_1000_3k_3(b *testing.B) { + sketch := topk.New(1000, topk.WithWidth(3_000), topk.WithDepth(3), topk.WithDecayLUTSize(1024)) + items := make([]string, 2048) + for i := range items { + items[i] = fmt.Sprint(i) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + sketch.Add(items[i%len(items)], uint32(i%len(items))) + } +} + +func BenchmarkSegmentioTopK_1000_3k_3(b *testing.B) { + sketch := segmentio_topk.New(1000, 0.9) + items := make([]string, 2048) + for i := range items { + items[i] = fmt.Sprint(i) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + sketch.Sample(items[i%len(items)], uint32(i%len(items))) + } +} diff --git a/sliding/bucket.go b/sliding/bucket.go new file mode 100644 index 0000000..3cd3276 --- /dev/null +++ b/sliding/bucket.go @@ -0,0 +1,51 @@ +package sliding + +type Bucket struct { + Fingerprint uint32 + + // Counts is a circular buffer (with its first entry at .First) + Counts []uint32 + First uint32 + // CountsSum is the current sum of Counts + CountsSum uint32 +} + +func (me *Bucket) tick() { + if 
me.CountsSum == 0 { + return + } + + last := me.First + if last == 0 { + last = uint32(len(me.Counts) - 1) + } else { + last = uint32(last - 1) + } + me.CountsSum -= me.Counts[last] + me.Counts[last] = 0 + me.First = last +} + +func (me *Bucket) findNonzeroMinimumCount() int { + countsMinIdx := uint32(0) + first := true + var countsMin uint32 + i := me.First + for range len(me.Counts) { + if i == uint32(len(me.Counts)) { + i = 0 + } + c := me.Counts[i] + if c == 0 { + i++ + continue + } + if first || c < countsMin { + countsMin = c + countsMinIdx = i + first = false + } + i++ + } + return int(countsMinIdx) +} diff --git a/sliding/options.go b/sliding/options.go new file mode 100644 index 0000000..4b9188a --- /dev/null +++ b/sliding/options.go @@ -0,0 +1,17 @@ +package sliding + +type Option func(*Sketch) + +func WithDepth(depth int) Option { return func(s *Sketch) { s.Depth = depth } } + +func WithWidth(width int) Option { return func(s *Sketch) { s.Width = width } } + +func WithDecay(decay float32) Option { return func(s *Sketch) { s.Decay = decay } } + +func WithDecayLUTSize(n int) Option { + return func(s *Sketch) { s.DecayLUT = make([]float32, n) } +} + +func WithBucketHistoryLength(n int) Option { + return func(s *Sketch) { s.BucketHistoryLength = n } +} diff --git a/sliding/sizeof.go b/sliding/sizeof.go new file mode 100644 index 0000000..7a27770 --- /dev/null +++ b/sliding/sizeof.go @@ -0,0 +1,10 @@ +package sliding + +import ( + "unsafe" +) + +const ( + sizeofSketchStruct = int(unsafe.Sizeof(Sketch{})) + sizeofBucketStruct = int(unsafe.Sizeof(Bucket{})) +) diff --git a/sliding/sketch.go b/sliding/sketch.go new file mode 100644 index 0000000..fc02957 --- /dev/null +++ b/sliding/sketch.go @@ -0,0 +1,299 @@ +// Package sliding implements a sliding-window HeavyKeeper, as described in "A Sketch Framework for Approximate Data Stream Processing in Sliding Windows" [1] +// +// [1] https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf +package 
sliding + +import ( + "math" + "math/rand/v2" + "slices" + "sort" + + "github.com/keilerkonzept/topk" + "github.com/keilerkonzept/topk/heap" + "github.com/keilerkonzept/topk/internal/sizeof" +) + +type Sketch struct { + K int // Keep track of top `K` items in the min-heap.. + Width int // Number of buckets per hash function. + Depth int // Number of hash functions. + WindowSize int // N: window size in ticks. + BucketHistoryLength int // d: Number of aged counters per bucket. + + // `math.Pow(Decay, i)` is the probability that a flow's counter with value `i` is decremented on collision. + Decay float32 + // Look-up table for powers of `Decay`. The value at `i` is `math.Pow(Decay, i)` + DecayLUT []float32 + + // Index of the next bucket to expire. + NextBucketToExpireIndex int + + Buckets []Bucket // Sketch counters. + Heap *heap.Min // Top-K min-heap. +} + +func New(k, windowSize int, opts ...Option) *Sketch { + log_k := int(math.Ceil(math.Log(float64(k)))) + + // default settings + out := Sketch{ + K: k, + Width: intMax(256, k*log_k), + Depth: intMax(3, log_k), + WindowSize: windowSize, + BucketHistoryLength: windowSize, + Decay: 0.9, + } + + for _, o := range opts { + o(&out) + } + + if len(out.DecayLUT) == 0 { + // if not specified, default to 256 + out.DecayLUT = make([]float32, 256) + } + + if out.BucketHistoryLength < 1 { + out.BucketHistoryLength = 1 + } + if out.BucketHistoryLength >= out.WindowSize { + out.BucketHistoryLength = out.WindowSize + } + + out.Heap = heap.NewMin(out.K) + out.initBuckets() + out.initDecayLUT() + + return &out +} + +func (me *Sketch) initDecayLUT() { + for i := range me.DecayLUT { + me.DecayLUT[i] = float32(math.Pow(float64(me.Decay), float64(i))) + } +} + +func (me *Sketch) initBuckets() { + me.Buckets = make([]Bucket, me.Width*me.Depth) + for i := range me.Buckets { + me.Buckets[i].Counts = make([]uint32, me.BucketHistoryLength) + } +} + +// SizeBytes returns the current size of the sketch in bytes. 
+func (me *Sketch) SizeBytes() int { + bucketsSize := (sizeofBucketStruct + sizeof.UInt32*me.BucketHistoryLength) * len(me.Buckets) + heapSize := me.Heap.SizeBytes() + decayTableSize := len(me.DecayLUT) * sizeof.Float32 + return sizeofSketchStruct + + bucketsSize + + heapSize + + decayTableSize +} + +// Tick advances time by one unit (of the N units in a window) +func (me *Sketch) Tick() { me.Ticks(1) } + +// Ticks advances time by n units (of the N units in a window) +func (me *Sketch) Ticks(n int) { + if n == 0 { + return + } + tick := me.NextBucketToExpireIndex + m, d, N := len(me.Buckets), me.BucketHistoryLength, me.WindowSize + bucketsToAge := (n * d * m) / N + if bucketsToAge < 1 { + bucketsToAge = 1 + } + for i := 0; i < bucketsToAge; i++ { + me.Buckets[tick].tick() + tick++ + if tick == m { + tick = 0 + } + } + me.NextBucketToExpireIndex = tick + me.recountHeapItems() +} + +// Count returns the estimated count of the given item. +func (me *Sketch) Count(item string) uint32 { + if i := me.Heap.Find(item); i >= 0 { + b := me.Heap.Items[i] + if b.Item == item { + return b.Count + } + } + + fingerprint := topk.Fingerprint(item) + var maxSum uint32 + + for i := range me.Depth { + b := &me.Buckets[topk.BucketIndex(item, i, me.Width)] + if b.Fingerprint != fingerprint { + continue + } + maxSum = uint32Max(maxSum, b.CountsSum) + } + + return maxSum +} + +func (me *Sketch) recountHeapItems() { + // recompute each heap item's count from its buckets, + // then re-initialize the heap. 
+ // + // O(k * depth) + for i := range me.Heap.Items { + hb := &me.Heap.Items[i] + if hb.Count == 0 { + continue + } + fingerprint := hb.Fingerprint + item := hb.Item + width := me.Width + var maxSum uint32 + + for i := range me.Depth { + b := &me.Buckets[topk.BucketIndex(item, i, width)] + if b.Fingerprint != fingerprint { + continue + } + maxSum = uint32Max(maxSum, b.CountsSum) + } + hb.Count = maxSum + } + + // O(k) + me.Heap.Reinit() +} + +// Incr counts a single instance of the given item. +func (me *Sketch) Incr(item string) bool { + return me.Add(item, 1) +} + +// Add increments the given item's count by the given increment. +// Returns whether the item is in the top K. +func (me *Sketch) Add(item string, increment uint32) bool { + var maxCount uint32 + fingerprint := topk.Fingerprint(item) + + width := me.Width + for i := range me.Depth { + k := topk.BucketIndex(item, i, width) + b := &me.Buckets[k] + count := b.CountsSum + switch { + // empty bucket (zero count) + case count == 0: + b.Fingerprint = fingerprint + clear(b.Counts) + b.Counts[b.First] = increment + count = increment + + // this flow's bucket (equal fingerprint) + case b.Fingerprint == fingerprint: + b.Counts[b.First] += increment + count += increment + + // another flow's bucket (nonequal fingerprint) + default: + // can't be inlined, so not factored out + var decay float32 + lookupTableSize := uint32(len(me.DecayLUT)) + for incrementRemaining := increment; incrementRemaining > 0; incrementRemaining-- { + if count < lookupTableSize { + decay = me.DecayLUT[count] + } else { + decay = float32(math.Pow( + float64(me.DecayLUT[lookupTableSize-1]), + float64(count/(lookupTableSize-1)))) * me.DecayLUT[count%(lookupTableSize-1)] + } + if rand.Float32() < decay { + countsMinIdx := b.findNonzeroMinimumCount() + b.Counts[countsMinIdx]-- + count-- + if count == 0 { + b.Fingerprint = fingerprint + count = incrementRemaining + break + } + } + } + } + + b.CountsSum = count + maxCount = uint32Max(maxCount, 
count) + } + + return me.Heap.Update(item, fingerprint, maxCount) +} + +// Query returns whether the given item is in the top K items by count. +func (me *Sketch) Query(item string) bool { + return me.Heap.Contains(item) +} + +// Iter iterates over the top K items. +func (me *Sketch) Iter(yield func(*heap.Item) bool) { + for i := range me.Heap.Items { + if me.Heap.Items[i].Count == 0 { + continue + } + if !yield(&me.Heap.Items[i]) { + break + } + } +} + +// SortedSlice returns the top K items as a sorted slice. +func (me *Sketch) SortedSlice() []heap.Item { + out := slices.Clone(me.Heap.Items) + + sort.SliceStable(out, func(i, j int) bool { + ci, cj := out[i].Count, out[j].Count + if ci == cj { + return out[i].Item < out[j].Item + } + return ci > cj + }) + + end := len(out) + for ; end > 0; end-- { + if out[end-1].Count > 0 { + break + } + } + + return out[:end] +} + +// Reset resets the sketch to an empty state. +func (me *Sketch) Reset() { + me.NextBucketToExpireIndex = 0 + for i := range me.Buckets { + me.Buckets[i].CountsSum = 0 + me.Buckets[i].Fingerprint = 0 + clear(me.Buckets[i].Counts) + } + clear(me.Buckets) + clear(me.Heap.Items) + clear(me.Heap.Index) +} + +func uint32Max(a, b uint32) uint32 { + if a > b { + return a + } + return b +} + +func intMax(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/sliding/sketch_test.go b/sliding/sketch_test.go new file mode 100644 index 0000000..dd38a9b --- /dev/null +++ b/sliding/sketch_test.go @@ -0,0 +1,141 @@ +package sliding_test + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/keilerkonzept/topk" + "github.com/keilerkonzept/topk/heap" + "github.com/keilerkonzept/topk/sliding" +) + +func TestSketch(t *testing.T) { + sketch := sliding.New(2, 2, sliding.WithWidth(10), sliding.WithDepth(2), sliding.WithBucketHistoryLength(2)) + + //t 0 + // + //X 3 + //Y 2 + //Z 1 + // [ _ _ ] {x:3,y:2}+ + sketch.Add("X", 3) + sketch.Add("Y", 2) + sketch.Add("Z", 1) + { + expected := 
[]heap.Item{ + {topk.Fingerprint("X"), "X", 3}, + {topk.Fingerprint("Y"), "Y", 2}, + } + actual := sketch.SortedSlice() + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + } + sketch.Tick() + + //t 0 1 + // + //X 3 2 + //Y 2 2 + //Z 1 1 + // [ _ _ ] {x:5,y:4} + sketch.Add("X", 2) + sketch.Add("Y", 2) + sketch.Add("Z", 1) + { + expected := []heap.Item{ + {topk.Fingerprint("X"), "X", 5}, + {topk.Fingerprint("Y"), "Y", 4}, + } + actual := sketch.SortedSlice() + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + } + sketch.Tick() + + //t 0 1 2 + // + //X 3 2 0 + //Y 2 2 1 + //Z 1 1 3 + // [ _ _ ] {x:5,y:4} + // [ _ _ ] {z:4,y:3} + sketch.Add("Y", 1) + sketch.Add("Z", 3) + { + expected := []heap.Item{ + {topk.Fingerprint("Z"), "Z", 4}, + {topk.Fingerprint("Y"), "Y", 3}, + } + actual := sketch.SortedSlice() + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + } + sketch.Tick() + + //t 0 1 2 3 + // + //X 3 2 0 0 + //Y 2 2 1 1 + //Z 1 1 3 3 + // [ _ _ ] {x:5,y:4} + // [ _ _ ] {z:4,y:3} + // [ _ _ ] {z:6:y:2} + sketch.Add("Y", 1) + sketch.Add("Z", 3) + { + expected := []heap.Item{ + {topk.Fingerprint("Z"), "Z", 6}, + {topk.Fingerprint("Y"), "Y", 2}, + } + actual := sketch.SortedSlice() + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + } + + sketch.Tick() + //t 0 1 2 3 4 + // + //X 3 2 0 0 0 + //Y 2 2 1 1 0 + //Z 1 1 3 3 0 + // [ _ _ ] {x:5,y:4} + // [ _ _ ] {z:4,y:3} + // [ _ _ ] {z:6:y:2} + // [ _ _ ] {z:3:y:1} + { + expected := []heap.Item{ + {topk.Fingerprint("Z"), "Z", 3}, + {topk.Fingerprint("Y"), "Y", 1}, + } + actual := sketch.SortedSlice() + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + } + + sketch.Tick() + sketch.Add("X", 1) + //t 0 1 2 3 4 5 + // + //X 3 2 0 0 0 1 + //Y 2 2 1 1 0 0 + //Z 1 1 3 3 0 0 + // [ _ _ ] {x:5,y:4} + // [ _ _ ] {z:4,y:3} + // [ _ _ ] {z:6:y:2} + // [ _ _ ] {z:3:y:1} + // [ _ _ ] {x:1} + { + expected := []heap.Item{ + 
{topk.Fingerprint("X"), "X", 1}, + } + actual := sketch.SortedSlice() + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + } +}