Skip to content

Commit

Permalink
Size aware cache (#924)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@eigenlabs.org>
  • Loading branch information
cody-littley authored Nov 27, 2024
1 parent 53fd80d commit e8ffa50
Show file tree
Hide file tree
Showing 17 changed files with 360 additions and 77 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.12
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6
github.com/consensys/gnark-crypto v0.12.1
github.com/emirpasic/gods v1.18.1
github.com/ethereum/go-ethereum v1.14.8
github.com/fxamacker/cbor/v2 v2.5.0
github.com/gin-contrib/logger v0.2.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA=
github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.14.8 h1:NgOWvXS+lauK+zFukEvi85UmmsS/OkV0N23UZ1VTIig=
Expand Down
21 changes: 11 additions & 10 deletions relay/auth/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
pb "github.com/Layr-Labs/eigenda/api/grpc/relay"
"github.com/Layr-Labs/eigenda/core"
"github.com/emirpasic/gods/queues"
"github.com/emirpasic/gods/queues/linkedlistqueue"
lru "github.com/hashicorp/golang-lru/v2"
"sync"
"time"
Expand Down Expand Up @@ -38,7 +40,7 @@ type requestAuthenticator struct {
authenticatedClients map[string]struct{}

// authenticationTimeouts is a list of authentications that have been performed, along with their expiration times.
authenticationTimeouts []*authenticationTimeout
authenticationTimeouts queues.Queue

// authenticationTimeoutDuration is the duration for which an auth is valid.
// If this is zero, then auth saving is disabled, and each request will be authenticated independently.
Expand Down Expand Up @@ -67,7 +69,7 @@ func NewRequestAuthenticator(
authenticator := &requestAuthenticator{
ics: ics,
authenticatedClients: make(map[string]struct{}),
authenticationTimeouts: make([]*authenticationTimeout, 0),
authenticationTimeouts: linkedlistqueue.New(),
authenticationTimeoutDuration: authenticationTimeoutDuration,
keyCache: keyCache,
}
Expand Down Expand Up @@ -170,7 +172,7 @@ func (a *requestAuthenticator) saveAuthenticationResult(now time.Time, origin st
defer a.savedAuthLock.Unlock()

a.authenticatedClients[origin] = struct{}{}
a.authenticationTimeouts = append(a.authenticationTimeouts,
a.authenticationTimeouts.Enqueue(
&authenticationTimeout{
origin: origin,
expiration: now.Add(a.authenticationTimeoutDuration),
Expand All @@ -195,14 +197,13 @@ func (a *requestAuthenticator) isAuthenticationStillValid(now time.Time, address
// removeOldAuthentications removes any authentications that have expired.
// This method is not thread safe and should be called with the savedAuthLock held.
func (a *requestAuthenticator) removeOldAuthentications(now time.Time) {
index := 0
for ; index < len(a.authenticationTimeouts); index++ {
if a.authenticationTimeouts[index].expiration.After(now) {
for a.authenticationTimeouts.Size() > 0 {
val, _ := a.authenticationTimeouts.Peek()
next := val.(*authenticationTimeout)
if next.expiration.After(now) {
break
}
delete(a.authenticatedClients, a.authenticationTimeouts[index].origin)
}
if index > 0 {
a.authenticationTimeouts = a.authenticationTimeouts[index:]
delete(a.authenticatedClients, next.origin)
a.authenticationTimeouts.Dequeue()
}
}
16 changes: 12 additions & 4 deletions relay/blob_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type blobProvider struct {
blobStore *blobstore.BlobStore

// blobCache is a size-aware FIFO cache of blobs, accessed through a CacheAccessor.
blobCache cache.CachedAccessor[v2.BlobKey, []byte]
blobCache cache.CacheAccessor[v2.BlobKey, []byte]

// fetchTimeout is the maximum time to wait for a blob fetch operation to complete.
fetchTimeout time.Duration
Expand All @@ -31,7 +31,7 @@ func newBlobProvider(
ctx context.Context,
logger logging.Logger,
blobStore *blobstore.BlobStore,
blobCacheSize int,
blobCacheSize uint64,
maxIOConcurrency int,
fetchTimeout time.Duration) (*blobProvider, error) {

Expand All @@ -42,15 +42,23 @@ func newBlobProvider(
fetchTimeout: fetchTimeout,
}

c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, maxIOConcurrency, server.fetchBlob)
c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight)

cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob)
if err != nil {
return nil, fmt.Errorf("error creating blob cache: %w", err)
}
server.blobCache = c
server.blobCache = cacheAccessor

return server, nil
}

// computeBlobCacheWeight reports the cache 'weight' of a blob. A blob weighs exactly as much
// as its length in bytes; the blob key does not contribute to the weight.
func computeBlobCacheWeight(_ v2.BlobKey, value []byte) uint64 {
	sizeInBytes := len(value)
	return uint64(sizeInBytes)
}

// GetBlob retrieves a blob from the blob store.
func (s *blobProvider) GetBlob(ctx context.Context, blobKey v2.BlobKey) ([]byte, error) {
data, err := s.blobCache.Get(ctx, blobKey)
Expand Down
4 changes: 2 additions & 2 deletions relay/blob_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestReadWrite(t *testing.T) {
context.Background(),
logger,
blobStore,
10,
1024*1024*32,
32,
10*time.Second)
require.NoError(t, err)
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestNonExistentBlob(t *testing.T) {
context.Background(),
logger,
blobStore,
10,
1024*1024*32,
32,
10*time.Second)
require.NoError(t, err)
Expand Down
25 changes: 25 additions & 0 deletions relay/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package cache

// WeightCalculator is a function that calculates the weight of a key-value pair in a Cache.
// By default, the weight of a key-value pair is 1. Cache capacity is always specified in terms of
// the weight of the key-value pairs it can hold, rather than the number of key-value pairs.
//
// For example, a calculator that returns the length of a byte-slice value causes the capacity
// of the cache to be expressed in bytes.
type WeightCalculator[K comparable, V any] func(key K, value V) uint64

// Cache is a generic key-value cache whose capacity is expressed as a maximum total weight
// rather than as a maximum number of entries.
//
// Implementations are not required to be thread safe unless they explicitly state otherwise.
type Cache[K comparable, V any] interface {
	// Get looks up the value stored under key. The boolean result reports whether the key
	// was found in the cache.
	Get(key K) (V, bool)

	// Put stores a key-value pair in the cache. Once the pair has been added, existing values
	// may be dropped if the total weight is above the configured maximum weight. A value whose
	// own weight exceeds the maximum weight of the cache is ignored.
	Put(key K, value V)

	// Size reports how many key-value pairs are currently held in the cache.
	Size() int

	// Weight reports the combined weight of all key-value pairs currently held in the cache.
	Weight() uint64
}
41 changes: 18 additions & 23 deletions relay/cache/cached_accessor.go → relay/cache/cache_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ package cache

import (
"context"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/semaphore"
"sync"
)

// CachedAccessor is an interface for accessing a resource that is cached. It assumes that cache misses
// CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses
// are expensive, and prevents multiple concurrent cache misses for the same key.
type CachedAccessor[K comparable, V any] interface {
type CacheAccessor[K comparable, V any] interface {
// Get returns the value for the given key. If the value is not in the cache, it will be fetched using the Accessor.
// If the context is cancelled, the function may abort early. If multiple goroutines request the same key,
// cancellation of one request will not affect the others.
Get(ctx context.Context, key K) (V, error)
}

// Accessor is function capable of fetching a value from a resource. Used by CachedAccessor when there is a cache miss.
// Accessor is a function capable of fetching a value from a resource. Used by CacheAccessor when there is a cache miss.
type Accessor[K comparable, V any] func(key K) (V, error)

// accessResult is a struct that holds the result of an Accessor call.
Expand All @@ -29,23 +28,24 @@ type accessResult[V any] struct {
err error
}

var _ CachedAccessor[string, string] = &cachedAccessor[string, string]{}
var _ CacheAccessor[string, string] = &cacheAccessor[string, string]{}

// Future work: the cache used in this implementation is suboptimal when storing items that have a large
// variance in size. The current implementation uses a fixed size cache, which requires the cache to be
// sized to the largest item that will be stored. This cache should be replaced with an implementation
// whose size can be specified by memory footprint in bytes.

// cachedAccessor is an implementation of CachedAccessor.
type cachedAccessor[K comparable, V any] struct {
// cacheAccessor is an implementation of CacheAccessor.
type cacheAccessor[K comparable, V any] struct {

// lookupsInProgress has an entry for each key that is currently being looked up via the accessor. The value
// is written into the channel when it is eventually fetched. If a key is requested more than once while a
// lookup is in progress, the second (and following) requests will wait for the result of the first lookup
// to be written into the channel.
lookupsInProgress map[K]*accessResult[V]

// cache is the LRU cache used to store values fetched by the accessor.
cache *lru.Cache[K, V]
// cache is the underlying cache that this wrapper manages.
cache Cache[K, V]

// concurrencyLimiter is a channel used to limit the number of concurrent lookups that can be in progress.
concurrencyLimiter chan struct{}
Expand All @@ -57,20 +57,15 @@ type cachedAccessor[K comparable, V any] struct {
accessor Accessor[K, V]
}

// NewCachedAccessor creates a new CachedAccessor. The cacheSize parameter specifies the maximum number of items
// NewCacheAccessor creates a new CacheAccessor. The cache parameter is the underlying cache used to store
// values fetched by the accessor. The concurrencyLimit parameter specifies the maximum number of concurrent
// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess
// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor
// parameter is the function used to fetch values that are not in the cache.
func NewCachedAccessor[K comparable, V any](
cacheSize int,
func NewCacheAccessor[K comparable, V any](
cache Cache[K, V],
concurrencyLimit int,
accessor Accessor[K, V]) (CachedAccessor[K, V], error) {

cache, err := lru.New[K, V](cacheSize)
if err != nil {
return nil, err
}
accessor Accessor[K, V]) (CacheAccessor[K, V], error) {

lookupsInProgress := make(map[K]*accessResult[V])

Expand All @@ -79,7 +74,7 @@ func NewCachedAccessor[K comparable, V any](
concurrencyLimiter = make(chan struct{}, concurrencyLimit)
}

return &cachedAccessor[K, V]{
return &cacheAccessor[K, V]{
cache: cache,
concurrencyLimiter: concurrencyLimiter,
accessor: accessor,
Expand All @@ -95,7 +90,7 @@ func newAccessResult[V any]() *accessResult[V] {
return result
}

func (c *cachedAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
c.cacheLock.Lock()

// first, attempt to get the value from the cache
Expand Down Expand Up @@ -126,7 +121,7 @@ func (c *cachedAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
// waitForResult waits for the result of a lookup that was initiated by another requester and returns it
// when it becomes available. This method will return quickly if the provided context is cancelled.
// Doing so does not disrupt the other requesters that are also waiting for this result.
func (c *cachedAccessor[K, V]) waitForResult(ctx context.Context, result *accessResult[V]) (V, error) {
func (c *cacheAccessor[K, V]) waitForResult(ctx context.Context, result *accessResult[V]) (V, error) {
err := result.sem.Acquire(ctx, 1)
if err != nil {
var zeroValue V
Expand All @@ -139,7 +134,7 @@ func (c *cachedAccessor[K, V]) waitForResult(ctx context.Context, result *access

// fetchResult fetches the value for the given key and returns it. If the context is cancelled before the value
// is fetched, the function will return early. If the fetch is successful, the value will be added to the cache.
func (c *cachedAccessor[K, V]) fetchResult(ctx context.Context, key K, result *accessResult[V]) (V, error) {
func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *accessResult[V]) (V, error) {

// Perform the work in a background goroutine. This allows us to return early if the context is cancelled
// without disrupting the fetch operation that other requesters may be waiting for.
Expand All @@ -159,7 +154,7 @@ func (c *cachedAccessor[K, V]) fetchResult(ctx context.Context, key K, result *a

// Update the cache if the fetch was successful.
if err == nil {
c.cache.Add(key, value)
c.cache.Put(key, value)
}

// Provide the result to all other goroutines that may be waiting for it.
Expand Down
Loading

0 comments on commit e8ffa50

Please sign in to comment.