Skip to content

Commit

Permalink
Merge pull request #78 from karlseguin/control_stop
Browse files Browse the repository at this point in the history
Refactor control messages + Stop handling
  • Loading branch information
karlseguin authored Jan 7, 2023
2 parents ece93bf + 22776be commit 3505243
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 197 deletions.
129 changes: 20 additions & 109 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type gc struct {

type Cache[T any] struct {
*Configuration[T]
control
list *List[*Item[T]]
size int64
buckets []*bucket[T]
bucketMask uint32
deletables chan *Item[T]
promotables chan *Item[T]
control chan interface{}
}

// Create a new cache with the specified configuration
Expand All @@ -51,16 +51,18 @@ func New[T any](config *Configuration[T]) *Cache[T] {
c := &Cache[T]{
list: NewList[*Item[T]](),
Configuration: config,
control: newControl(),
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*bucket[T], config.buckets),
control: make(chan interface{}),
deletables: make(chan *Item[T], config.deleteBuffer),
promotables: make(chan *Item[T], config.promoteBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &bucket[T]{
lookup: make(map[string]*Item[T]),
}
}
c.restart()
go c.worker()
return c
}

Expand Down Expand Up @@ -184,94 +186,6 @@ func (c *Cache[T]) Delete(key string) bool {
return false
}

// Clears the cache
// This is a control command.
func (c *Cache[T]) Clear() {
done := make(chan struct{})
c.control <- clear{done: done}
<-done
}

// Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic
// This is a control command.
func (c *Cache[T]) Stop() {
close(c.promotables)
<-c.control
}

// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
// This is a control command.
func (c *Cache[T]) GetDropped() int {
return doGetDropped(c.control)
}

func doGetDropped(controlCh chan<- interface{}) int {
res := make(chan int)
controlCh <- getDropped{res: res}
return <-res
}

// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now.
//
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
// goroutine that updates its internal data structures asynchronously. This means that the
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
// application code will not care about this, but especially in a test scenario you may want to
// be able to know when the worker has caught up.
//
// This applies only to cache methods that were previously called by the same goroutine that is
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
// This is a control command.
func (c *Cache[T]) SyncUpdates() {
doSyncUpdates(c.control)
}

func doSyncUpdates(controlCh chan<- interface{}) {
done := make(chan struct{})
controlCh <- syncWorker{done: done}
<-done
}

// Sets a new max size. That can result in a GC being run if the new maxium size
// is smaller than the cached size
// This is a control command.
func (c *Cache[T]) SetMaxSize(size int64) {
done := make(chan struct{})
c.control <- setMaxSize{size: size, done: done}
<-done
}

// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c *Cache[T]) GC() {
done := make(chan struct{})
c.control <- gc{done: done}
<-done
}

// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c *Cache[T]) GetSize() int64 {
res := make(chan int64)
c.control <- getSize{res}
return <-res
}

func (c *Cache[T]) restart() {
c.deletables = make(chan *Item[T], c.deleteBuffer)
c.promotables = make(chan *Item[T], c.promoteBuffer)
c.control = make(chan interface{})
go c.worker()
}

func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) {
bucket.delete(item.key) //stop other GETs from getting it
c.deletables <- item
Expand All @@ -293,48 +207,48 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
}

