Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
sgreben committed Aug 30, 2024
0 parents commit d9fb7f0
Show file tree
Hide file tree
Showing 17 changed files with 1,243 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'

- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
107 changes: 107 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# topk

Sliding-window and regular top-K sketches.

- A fast implementation of the HeavyKeeper top-K sketch based on the [RedisBloom implementation](https://github.com/RedisBloom/RedisBloom/blob/b5916e1b9fba17829c3e329c127b99d706eb31f6/src/sliding.c)
- A sliding-window top-K sketch, also based on HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf)

```go
import (
"github.com/keilerkonzept/topk" // plain sketch
"github.com/keilerkonzept/topk/sliding" // sliding-window sketch
)
```

**Contents**

- [Examples](#examples)
- [Plain Top-K Sketch](#plain-top-k-sketch)
- [Sliding-window Top-K Sketch](#sliding-window-top-k-sketch)

## Examples

### Plain Top-K Sketch

```go
package main

import (
"log"
"github.com/keilerkonzept/topk"
)

func main() {
// make a new sketch keeping track of k=3 items using 1024x3 = 3072 buckets.
sketch := topk.New(3, topk.WithWidth(1024), topk.WithDepth(3))

log.Println("the sketch takes up", sketch.SizeBytes(), "bytes in memory")

sketch.Incr("an item") // count "an item" 1 time
sketch.Add("an item", 123) // count "an item" 123 times
sketch.Add("another item", 4) // count "another item" 4 times
sketch.Add("an item", 5) // count "an item" 5 more times
sketch.Add("yet another item", 6) // count "yet another item" 6 times

if sketch.Query("an item") {
// "an item" is in the top K items observed within the last 60 ticks
}

_ = sketch.Count("another item") // return the estimated count for "another item"

// SortedSlice() returns the current top-K entries as a slice of {Fingerprint,Item,Count} structs.
for _, entry := range sketch.SortedSlice() {
log.Println(entry.Item, "has been counted", entry.Count, "times")
}

// Iter is an interator over the (*not* sorted) current top-K entries.
for entry := range sketch.Iter {
log.Println(entry.Item, "has been counted", entry.Count, "times")
}
sketch.Reset() // reset to New() state
}
```


### Sliding-window Top-K Sketch

```go
package main

import (
"log"
"github.com/keilerkonzept/topk/sliding"
)

func main() {
// make a new sketch keeping track of k=3 items over a window of the last 60 ticks
// use width=1024 x depth=3 = 3072 buckets
sketch := sliding.New(3, 60, sliding.WithWidth(1024), sliding.WithDepth(3))

log.Println("the sketch takes up", sketch.SizeBytes(), "bytes in memory")

sketch.Incr("an item") // count "an item" 1 time
sketch.Add("an item", 123) // count "an item" 123 times
sketch.Tick() // advance time by one tick
sketch.Add("another item", 4) // count "another item" 4 times
sketch.Ticks(2) // advance time by two ticks
sketch.Add("an item", 5) // count "an item" 5 more times
sketch.Add("yet another item", 6) // count "yet another item" 6 times

if sketch.Query("an item") {
// "an item" is in the top K items observed within the last 60 ticks
}

_ = sketch.Count("another item") // return the estimated count for "another item"

// SortedSlice() returns the current top-K entries as a slice of {Fingerprint,Item,Count} structs.
for _, entry := range sketch.SortedSlice() {
log.Println(entry.Item, "has been counted", entry.Count, "times")
}

// Iter is an interator over the (*not* sorted) current top-K entries.
for entry := range sketch.Iter {
log.Println(entry.Item, "has been counted", entry.Count, "times")
}
sketch.Reset() // reset to New() state
}
```
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/keilerkonzept/sliding-topk

go 1.23.0

require github.com/google/go-cmp v0.6.0

require github.com/OneOfOne/xxhash v1.2.8
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
14 changes: 14 additions & 0 deletions hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package topk

import "github.com/OneOfOne/xxhash"

const hashSeed = 4848280

func Fingerprint(item string) uint32 {
return xxhash.ChecksumString32S(item, hashSeed)
}

func BucketIndex(item string, row, width int) int {
column := int(xxhash.ChecksumString32S(item, uint32(row))) % width
return row*width + column
}
148 changes: 148 additions & 0 deletions heap/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package heap

import (
"container/heap"

"github.com/keilerkonzept/topk/internal/sizeof"
)

type Item struct {
Fingerprint uint32
Item string
Count uint32
}

type Min struct {
Items []Item
Index map[string]int
StoredKeysBytes int
}

func NewMin(k int) *Min {
return &Min{
Items: make([]Item, k),
Index: make(map[string]int, k),
}
}

var _ heap.Interface = &Min{}

func (me Min) SizeBytes() int {
structSize := sizeofMinStruct
bucketsSize := len(me.Items)*sizeofItem + me.StoredKeysBytes
indexSize := sizeof.StringIntMap + (sizeof.Int+sizeof.String)*len(me.Index)
return structSize + bucketsSize + indexSize
}

func (me *Min) Reinit() {
heap.Init(me)
for me.Len() > 0 && me.Items[0].Count == 0 {
item := me.Items[0].Item
heap.Pop(me)
delete(me.Index, item)
}
}

func (me Min) Full() bool { return len(me.Items) == cap(me.Items) }

// Len is container/heap.Interface.Len().
func (me Min) Len() int { return len(me.Items) }

// Less is container/heap.Interface.Less().
func (me Min) Less(i, j int) bool {
ic := me.Items[i].Count
jc := me.Items[j].Count
if ic == jc {
return me.Items[i].Item < me.Items[j].Item
}
return ic < jc
}

// Swap is container/heap.Interface.Swap().
func (me Min) Swap(i, j int) {
itemi := me.Items[i].Item
itemj := me.Items[j].Item
me.Items[i], me.Items[j] = me.Items[j], me.Items[i]
me.Index[itemi] = j
me.Index[itemj] = i
}

// Push is container/heap.Interface.Push().
func (me *Min) Push(x interface{}) {
b := x.(Item)
me.Items = append(me.Items, b)
me.Index[b.Item] = len(me.Items) - 1
}

// Pop is container/heap.Interface.Pop().
func (me *Min) Pop() interface{} {
old := me.Items
n := len(old)
x := old[n-1]
me.Items = old[0 : n-1]
delete(me.Index, x.Item)
return x
}

// Min returns the minimum count in the heap or 0 if the heap is empty.
func (me Min) Min() uint32 {
if len(me.Items) == 0 {
return 0
}
return me.Items[0].Count
}

func (me Min) Find(item string) (i int) {
if i, ok := me.Index[item]; ok {
return i
}
return -1
}

func (me Min) Contains(item string) bool {
_, ok := me.Index[item]
return ok
}

func (me Min) Get(item string) *Item {
if i, ok := me.Index[item]; ok {
return &me.Items[i]
}
return nil
}

func (me *Min) Update(item string, fingerprint uint32, count uint32) bool {
if count < me.Min() && me.Full() { // not in top k: ignore
return false
}

if i := me.Find(item); i >= 0 { // already in heap: update count
me.Items[i].Count = count
heap.Fix(me, i)
return true
}

me.StoredKeysBytes += len(item)

if !me.Full() { // heap not full: add to heap
me.Push(Item{
Count: count,
Fingerprint: fingerprint,
Item: item,
})
return true
}

// replace min on heap
minItem := me.Items[0].Item
me.StoredKeysBytes -= len(minItem)
delete(me.Index, minItem)
me.Items[0] = Item{
Count: count,
Fingerprint: fingerprint,
Item: item,
}
me.Index[item] = 0
heap.Fix(me, 0)
return true
}
10 changes: 10 additions & 0 deletions heap/sizeof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package heap

import (
"unsafe"
)

const (
sizeofMinStruct = int(unsafe.Sizeof(Min{}))
sizeofItem = int(unsafe.Sizeof(Item{}))
)
11 changes: 11 additions & 0 deletions internal/sizeof/sizeof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package sizeof

import "unsafe"

const (
StringIntMap = int(unsafe.Sizeof(map[string]int{}))
String = int(unsafe.Sizeof(""))
Int = int(unsafe.Sizeof(int(0)))
UInt32 = int(unsafe.Sizeof(uint32(0)))
Float32 = int(unsafe.Sizeof(float32(0)))
)
13 changes: 13 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package topk

type Option func(*Sketch)

func WithDepth(depth int) Option { return func(s *Sketch) { s.Depth = depth } }

func WithWidth(width int) Option { return func(s *Sketch) { s.Width = width } }

func WithDecay(decay float32) Option { return func(s *Sketch) { s.Decay = decay } }

func WithDecayLookupTableSize(n int) Option {
return func(s *Sketch) { s.DecayLUT = make([]float32, n) }
}
10 changes: 10 additions & 0 deletions sizeof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package topk

import (
"unsafe"
)

const (
sizeofSketchStruct = int(unsafe.Sizeof(Sketch{}))
sizeofBucketStruct = int(unsafe.Sizeof(Bucket{}))
)
Loading

0 comments on commit d9fb7f0

Please sign in to comment.