diff --git a/internal/eviction/s3fifo/queue.go b/internal/eviction/s3fifo/queue.go index 0b8c62e..60e4a7c 100644 --- a/internal/eviction/s3fifo/queue.go +++ b/internal/eviction/s3fifo/queue.go @@ -4,20 +4,22 @@ import ( "runtime" "sync/atomic" "unsafe" + + "github.com/maypok86/otter/internal/xruntime" ) type queue[T any] struct { capacity uint64 head atomic.Uint64 - headPadding [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte + headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte tail atomic.Uint64 - tailPadding [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte + tailPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte slots []paddedSlot[T] } type paddedSlot[T any] struct { slot[T] - padding [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte + padding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte } type slot[I any] struct { diff --git a/internal/eviction/s3fifo/runtime.go b/internal/eviction/s3fifo/runtime.go deleted file mode 100644 index da8f7d3..0000000 --- a/internal/eviction/s3fifo/runtime.go +++ /dev/null @@ -1,6 +0,0 @@ -package s3fifo - -const ( - // useful for preventing false sharing. - cacheLineSize = 64 -) diff --git a/internal/shard/shard.go b/internal/shard/shard.go index fdab89e..a9448e8 100644 --- a/internal/shard/shard.go +++ b/internal/shard/shard.go @@ -119,7 +119,7 @@ func newTable(bucketCount int) *table { func (s *Shard[K, V]) Get(key K) (V, *node.Node[K, V], bool) { t := (*table)(atomic.LoadPointer(&s.table)) hash := s.calcShiftHash(key) - bucketIdx := t.mask & hash + bucketIdx := hash & t.mask b := &t.buckets[bucketIdx] for { for i := 0; i < bucketSize; i++ { diff --git a/internal/stats/counter.go b/internal/stats/counter.go index 85a921c..1cee5b0 100644 --- a/internal/stats/counter.go +++ b/internal/stats/counter.go @@ -3,13 +3,16 @@ package stats import ( "sync" "sync/atomic" + + "github.com/maypok86/otter/internal/xmath" + "github.com/maypok86/otter/internal/xruntime" ) var tokenPool sync.Pool type token struct { idx uint32 - padding [cacheLineSize - 4]byte + padding [xruntime.CacheLineSize - 4]byte } // much faster than atomic in write heavy scenarios (for example stats). @@ -20,11 +23,11 @@ type counter struct { type cshard struct { c int64 - padding [cacheLineSize - 8]byte + padding [xruntime.CacheLineSize - 8]byte } func newCounter() *counter { - nshards := roundUpPowerOf2(parallelism()) + nshards := xmath.RoundUpPowerOf2(xruntime.Parallelism()) return &counter{ shards: make([]cshard, nshards), mask: nshards - 1, @@ -43,7 +46,7 @@ func (c *counter) add(delta int64) { t, ok := tokenPool.Get().(*token) if !ok { t = &token{} - t.idx = fastrand() + t.idx = xruntime.Fastrand() } for { shard := &c.shards[t.idx&c.mask] @@ -51,7 +54,7 @@ func (c *counter) add(delta int64) { if atomic.CompareAndSwapInt64(&shard.c, cnt, cnt+delta) { break } - t.idx = fastrand() + t.idx = xruntime.Fastrand() } tokenPool.Put(t) } diff --git a/internal/stats/runtime.go b/internal/stats/runtime.go deleted file mode 100644 index d94d3af..0000000 --- a/internal/stats/runtime.go +++ /dev/null @@ -1,40 +0,0 @@ -package stats - -import ( - "runtime" - // this is used for fastrand function. - _ "unsafe" -) - -const ( - // useful for preventing false sharing. - cacheLineSize = 64 -) - -// based on https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2. -func roundUpPowerOf2(v uint32) uint32 { - if v == 0 { - return 1 - } - v-- - v |= v >> 1 - v |= v >> 2 - v |= v >> 4 - v |= v >> 8 - v |= v >> 16 - v++ - return v -} - -func parallelism() uint32 { - maxProcs := uint32(runtime.GOMAXPROCS(0)) - numCPU := uint32(runtime.NumCPU()) - if maxProcs < numCPU { - return maxProcs - } - return numCPU -} - -//go:noescape -//go:linkname fastrand runtime.fastrand -func fastrand() uint32 diff --git a/internal/unixtime/unixtime.go b/internal/unixtime/unixtime.go index a52022b..a09d27c 100644 --- a/internal/unixtime/unixtime.go +++ b/internal/unixtime/unixtime.go @@ -1,6 +1,7 @@ package unixtime import ( + "sync" "sync/atomic" "time" ) @@ -9,18 +10,54 @@ import ( // and we don't need a more precise time for the expiry time (and most other operations). var now uint64 -func init() { - now = uint64(time.Now().Unix()) +var ( + mutex sync.Mutex + countInstance int + done chan struct{} +) + +func startTimer() { + done = make(chan struct{}) + atomic.StoreUint64(&now, uint64(time.Now().Unix())) go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() - for t := range ticker.C { - atomic.StoreUint64(&now, uint64(t.Unix())) + for { + select { + case t := <-ticker.C: + atomic.StoreUint64(&now, uint64(t.Unix())) + case <-done: + return + } } }() } +// Start should be called when the cache instance is created to initialize the timer. +func Start() { + mutex.Lock() + defer mutex.Unlock() + + if countInstance == 0 { + startTimer() + } + + countInstance++ +} + +// Stop should be called when closing and stopping the cache instance to stop the timer. +func Stop() { + mutex.Lock() + defer mutex.Unlock() + + countInstance-- + if countInstance == 0 { + done <- struct{}{} + close(done) + } +} + // Now returns time as a Unix time, the number of seconds elapsed since January 1, 1970 UTC. func Now() uint64 { return atomic.LoadUint64(&now) diff --git a/internal/unixtime/unixtime_bench_test.go b/internal/unixtime/unixtime_bench_test.go index 6ba26e2..5314212 100644 --- a/internal/unixtime/unixtime_bench_test.go +++ b/internal/unixtime/unixtime_bench_test.go @@ -7,6 +7,8 @@ import ( ) func BenchmarkNow(b *testing.B) { + Start() + b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { var ts uint64 @@ -15,6 +17,8 @@ func BenchmarkNow(b *testing.B) { } atomic.StoreUint64(&sink, ts) }) + + Stop() } func BenchmarkTimeNowUnix(b *testing.B) { diff --git a/internal/unixtime/unixtime_test.go b/internal/unixtime/unixtime_test.go index 88c6edc..c8abf17 100644 --- a/internal/unixtime/unixtime_test.go +++ b/internal/unixtime/unixtime_test.go @@ -6,9 +6,27 @@ import ( ) func TestNow(t *testing.T) { + Start() + expected := time.Now().Unix() got := Now() if uint64(expected) != got { t.Fatalf("unexpected unix time; got %d; want %d", got, expected) } + + time.Sleep(3 * time.Second) + + expected = time.Now().Unix() + got = Now() + if uint64(expected)-got > 1 { + t.Fatalf("unexpected unix time; got %d; want %d", got, expected) + } + + Stop() + + time.Sleep(3 * time.Second) + + if Now()-got > 1 { + t.Fatal("timer should have stopped") + } } diff --git a/internal/xruntime/runtime.go b/internal/xruntime/runtime.go index 6896066..87b3c74 100644 --- a/internal/xruntime/runtime.go +++ b/internal/xruntime/runtime.go @@ -1,6 +1,7 @@ package xruntime import ( + "runtime" _ "unsafe" ) @@ -9,6 +10,15 @@ const ( CacheLineSize = 64 ) +func Parallelism() uint32 { + maxProcs := uint32(runtime.GOMAXPROCS(0)) + numCPU := uint32(runtime.NumCPU()) + if maxProcs < numCPU { + return maxProcs + } + return numCPU +} + //go:noescape //go:linkname Fastrand runtime.fastrand func Fastrand() uint32