func (c *Cache[T]) worker() {
defer close(c.control)
dropped := 0
cc := c.control

promoteItem := func(item *Item[T]) {
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
}

for {
select {
case item, ok := <-c.promotables:
if ok == false {
goto drain
}
case item := <-c.promotables:
promoteItem(item)
case item := <-c.deletables:
c.doDelete(item)
case control := <-c.control:
case control := <-cc:
switch msg := control.(type) {
case getDropped:
case controlStop:
goto drain
case controlGetDropped:
msg.res <- dropped
dropped = 0
case setMaxSize:
case controlSetMaxSize:
c.maxSize = msg.size
if c.size > c.maxSize {
dropped += c.gc()
}
msg.done <- struct{}{}
case clear:
case controlClear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = NewList[*Item[T]]()
msg.done <- struct{}{}
case getSize:
case controlGetSize:
msg.res <- c.size
case gc:
case controlGC:
dropped += c.gc()
msg.done <- struct{}{}
case syncWorker:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
c.deletables, c.doDelete)
case controlSyncUpdates:
doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
msg.done <- struct{}{}
}
}
Expand All @@ -346,7 +260,6 @@ drain:
case item := <-c.deletables:
c.doDelete(item)
default:
close(c.deletables)
return
}
}
Expand All @@ -367,9 +280,7 @@ doAllPromotes:
for {
select {
case item := <-promotables:
if item != nil {
promoteFn(item)
}
promoteFn(item)
default:
break doAllPromotes
}
Expand Down
24 changes: 24 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,30 @@ func Test_CachePrune(t *testing.T) {
}
}

func Test_ConcurrentStop(t *testing.T) {
for i := 0; i < 100; i++ {
cache := New(Configure[string]())
r := func() {
for {
key := strconv.Itoa(int(rand.Int31n(100)))
switch rand.Int31n(3) {
case 0:
cache.Get(key)
case 1:
cache.Set(key, key, time.Minute)
case 2:
cache.Delete(key)
}
}
}
go r()
go r()
go r()
time.Sleep(time.Millisecond * 10)
cache.Stop()
}
}

type SizedItem struct {
id int
s int64
Expand Down
110 changes: 110 additions & 0 deletions control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package ccache

type controlGC struct {
done chan struct{}
}

type controlClear struct {
done chan struct{}
}

type controlStop struct {
}

type controlGetSize struct {
res chan int64
}

type controlGetDropped struct {
res chan int
}

type controlSetMaxSize struct {
size int64
done chan struct{}
}

type controlSyncUpdates struct {
done chan struct{}
}

type control chan interface{}

func newControl() chan interface{} {
return make(chan interface{}, 5)
}

// Forces GC. There should be no reason to call this function, except from tests
// which require synchronous GC.
// This is a control command.
func (c control) GC() {
done := make(chan struct{})
c <- controlGC{done: done}
<-done
}

// Sends a stop signal to the worker thread. The worker thread will shut down
// 5 seconds after the last message is received. The cache should not be used
// after Stop is called, but concurrently executing requests should properly finish
// executing.
// This is a control command.
func (c control) Stop() {
c.SyncUpdates()
c <- controlStop{}
}

// Clears the cache
// This is a control command.
func (c control) Clear() {
done := make(chan struct{})
c <- controlClear{done: done}
<-done
}

// Gets the size of the cache. This is an O(1) call to make, but it is handled
// by the worker goroutine. It's meant to be called periodically for metrics, or
// from tests.
// This is a control command.
func (c control) GetSize() int64 {
res := make(chan int64)
c <- controlGetSize{res: res}
return <-res
}

// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
// This is a control command.
func (c control) GetDropped() int {
res := make(chan int)
c <- controlGetDropped{res: res}
return <-res
}

// Sets a new max size. That can result in a GC being run if the new maxium size
// is smaller than the cached size
// This is a control command.
func (c control) SetMaxSize(size int64) {
done := make(chan struct{})
c <- controlSetMaxSize{size: size, done: done}
<-done
}

// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
// that were done by the current goroutine up to now.
//
// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
// goroutine that updates its internal data structures asynchronously. This means that the
// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
// application code will not care about this, but especially in a test scenario you may want to
// be able to know when the worker has caught up.
//
// This applies only to cache methods that were previously called by the same goroutine that is
// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
// no way to know whether any of them still have pending state updates when SyncUpdates returns.
// This is a control command.
func (c control) SyncUpdates() {
done := make(chan struct{})
c <- controlSyncUpdates{done: done}
<-done
}
Loading

0 comments on commit 3505243

Please sign in to comment.