Skip to content

Commit

Permalink
[Chore] Refactor project
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Sep 24, 2023
1 parent 3aa2b08 commit fe9884f
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 59 deletions.
8 changes: 5 additions & 3 deletions internal/eviction/s3fifo/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"runtime"
"sync/atomic"
"unsafe"

"github.com/maypok86/otter/internal/xruntime"
)

type queue[T any] struct {
capacity uint64
head atomic.Uint64
headPadding [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
tail atomic.Uint64
tailPadding [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
tailPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
slots []paddedSlot[T]
}

type paddedSlot[T any] struct {
slot[T]
padding [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
padding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
}

type slot[I any] struct {
Expand Down
6 changes: 0 additions & 6 deletions internal/eviction/s3fifo/runtime.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func newTable(bucketCount int) *table {
func (s *Shard[K, V]) Get(key K) (V, *node.Node[K, V], bool) {
t := (*table)(atomic.LoadPointer(&s.table))
hash := s.calcShiftHash(key)
bucketIdx := t.mask & hash
bucketIdx := hash & t.mask
b := &t.buckets[bucketIdx]
for {
for i := 0; i < bucketSize; i++ {
Expand Down
13 changes: 8 additions & 5 deletions internal/stats/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package stats
import (
"sync"
"sync/atomic"

"github.com/maypok86/otter/internal/xmath"
"github.com/maypok86/otter/internal/xruntime"
)

var tokenPool sync.Pool

type token struct {
idx uint32
padding [cacheLineSize - 4]byte
padding [xruntime.CacheLineSize - 4]byte
}

// much faster than atomic in write heavy scenarios (for example stats).
Expand All @@ -20,11 +23,11 @@ type counter struct {

type cshard struct {
c int64
padding [cacheLineSize - 8]byte
padding [xruntime.CacheLineSize - 8]byte
}

func newCounter() *counter {
nshards := roundUpPowerOf2(parallelism())
nshards := xmath.RoundUpPowerOf2(xruntime.Parallelism())
return &counter{
shards: make([]cshard, nshards),
mask: nshards - 1,
Expand All @@ -43,15 +46,15 @@ func (c *counter) add(delta int64) {
t, ok := tokenPool.Get().(*token)
if !ok {
t = &token{}
t.idx = fastrand()
t.idx = xruntime.Fastrand()
}
for {
shard := &c.shards[t.idx&c.mask]
cnt := atomic.LoadInt64(&shard.c)
if atomic.CompareAndSwapInt64(&shard.c, cnt, cnt+delta) {
break
}
t.idx = fastrand()
t.idx = xruntime.Fastrand()
}
tokenPool.Put(t)
}
Expand Down
40 changes: 0 additions & 40 deletions internal/stats/runtime.go

This file was deleted.

45 changes: 41 additions & 4 deletions internal/unixtime/unixtime.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package unixtime

import (
"sync"
"sync/atomic"
"time"
)
Expand All @@ -9,18 +10,54 @@ import (
// and we don't need a more precise time for the expiry time (and most other operations).
var now uint64

func init() {
now = uint64(time.Now().Unix())
var (
mutex sync.Mutex
countInstance int
done chan struct{}
)

func startTimer() {
done = make(chan struct{})
atomic.StoreUint64(&now, uint64(time.Now().Unix()))

go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for t := range ticker.C {
atomic.StoreUint64(&now, uint64(t.Unix()))
for {
select {
case t := <-ticker.C:
atomic.StoreUint64(&now, uint64(t.Unix()))
case <-done:
return
}
}
}()
}

// Start should be called when the cache instance is created to initialize the timer.
func Start() {
mutex.Lock()
defer mutex.Unlock()

if countInstance == 0 {
startTimer()
}

countInstance++
}

// Stop should be called when closing and stopping the cache instance to stop the timer.
func Stop() {
mutex.Lock()
defer mutex.Unlock()

countInstance--
if countInstance == 0 {
done <- struct{}{}
close(done)
}
}

// Now returns time as a Unix time, the number of seconds elapsed since January 1, 1970 UTC.
func Now() uint64 {
return atomic.LoadUint64(&now)
Expand Down
4 changes: 4 additions & 0 deletions internal/unixtime/unixtime_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

func BenchmarkNow(b *testing.B) {
Start()

b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var ts uint64
Expand All @@ -15,6 +17,8 @@ func BenchmarkNow(b *testing.B) {
}
atomic.StoreUint64(&sink, ts)
})

Stop()
}

func BenchmarkTimeNowUnix(b *testing.B) {
Expand Down
18 changes: 18 additions & 0 deletions internal/unixtime/unixtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,27 @@ import (
)

func TestNow(t *testing.T) {
Start()

expected := time.Now().Unix()
got := Now()
if uint64(expected) != got {
t.Fatalf("unexpected unix time; got %d; want %d", got, expected)
}

time.Sleep(3 * time.Second)

expected = time.Now().Unix()
got = Now()
if uint64(expected)-got > 1 {
t.Fatalf("unexpected unix time; got %d; want %d", got, expected)
}

Stop()

time.Sleep(3 * time.Second)

if Now()-got > 1 {
t.Fatal("timer should have stopped")
}
}
10 changes: 10 additions & 0 deletions internal/xruntime/runtime.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package xruntime

import (
"runtime"
_ "unsafe"
)

Expand All @@ -9,6 +10,15 @@ const (
CacheLineSize = 64
)

func Parallelism() uint32 {
maxProcs := uint32(runtime.GOMAXPROCS(0))
numCPU := uint32(runtime.NumCPU())
if maxProcs < numCPU {
return maxProcs
}
return numCPU
}

//go:noescape
//go:linkname Fastrand runtime.fastrand
func Fastrand() uint32

0 comments on commit fe9884f

Please sign in to comment.