-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 95acb38
Showing
17 changed files
with
1,098 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# This workflow will build a golang project | ||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go | ||
|
||
name: Go | ||
|
||
on: | ||
push: | ||
branches: [ "main" ] | ||
pull_request: | ||
branches: [ "main" ] | ||
|
||
jobs: | ||
|
||
build: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v4 | ||
|
||
- name: Set up Go | ||
uses: actions/setup-go@v4 | ||
with: | ||
go-version: '1.23' | ||
|
||
- name: Build | ||
run: go build -v ./... | ||
|
||
- name: Test | ||
run: go test -v ./... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
# topk | ||
|
||
Sliding-window and regular top-K sketches. | ||
|
||
- A fast implementation of the HeavyKeeper top-K sketch based on the [RedisBloom implementation](https://github.com/RedisBloom/RedisBloom/blob/b5916e1b9fba17829c3e329c127b99d706eb31f6/src/topk.c) | ||
- A sliding-window top-K sketch, also based on HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf) | ||
|
||
```go | ||
import ( | ||
"github.com/keilerkonzept/topk" // plain sketch | ||
"github.com/keilerkonzept/topk/sliding" // sliding-window sketch | ||
) | ||
``` | ||
|
||
**Contents** | ||
|
||
- [Examples](#examples) | ||
- [Plain Top-K Sketch](#plain-top-k-sketch) | ||
- [Sliding-window Top-K Sketch](#sliding-window-top-k-sketch) | ||
|
||
## Examples | ||
|
||
### Plain Top-K Sketch | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"log" | ||
"github.com/keilerkonzept/topk" | ||
) | ||
|
||
func main() { | ||
// make a new sketch keeping track of k=3 items using 1024x3 = 3072 buckets. | ||
sketch := topk.New(3, topk.WithWidth(1024), topk.WithDepth(3)) | ||
|
||
log.Println("the sketch takes up", sketch.SizeBytes(), "bytes in memory") | ||
|
||
sketch.Incr("an item") // count "an item" 1 time | ||
sketch.Add("an item", 123) // count "an item" 123 times | ||
sketch.Add("another item", 4) // count "another item" 4 times | ||
sketch.Add("an item", 5) // count "an item" 5 more times | ||
sketch.Add("yet another item", 6) // count "yet another item" 6 times | ||
|
||
if sketch.Query("an item") { | |
// "an item" is currently among the top K items | |
} | |
|
||
_ = sketch.Count("another item") // return the estimated count for "another item" | ||
|
||
// SortedSlice() returns the current top-K entries as a slice of {Fingerprint,Item,Count} structs. | ||
for _, entry := range sketch.SortedSlice() { | ||
log.Println(entry.Item, "has been counted", entry.Count, "times") | ||
} | ||
|
||
// Iter is an iterator over the (*not* sorted) current top-K entries. | |
for entry := range sketch.Iter { | ||
log.Println(entry.Item, "has been counted", entry.Count, "times") | ||
} | ||
sketch.Reset() // reset to New() state | ||
} | ||
``` | ||
|
||
|
||
### Sliding-window Top-K Sketch | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"log" | ||
"github.com/keilerkonzept/topk/sliding" | ||
) | ||
|
||
func main() { | ||
// make a new sketch keeping track of k=3 items over a window of the last 60 ticks | ||
// use width=1024 x depth=3 = 3072 buckets | ||
sketch := sliding.New(3, 60, sliding.WithWidth(1024), sliding.WithDepth(3)) | ||
|
||
log.Println("the sketch takes up", sketch.SizeBytes(), "bytes in memory") | ||
|
||
sketch.Incr("an item") // count "an item" 1 time | ||
sketch.Add("an item", 123) // count "an item" 123 times | ||
sketch.Tick() // advance time by one tick | ||
sketch.Add("another item", 4) // count "another item" 4 times | ||
sketch.Ticks(2) // advance time by two ticks | ||
sketch.Add("an item", 5) // count "an item" 5 more times | ||
sketch.Add("yet another item", 6) // count "yet another item" 6 times | ||
|
||
if sketch.Query("an item") { | ||
// "an item" is in the top K items observed within the last 60 ticks | ||
} | ||
|
||
_ = sketch.Count("another item") // return the estimated count for "another item" | ||
|
||
// SortedSlice() returns the current top-K entries as a slice of {Fingerprint,Item,Count} structs. | ||
for _, entry := range sketch.SortedSlice() { | ||
log.Println(entry.Item, "has been counted", entry.Count, "times") | ||
} | ||
|
||
// Iter is an iterator over the (*not* sorted) current top-K entries. | |
for entry := range sketch.Iter { | ||
log.Println(entry.Item, "has been counted", entry.Count, "times") | ||
} | ||
sketch.Reset() // reset to New() state | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
module github.com/keilerkonzept/topk | |
|
||
go 1.23.0 | ||
|
||
require github.com/google/go-cmp v0.6.0 | ||
|
||
require github.com/OneOfOne/xxhash v1.2.8 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= | ||
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= | ||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package topk | ||
|
||
import "github.com/OneOfOne/xxhash" | ||
|
||
const hashSeed = 4848280 | ||
|
||
func Fingerprint(item string) uint32 { | ||
return xxhash.ChecksumString32S(item, hashSeed) | ||
} | ||
|
||
func BucketIndex(item string, row, width int) int { | ||
column := int(xxhash.ChecksumString32S(item, uint32(row))) % width | ||
return row*width + column | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package heap | ||
|
||
import ( | ||
"container/heap" | ||
|
||
"github.com/keilerkonzept/topk/internal/sizeof" | ||
) | ||
|
||
// Item is a single entry stored in a Min heap.
type Item struct {
	// Fingerprint is the fingerprint value supplied for the entry
	// (presumably the hash of Item; the heap itself never computes it).
	Fingerprint uint32
	// Item is the key of the entry; the heap's Index is keyed by it.
	Item string
	// Count is the value the heap is ordered by.
	Count uint32
}
|
||
type Min struct { | ||
Items []Item | ||
Index map[string]int | ||
StoredKeysBytes int | ||
} | ||
|
||
func NewMin(k int) *Min { | ||
return &Min{ | ||
Items: make([]Item, k), | ||
Index: make(map[string]int, k), | ||
} | ||
} | ||
|
||
var _ heap.Interface = &Min{} | ||
|
||
func (me Min) SizeBytes() int { | ||
structSize := sizeofMinStruct | ||
bucketsSize := len(me.Items)*sizeofItem + me.StoredKeysBytes | ||
indexSize := sizeof.StringIntMap + (sizeof.Int+sizeof.String)*len(me.Index) | ||
return structSize + bucketsSize + indexSize | ||
} | ||
|
||
func (me *Min) Reinit() { | ||
heap.Init(me) | ||
for me.Len() > 0 && me.Items[0].Count == 0 { | ||
item := me.Items[0].Item | ||
heap.Pop(me) | ||
delete(me.Index, item) | ||
} | ||
} | ||
|
||
func (me Min) Full() bool { return len(me.Items) == cap(me.Items) } | ||
|
||
// Len is container/heap.Interface.Len(). | ||
func (me Min) Len() int { return len(me.Items) } | ||
|
||
// Less is container/heap.Interface.Less(). | ||
func (me Min) Less(i, j int) bool { | ||
ic := me.Items[i].Count | ||
jc := me.Items[j].Count | ||
if ic == jc { | ||
return me.Items[i].Item < me.Items[j].Item | ||
} | ||
return ic < jc | ||
} | ||
|
||
// Swap is container/heap.Interface.Swap(). | ||
func (me Min) Swap(i, j int) { | ||
itemi := me.Items[i].Item | ||
itemj := me.Items[j].Item | ||
me.Items[i], me.Items[j] = me.Items[j], me.Items[i] | ||
me.Index[itemi] = j | ||
me.Index[itemj] = i | ||
} | ||
|
||
// Push is container/heap.Interface.Push(). | ||
func (me *Min) Push(x interface{}) { | ||
b := x.(Item) | ||
me.Items = append(me.Items, b) | ||
me.Index[b.Item] = len(me.Items) - 1 | ||
} | ||
|
||
// Pop is container/heap.Interface.Pop(). | ||
func (me *Min) Pop() interface{} { | ||
old := me.Items | ||
n := len(old) | ||
x := old[n-1] | ||
me.Items = old[0 : n-1] | ||
delete(me.Index, x.Item) | ||
return x | ||
} | ||
|
||
// Min returns the minimum count in the heap or 0 if the heap is empty. | ||
func (me Min) Min() uint32 { | ||
if len(me.Items) == 0 { | ||
return 0 | ||
} | ||
return me.Items[0].Count | ||
} | ||
|
||
func (me Min) Find(item string) (i int) { | ||
if i, ok := me.Index[item]; ok { | ||
return i | ||
} | ||
return -1 | ||
} | ||
|
||
func (me Min) Contains(item string) bool { | ||
_, ok := me.Index[item] | ||
return ok | ||
} | ||
|
||
func (me Min) Get(item string) *Item { | ||
if i, ok := me.Index[item]; ok { | ||
return &me.Items[i] | ||
} | ||
return nil | ||
} | ||
|
||
func (me *Min) Update(item string, fingerprint uint32, count uint32) bool { | ||
if count < me.Min() && me.Full() { // not in top k: ignore | ||
return false | ||
} | ||
|
||
if i := me.Find(item); i >= 0 { // already in heap: update count | ||
me.Items[i].Count = count | ||
heap.Fix(me, i) | ||
return true | ||
} | ||
|
||
me.StoredKeysBytes += len(item) | ||
|
||
if !me.Full() { // heap not full: add to heap | ||
me.Push(Item{ | ||
Count: count, | ||
Fingerprint: fingerprint, | ||
Item: item, | ||
}) | ||
return true | ||
} | ||
|
||
// replace min on heap | ||
minItem := me.Items[0].Item | ||
me.StoredKeysBytes -= len(minItem) | ||
delete(me.Index, minItem) | ||
me.Items[0] = Item{ | ||
Count: count, | ||
Fingerprint: fingerprint, | ||
Item: item, | ||
} | ||
me.Index[item] = 0 | ||
heap.Fix(me, 0) | ||
return true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package heap | ||
|
||
import ( | ||
"unsafe" | ||
) | ||
|
||
const ( | ||
sizeofMinStruct = int(unsafe.Sizeof(Min{})) | ||
sizeofItem = int(unsafe.Sizeof(Item{})) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package sizeof | ||
|
||
import "unsafe" | ||
|
||
// Sizes in bytes of the values of some built-in types, as reported by
// unsafe.Sizeof. For maps and strings this is the size of the value's
// header only, not of the data it refers to.
const (
	// StringIntMap is the size of a map[string]int value.
	StringIntMap = int(unsafe.Sizeof(map[string]int(nil)))
	// String is the size of a string value.
	String = int(unsafe.Sizeof(string("")))
	// Int is the size of an int.
	Int = int(unsafe.Sizeof(int(0)))
	// UInt32 is the size of a uint32.
	UInt32 = int(unsafe.Sizeof(uint32(0)))
	// Float32 is the size of a float32.
	Float32 = int(unsafe.Sizeof(float32(0)))
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package topk | ||
|
||
type Option func(*Sketch) | ||
|
||
func WithDepth(depth int) Option { return func(s *Sketch) { s.Depth = depth } } | ||
|
||
func WithWidth(width int) Option { return func(s *Sketch) { s.Width = width } } | ||
|
||
func WithDecay(decay float32) Option { return func(s *Sketch) { s.Decay = decay } } | ||
|
||
func WithDecayLookupTableSize(n int) Option { | ||
return func(s *Sketch) { s.DecayLUT = make([]float32, n) } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package topk | ||
|
||
import ( | ||
"unsafe" | ||
) | ||
|
||
const ( | ||
sizeofSketchStruct = int(unsafe.Sizeof(Sketch{})) | ||
sizeofBucketStruct = int(unsafe.Sizeof(Bucket{})) | ||
) |
Oops, something went wrong.