Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
sgreben committed Aug 29, 2024
0 parents commit 159f8f0
Show file tree
Hide file tree
Showing 10 changed files with 767 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'

- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# sliding-topk

Sliding HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf)

```go
import (
topk "github.com/keilerkonzept/sliding-topk"
)

func main() {
// make a new sketch keeping track of k=3 items over a window of the last 60 ticks
// use width=1024 x depth=3 = 3072 buckets
sketch := topk.New(3, 60, topk.WithWidth(1024),topk.WithDepth(3))

log.Println("the sketch takes", sketch.SizeBytes(), "bytes in memory")

sketch.Incr("an item") // count "an item" 1 time
sketch.Add("an item", 123) // count "an item" 123 times
sketch.Tick(1) // advance time by one tick
sketch.Add("another item", 4) // count "another item" 4 times
sketch.Tick(2) // advance time by two ticks
sketch.Add("an item", 5) // count "an item" 5 more times
sketch.Add("yet another item", 6) // count "yet another item" 6 times

if sketch.Query("an item") {
// "an item" is in the top K items observed within the last 60 ticks
}

_ = sketch.Count("another item") // return the estimated count for "another item"

for entry := range sketch.TopK() {// TopK() rseturn all top K items as a slice of {Item,Count} structs
log.Println(entry.Item, "counted", entry.Count, "times")
}

sketch.Reset() // reset to New() state
}
64 changes: 64 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package topk

import (
"unsafe"
)

const (
sizeOfBucketStruct = int(unsafe.Sizeof(Bucket{}))
)

type Bucket struct {
Fingerprint uint32

// Counts is a circular buffer (with its first entry at .First)
Counts []uint32
First uint32
// CountsSum is the current sum of Counts
CountsSum uint32
}

func (me *Bucket) last() uint32 {
n := uint32(len(me.Counts))
return ((me.First + n) - 1) % n
}

func (me *Bucket) tick() {
if me.CountsSum == 0 {
return
}

last := me.First
if last == 0 {
last = uint32(len(me.Counts) - 1)
} else {
last = uint32(last - 1)
}
me.CountsSum -= me.Counts[last]
me.Counts[last] = 0
me.First = last
}

func (me *Bucket) findNonzeroMinimumCount() int {
countsMinIdx := uint32(0)
first := true
var countsMin uint32
i := me.First
for range len(me.Counts) {
if i == uint32(len(me.Counts)) {
i = 0
}
c := me.Counts[i]
if c == 0 {
i++
continue
}
if first || c < countsMin {
countsMin = c
countsMinIdx = i
first = false
}
i++
}
return int(countsMinIdx)
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/keilerkonzept/sliding-topk

go 1.23.0

require github.com/google/go-cmp v0.6.0

require github.com/OneOfOne/xxhash v1.2.8
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
16 changes: 16 additions & 0 deletions internal/unsafeutil/unsafeutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package unsafeutil

import "unsafe"

// Bytes converts a string into a byte slice.
func Bytes(s string) (b []byte) {
return unsafe.Slice(unsafe.StringData(s), len(s))
}

// String converts a byte slice into a string.
func String(b []byte) (s string) {
if len(b) == 0 {
return ""
}
return unsafe.String(&b[0], len(b))
}
135 changes: 135 additions & 0 deletions min_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package topk

import (
"container/heap"
"unsafe"
)

type HeapItem struct {
Fingerprint uint32
Item string
Count uint32
}

type MinHeap struct {
Items []HeapItem
Index map[string]int
StoredKeysBytes int
}

func NewMinHeap(k int) *MinHeap {
return &MinHeap{
Items: make([]HeapItem, k),
Index: make(map[string]int, k),
}
}

var _ heap.Interface = &MinHeap{}

const (
sizeOfBucketMinHeapStruct = int(unsafe.Sizeof(MinHeap{}))
sizeOfHeapBucket = int(unsafe.Sizeof(HeapItem{}))
sizeOfIndex = int(unsafe.Sizeof(map[string]int{}))
)

func (h MinHeap) SizeBytes() int {
structSize := sizeOfBucketMinHeapStruct
bucketsSize := len(h.Items)*sizeOfHeapBucket + h.StoredKeysBytes
indexSize := sizeOfIndex + (sizeofInt+sizeofString)*len(h.Index)
return structSize + bucketsSize + indexSize
}

func (h MinHeap) Full() bool { return len(h.Items) == cap(h.Items) }
func (h MinHeap) Len() int { return len(h.Items) }
func (h MinHeap) Less(i, j int) bool {
ic := h.Items[i].Count
jc := h.Items[j].Count
if ic == jc {
return h.Items[i].Item < h.Items[j].Item
}
return ic < jc
}
func (h MinHeap) Swap(i, j int) {
itemi := h.Items[i].Item
itemj := h.Items[j].Item
h.Items[i], h.Items[j] = h.Items[j], h.Items[i]
h.Index[itemi] = j
h.Index[itemj] = i
}

func (h *MinHeap) Push(x interface{}) {
b := x.(HeapItem)
h.Items = append(h.Items, b)
h.Index[b.Item] = len(h.Items) - 1
}

func (h *MinHeap) Pop() interface{} {
old := h.Items
n := len(old)
x := old[n-1]
h.Items = old[0 : n-1]
delete(h.Index, x.Item)
return x
}

// Min returns the minimum count in the heap or 0 if the heap is empty.
func (h MinHeap) Min() uint32 {
if len(h.Items) == 0 {
return 0
}
return h.Items[0].Count
}

func (h MinHeap) Find(item string) (i int) {
if i, ok := h.Index[item]; ok {
return i
}
return -1
}

func (h MinHeap) Contains(item string) bool {
_, ok := h.Index[item]
return ok
}

func (h MinHeap) Get(item string) *HeapItem {
if i, ok := h.Index[item]; ok {
return &h.Items[i]
}
return nil
}

func (h *MinHeap) Update(item string, fingerprint uint32, count uint32) {
if count < h.Min() && h.Full() { // not in top k: ignore
return
}

if i := h.Find(item); i >= 0 { // already in heap: update count
h.Items[i].Count = count
heap.Fix(h, i)
return
}

h.StoredKeysBytes += len(item)

if !h.Full() { // heap not full: add to heap
h.Push(HeapItem{
Count: count,
Fingerprint: fingerprint,
Item: item,
})
return
}

// replace min on heap
minItem := h.Items[0].Item
h.StoredKeysBytes -= len(minItem)
delete(h.Index, minItem)
h.Items[0] = HeapItem{
Count: count,
Fingerprint: fingerprint,
Item: item,
}
h.Index[item] = 0
heap.Fix(h, 0)
}
10 changes: 10 additions & 0 deletions sizeof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package topk

import "unsafe"

const (
sizeofString = int(unsafe.Sizeof(""))
sizeofInt = int(unsafe.Sizeof(int(0)))
sizeofUInt32 = int(unsafe.Sizeof(uint32(0)))
sizeofFloat32 = int(unsafe.Sizeof(float32(0)))
)
Loading

0 comments on commit 159f8f0

Please sign in to comment.