diff --git a/cache.go b/cache.go index f27b075..074ad56 100644 --- a/cache.go +++ b/cache.go @@ -36,13 +36,13 @@ type gc struct { type Cache[T any] struct { *Configuration[T] + control list *List[*Item[T]] size int64 buckets []*bucket[T] bucketMask uint32 deletables chan *Item[T] promotables chan *Item[T] - control chan interface{} } // Create a new cache with the specified configuration @@ -51,16 +51,18 @@ func New[T any](config *Configuration[T]) *Cache[T] { c := &Cache[T]{ list: NewList[*Item[T]](), Configuration: config, + control: newControl(), bucketMask: uint32(config.buckets) - 1, buckets: make([]*bucket[T], config.buckets), - control: make(chan interface{}), + deletables: make(chan *Item[T], config.deleteBuffer), + promotables: make(chan *Item[T], config.promoteBuffer), } for i := 0; i < config.buckets; i++ { c.buckets[i] = &bucket[T]{ lookup: make(map[string]*Item[T]), } } - c.restart() + go c.worker() return c } @@ -184,94 +186,6 @@ func (c *Cache[T]) Delete(key string) bool { return false } -// Clears the cache -// This is a control command. -func (c *Cache[T]) Clear() { - done := make(chan struct{}) - c.control <- clear{done: done} - <-done -} - -// Stops the background worker. Operations performed on the cache after Stop -// is called are likely to panic -// This is a control command. -func (c *Cache[T]) Stop() { - close(c.promotables) - <-c.control -} - -// Gets the number of items removed from the cache due to memory pressure since -// the last time GetDropped was called -// This is a control command. -func (c *Cache[T]) GetDropped() int { - return doGetDropped(c.control) -} - -func doGetDropped(controlCh chan<- interface{}) int { - res := make(chan int) - controlCh <- getDropped{res: res} - return <-res -} - -// SyncUpdates waits until the cache has finished asynchronous state updates for any operations -// that were done by the current goroutine up to now. -// -// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker -// goroutine that updates its internal data structures asynchronously. This means that the -// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent; -// there is no guarantee that it happens before a Get or Set call has returned. Most of the time -// application code will not care about this, but especially in a test scenario you may want to -// be able to know when the worker has caught up. -// -// This applies only to cache methods that were previously called by the same goroutine that is -// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is -// no way to know whether any of them still have pending state updates when SyncUpdates returns. -// This is a control command. -func (c *Cache[T]) SyncUpdates() { - doSyncUpdates(c.control) -} - -func doSyncUpdates(controlCh chan<- interface{}) { - done := make(chan struct{}) - controlCh <- syncWorker{done: done} - <-done -} - -// Sets a new max size. That can result in a GC being run if the new maxium size -// is smaller than the cached size -// This is a control command. -func (c *Cache[T]) SetMaxSize(size int64) { - done := make(chan struct{}) - c.control <- setMaxSize{size: size, done: done} - <-done -} - -// Forces GC. There should be no reason to call this function, except from tests -// which require synchronous GC. -// This is a control command. -func (c *Cache[T]) GC() { - done := make(chan struct{}) - c.control <- gc{done: done} - <-done -} - -// Gets the size of the cache. This is an O(1) call to make, but it is handled -// by the worker goroutine. It's meant to be called periodically for metrics, or -// from tests. -// This is a control command. -func (c *Cache[T]) GetSize() int64 { - res := make(chan int64) - c.control <- getSize{res} - return <-res -} - -func (c *Cache[T]) restart() { - c.deletables = make(chan *Item[T], c.deleteBuffer) - c.promotables = make(chan *Item[T], c.promoteBuffer) - c.control = make(chan interface{}) - go c.worker() -} - func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) { bucket.delete(item.key) //stop other GETs from getting it c.deletables <- item @@ -293,48 +207,48 @@ func (c *Cache[T]) bucket(key string) *bucket[T] { } func (c *Cache[T]) worker() { - defer close(c.control) dropped := 0 + cc := c.control + promoteItem := func(item *Item[T]) { if c.doPromote(item) && c.size > c.maxSize { dropped += c.gc() } } + for { select { - case item, ok := <-c.promotables: - if ok == false { - goto drain - } + case item := <-c.promotables: promoteItem(item) case item := <-c.deletables: c.doDelete(item) - case control := <-c.control: + case control := <-cc: switch msg := control.(type) { - case getDropped: + case controlStop: + goto drain + case controlGetDropped: msg.res <- dropped dropped = 0 - case setMaxSize: + case controlSetMaxSize: c.maxSize = msg.size if c.size > c.maxSize { dropped += c.gc() } msg.done <- struct{}{} - case clear: + case controlClear: for _, bucket := range c.buckets { bucket.clear() } c.size = 0 c.list = NewList[*Item[T]]() msg.done <- struct{}{} - case getSize: + case controlGetSize: msg.res <- c.size - case gc: + case controlGC: dropped += c.gc() msg.done <- struct{}{} - case syncWorker: - doAllPendingPromotesAndDeletes(c.promotables, promoteItem, - c.deletables, c.doDelete) + case controlSyncUpdates: + doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete) msg.done <- struct{}{} } } @@ -346,7 +260,6 @@ drain: case item := <-c.deletables: c.doDelete(item) default: - close(c.deletables) return } } @@ -367,9 +280,7 @@ doAllPromotes: for { select { case item := <-promotables: - if item != nil { - promoteFn(item) - } + promoteFn(item) default: break doAllPromotes } diff --git a/cache_test.go b/cache_test.go index c7ba85f..050a1d8 100644 --- a/cache_test.go +++ b/cache_test.go @@ -337,6 +337,30 @@ func Test_CachePrune(t *testing.T) { } } +func Test_ConcurrentStop(t *testing.T) { + for i := 0; i < 100; i++ { + cache := New(Configure[string]()) + r := func() { + for { + key := strconv.Itoa(int(rand.Int31n(100))) + switch rand.Int31n(3) { + case 0: + cache.Get(key) + case 1: + cache.Set(key, key, time.Minute) + case 2: + cache.Delete(key) + } + } + } + go r() + go r() + go r() + time.Sleep(time.Millisecond * 10) + cache.Stop() + } +} + type SizedItem struct { id int s int64 diff --git a/control.go b/control.go new file mode 100644 index 0000000..b618e31 --- /dev/null +++ b/control.go @@ -0,0 +1,110 @@ +package ccache + +type controlGC struct { + done chan struct{} +} + +type controlClear struct { + done chan struct{} +} + +type controlStop struct { +} + +type controlGetSize struct { + res chan int64 +} + +type controlGetDropped struct { + res chan int +} + +type controlSetMaxSize struct { + size int64 + done chan struct{} +} + +type controlSyncUpdates struct { + done chan struct{} +} + +type control chan interface{} + +func newControl() chan interface{} { + return make(chan interface{}, 5) +} + +// Forces GC. There should be no reason to call this function, except from tests +// which require synchronous GC. +// This is a control command. +func (c control) GC() { + done := make(chan struct{}) + c <- controlGC{done: done} + <-done +} + +// Sends a stop signal to the worker thread. The worker thread will shut down +// 5 seconds after the last message is received. The cache should not be used +// after Stop is called, but concurrently executing requests should properly finish +// executing. +// This is a control command. +func (c control) Stop() { + c.SyncUpdates() + c <- controlStop{} +} + +// Clears the cache +// This is a control command. +func (c control) Clear() { + done := make(chan struct{}) + c <- controlClear{done: done} + <-done +} + +// Gets the size of the cache. This is an O(1) call to make, but it is handled +// by the worker goroutine. It's meant to be called periodically for metrics, or +// from tests. +// This is a control command. +func (c control) GetSize() int64 { + res := make(chan int64) + c <- controlGetSize{res: res} + return <-res +} + +// Gets the number of items removed from the cache due to memory pressure since +// the last time GetDropped was called +// This is a control command. +func (c control) GetDropped() int { + res := make(chan int) + c <- controlGetDropped{res: res} + return <-res +} + +// Sets a new max size. That can result in a GC being run if the new maxium size +// is smaller than the cached size +// This is a control command. +func (c control) SetMaxSize(size int64) { + done := make(chan struct{}) + c <- controlSetMaxSize{size: size, done: done} + <-done +} + +// SyncUpdates waits until the cache has finished asynchronous state updates for any operations +// that were done by the current goroutine up to now. +// +// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker +// goroutine that updates its internal data structures asynchronously. This means that the +// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent; +// there is no guarantee that it happens before a Get or Set call has returned. Most of the time +// application code will not care about this, but especially in a test scenario you may want to +// be able to know when the worker has caught up. +// +// This applies only to cache methods that were previously called by the same goroutine that is +// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is +// no way to know whether any of them still have pending state updates when SyncUpdates returns. +// This is a control command. +func (c control) SyncUpdates() { + done := make(chan struct{}) + c <- controlSyncUpdates{done: done} + <-done +} diff --git a/layeredcache.go b/layeredcache.go index e6a632b..ada096d 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -9,13 +9,13 @@ import ( type LayeredCache[T any] struct { *Configuration[T] + control list *List[*Item[T]] buckets []*layeredBucket[T] bucketMask uint32 size int64 deletables chan *Item[T] promotables chan *Item[T] - control chan interface{} } // Create a new layered cache with the specified configuration. @@ -35,17 +35,18 @@ func Layered[T any](config *Configuration[T]) *LayeredCache[T] { c := &LayeredCache[T]{ list: NewList[*Item[T]](), Configuration: config, + control: newControl(), bucketMask: uint32(config.buckets) - 1, buckets: make([]*layeredBucket[T], config.buckets), deletables: make(chan *Item[T], config.deleteBuffer), - control: make(chan interface{}), + promotables: make(chan *Item[T], config.promoteBuffer), } for i := 0; i < int(config.buckets); i++ { c.buckets[i] = &layeredBucket[T]{ buckets: make(map[string]*bucket[T]), } } - c.restart() + go c.worker() return c } @@ -180,63 +181,6 @@ func (c *LayeredCache[T]) DeleteFunc(primary string, matches func(key string, it return c.bucket(primary).deleteFunc(primary, matches, c.deletables) } -// Clears the cache -func (c *LayeredCache[T]) Clear() { - done := make(chan struct{}) - c.control <- clear{done: done} - <-done -} - -func (c *LayeredCache[T]) Stop() { - close(c.promotables) - <-c.control -} - -// Gets the number of items removed from the cache due to memory pressure since -// the last time GetDropped was called -func (c *LayeredCache[T]) GetDropped() int { - return doGetDropped(c.control) -} - -// SyncUpdates waits until the cache has finished asynchronous state updates for any operations -// that were done by the current goroutine up to now. See Cache.SyncUpdates for details. -func (c *LayeredCache[T]) SyncUpdates() { - doSyncUpdates(c.control) -} - -// Sets a new max size. That can result in a GC being run if the new maxium size -// is smaller than the cached size -func (c *LayeredCache[T]) SetMaxSize(size int64) { - done := make(chan struct{}) - c.control <- setMaxSize{size: size, done: done} - <-done -} - -// Forces GC. There should be no reason to call this function, except from tests -// which require synchronous GC. -// This is a control command. -func (c *LayeredCache[T]) GC() { - done := make(chan struct{}) - c.control <- gc{done: done} - <-done -} - -// Gets the size of the cache. This is an O(1) call to make, but it is handled -// by the worker goroutine. It's meant to be called periodically for metrics, or -// from tests. -// This is a control command. -func (c *LayeredCache[T]) GetSize() int64 { - res := make(chan int64) - c.control <- getSize{res} - return <-res -} - -func (c *LayeredCache[T]) restart() { - c.promotables = make(chan *Item[T], c.promoteBuffer) - c.control = make(chan interface{}) - go c.worker() -} - func (c *LayeredCache[T]) set(primary, secondary string, value T, duration time.Duration, track bool) *Item[T] { item, existing := c.bucket(primary).set(primary, secondary, value, duration, track) if existing != nil { @@ -257,65 +201,76 @@ func (c *LayeredCache[T]) promote(item *Item[T]) { } func (c *LayeredCache[T]) worker() { - defer close(c.control) dropped := 0 + cc := c.control + promoteItem := func(item *Item[T]) { if c.doPromote(item) && c.size > c.maxSize { dropped += c.gc() } } - deleteItem := func(item *Item[T]) { - if item.node == nil { - item.promotions = -2 - } else { - c.size -= item.size - if c.onDelete != nil { - c.onDelete(item) - } - c.list.Remove(item.node) - item.node = nil - item.promotions = -2 - } - } + for { select { - case item, ok := <-c.promotables: - if ok == false { - return - } + case item := <-c.promotables: promoteItem(item) case item := <-c.deletables: - deleteItem(item) - case control := <-c.control: + c.doDelete(item) + case control := <-cc: switch msg := control.(type) { - case getDropped: + case controlStop: + goto drain + case controlGetDropped: msg.res <- dropped dropped = 0 - case setMaxSize: + case controlSetMaxSize: c.maxSize = msg.size if c.size > c.maxSize { dropped += c.gc() } msg.done <- struct{}{} - case clear: + case controlClear: for _, bucket := range c.buckets { bucket.clear() } c.size = 0 c.list = NewList[*Item[T]]() msg.done <- struct{}{} - case getSize: + case controlGetSize: msg.res <- c.size - case gc: + case controlGC: dropped += c.gc() msg.done <- struct{}{} - case syncWorker: - doAllPendingPromotesAndDeletes(c.promotables, promoteItem, - c.deletables, deleteItem) + case controlSyncUpdates: + doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete) msg.done <- struct{}{} } } } + +drain: + for { + select { + case item := <-c.deletables: + c.doDelete(item) + default: + return + } + } +} + +func (c *LayeredCache[T]) doDelete(item *Item[T]) { + if item.node == nil { + item.promotions = -2 + } else { + c.size -= item.size + if c.onDelete != nil { + c.onDelete(item) + } + c.list.Remove(item.node) + item.node = nil + item.promotions = -2 + } } func (c *LayeredCache[T]) doPromote(item *Item[T]) bool { diff --git a/layeredcache_test.go b/layeredcache_test.go index f8b3e2f..bed8881 100644 --- a/layeredcache_test.go +++ b/layeredcache_test.go @@ -396,6 +396,29 @@ func Test_LayeredCachePrune(t *testing.T) { } } +func Test_LayeredConcurrentStop(t *testing.T) { + for i := 0; i < 100; i++ { + cache := Layered(Configure[string]()) + r := func() { + for { + key := strconv.Itoa(int(rand.Int31n(100))) + switch rand.Int31n(3) { + case 0: + cache.Get(key, key) + case 1: + cache.Set(key, key, key, time.Minute) + case 2: + cache.Delete(key, key) + } + } + } + go r() + go r() + go r() + time.Sleep(time.Millisecond * 10) + cache.Stop() + } +} func newLayered[T any]() *LayeredCache[T] { c := Layered[T](Configure[T]()) c.Clear()