Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Application struct {
// Core infrastructure
DB *store.Store
MetricsRecorder metrics.Recorder
MetricsCache cache.Cache
MetricsCache cache.Cache[int64]
MetricsCacheCloser func() error
RateLimitRedisClient *redis.Client

Expand Down
10 changes: 5 additions & 5 deletions internal/bootstrap/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func initializeMetrics(cfg *config.Config) metrics.Recorder {
func initializeMetricsCache(
ctx context.Context,
cfg *config.Config,
) (cache.Cache, func() error, error) {
) (cache.Cache[int64], func() error, error) {
if !cfg.MetricsEnabled || !cfg.MetricsGaugeUpdateEnabled {
return nil, nil, nil
}
Expand All @@ -34,12 +34,12 @@ func initializeMetricsCache(
ctx, cancel := context.WithTimeout(ctx, cfg.CacheInitTimeout)
defer cancel()

var metricsCache cache.Cache
var metricsCache cache.Cache[int64]
var err error

switch cfg.MetricsCacheType {
case config.MetricsCacheTypeRedisAside:
metricsCache, err = cache.NewRueidisAsideCache(
metricsCache, err = cache.NewRueidisAsideCache[int64](
ctx,
cfg.RedisAddr,
cfg.RedisPassword,
Expand All @@ -60,7 +60,7 @@ func initializeMetricsCache(
)

case config.MetricsCacheTypeRedis:
metricsCache, err = cache.NewRueidisCache(
metricsCache, err = cache.NewRueidisCache[int64](
ctx,
cfg.RedisAddr,
cfg.RedisPassword,
Expand All @@ -73,7 +73,7 @@ func initializeMetricsCache(
log.Printf("Metrics cache: redis (addr=%s, db=%d)", cfg.RedisAddr, cfg.RedisDB)

default: // memory
metricsCache = cache.NewMemoryCache()
metricsCache = cache.NewMemoryCache[int64]()
log.Println("Metrics cache: memory (single instance only)")
}

Expand Down
4 changes: 2 additions & 2 deletions internal/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func addMetricsGaugeUpdateJob(
cfg *config.Config,
db *store.Store,
prometheusMetrics metrics.Recorder,
metricsCache cache.Cache,
metricsCache cache.Cache[int64],
) {
if !cfg.MetricsEnabled || !cfg.MetricsGaugeUpdateEnabled {
return
Expand Down Expand Up @@ -197,7 +197,7 @@ func addMetricsGaugeUpdateJob(
}

// addCacheCleanupJob adds cache cleanup on shutdown
func addCacheCleanupJob(m *graceful.Manager, metricsCache cache.Cache, cfg *config.Config) {
func addCacheCleanupJob(m *graceful.Manager, metricsCache cache.Cache[int64], cfg *config.Config) {
if metricsCache == nil {
return
}
Expand Down
40 changes: 20 additions & 20 deletions internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestMemoryCache_GetSet(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Test Set and Get
Expand All @@ -27,7 +27,7 @@ func TestMemoryCache_GetSet(t *testing.T) {
}

func TestMemoryCache_GetMiss(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

_, err := cache.Get(ctx, "non-existent")
Expand All @@ -37,7 +37,7 @@ func TestMemoryCache_GetMiss(t *testing.T) {
}

func TestMemoryCache_Expiration(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Set with very short TTL
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestMemoryCache_Expiration(t *testing.T) {
}

func TestMemoryCache_MGetMSet(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Test MSet
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestMemoryCache_MGetMSet(t *testing.T) {
}

func TestMemoryCache_MGetExpiration(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Set values with short TTL
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestMemoryCache_MGetExpiration(t *testing.T) {
}

func TestMemoryCache_Delete(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Set a value
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestMemoryCache_Delete(t *testing.T) {
}

func TestMemoryCache_Close(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Set some values
Expand All @@ -187,7 +187,7 @@ func TestMemoryCache_Close(t *testing.T) {
}

func TestMemoryCache_Health(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

err := cache.Health(ctx)
Expand All @@ -197,7 +197,7 @@ func TestMemoryCache_Health(t *testing.T) {
}

func TestMemoryCache_Concurrent(t *testing.T) {
cache := NewMemoryCache()
cache := NewMemoryCache[int64]()
ctx := context.Background()

// Test concurrent writes and reads
Expand Down Expand Up @@ -236,8 +236,8 @@ func TestMemoryCache_Concurrent(t *testing.T) {
}
}

func TestMemoryCache_GetWithFetch(t *testing.T) {
cache := NewMemoryCache()
func TestGetWithFetch(t *testing.T) {
c := NewMemoryCache[int64]()
ctx := context.Background()

fetchCount := 0
Expand All @@ -247,7 +247,7 @@ func TestMemoryCache_GetWithFetch(t *testing.T) {
}

// First call - should fetch
value, err := cache.GetWithFetch(ctx, "test-key", time.Minute, fetchFunc)
value, err := GetWithFetch(ctx, c, "test-key", time.Minute, fetchFunc)
if err != nil {
t.Fatalf("GetWithFetch failed: %v", err)
}
Expand All @@ -259,7 +259,7 @@ func TestMemoryCache_GetWithFetch(t *testing.T) {
}

// Second call - should use cache
value, err = cache.GetWithFetch(ctx, "test-key", time.Minute, fetchFunc)
value, err = GetWithFetch(ctx, c, "test-key", time.Minute, fetchFunc)
if err != nil {
t.Fatalf("GetWithFetch failed: %v", err)
}
Expand All @@ -271,8 +271,8 @@ func TestMemoryCache_GetWithFetch(t *testing.T) {
}
}

func TestMemoryCache_GetWithFetchError(t *testing.T) {
cache := NewMemoryCache()
func TestGetWithFetch_Error(t *testing.T) {
c := NewMemoryCache[int64]()
ctx := context.Background()

expectedErr := ErrCacheUnavailable
Expand All @@ -281,14 +281,14 @@ func TestMemoryCache_GetWithFetchError(t *testing.T) {
}

// Should return fetch error
_, err := cache.GetWithFetch(ctx, "test-key", time.Minute, fetchFunc)
_, err := GetWithFetch(ctx, c, "test-key", time.Minute, fetchFunc)
if err != expectedErr {
t.Errorf("Expected error %v, got %v", expectedErr, err)
}
}

func TestMemoryCache_GetWithFetchExpiration(t *testing.T) {
cache := NewMemoryCache()
func TestGetWithFetch_Expiration(t *testing.T) {
c := NewMemoryCache[int64]()
ctx := context.Background()

fetchCount := 0
Expand All @@ -298,7 +298,7 @@ func TestMemoryCache_GetWithFetchExpiration(t *testing.T) {
}

// First call - should fetch
value, err := cache.GetWithFetch(ctx, "expire-key", 50*time.Millisecond, fetchFunc)
value, err := GetWithFetch(ctx, c, "expire-key", 50*time.Millisecond, fetchFunc)
if err != nil {
t.Fatalf("GetWithFetch failed: %v", err)
}
Expand All @@ -310,7 +310,7 @@ func TestMemoryCache_GetWithFetchExpiration(t *testing.T) {
time.Sleep(100 * time.Millisecond)

// Should fetch again after expiration
value, err = cache.GetWithFetch(ctx, "expire-key", 50*time.Millisecond, fetchFunc)
value, err = GetWithFetch(ctx, c, "expire-key", 50*time.Millisecond, fetchFunc)
if err != nil {
t.Fatalf("GetWithFetch failed: %v", err)
}
Expand Down
76 changes: 54 additions & 22 deletions internal/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,22 @@ import (
"time"
)

// Cache defines the interface for caching metrics data.
// All implementations should store int64 values (counts) for simplicity.
type Cache interface {
// Get retrieves a single value from cache
Get(ctx context.Context, key string) (int64, error)
// Cache defines the primitive operations for a key-value cache.
// T is the type of value stored in the cache (e.g. int64, string, or a struct).
type Cache[T any] interface {
// Get retrieves a single value from cache.
// Returns ErrCacheMiss if the key does not exist or has expired.
Get(ctx context.Context, key string) (T, error)

// Set stores a single value in cache with TTL
Set(ctx context.Context, key string, value int64, ttl time.Duration) error
Set(ctx context.Context, key string, value T, ttl time.Duration) error

// GetWithFetch retrieves a value using cache-aside pattern.
// On cache miss, calls fetchFunc to get the value and automatically stores it in cache.
// This method is part of the main interface to ensure all implementations provide
// optimal cache-aside support. RueidisAsideCache provides an optimized implementation
// using rueidisaside's automatic cache management.
GetWithFetch(
ctx context.Context,
key string,
ttl time.Duration,
fetchFunc func(ctx context.Context, key string) (int64, error),
) (int64, error)

// MGet retrieves multiple values from cache
// Returns a map of key->value for keys that exist
MGet(ctx context.Context, keys []string) (map[string]int64, error)
// MGet retrieves multiple values from cache.
// Returns a map of key->value for keys that exist and have not expired.
MGet(ctx context.Context, keys []string) (map[string]T, error)

// MSet stores multiple values in cache with TTL
MSet(ctx context.Context, values map[string]int64, ttl time.Duration) error
MSet(ctx context.Context, values map[string]T, ttl time.Duration) error

// Delete removes a key from cache
Delete(ctx context.Context, key string) error
Expand All @@ -42,3 +31,46 @@ type Cache interface {
// Health checks if the cache is healthy
Health(ctx context.Context) error
}

// WithFetch extends Cache with an optimized cache-aside operation.
// Implementations that can provide stampede protection (e.g. RueidisAsideCache)
// should implement this interface. Callers should prefer this over the generic
// GetWithFetch helper when available, discovered via type assertion:
//
//	if wf, ok := c.(WithFetch[int64]); ok { ... wf.GetWithFetch(...) ... }
type WithFetch[T any] interface {
	Cache[T]

	// GetWithFetch retrieves a value using an optimized cache-aside pattern.
	// On cache miss, fetchFunc is called exactly once even under concurrent load,
	// and the result is stored in cache automatically with the given ttl.
	GetWithFetch(
		ctx context.Context,
		key string,
		ttl time.Duration,
		fetchFunc func(ctx context.Context, key string) (T, error),
	) (T, error)
}

// GetWithFetch is a generic cache-aside helper for any Cache implementation.
// On cache miss it calls fetchFunc, stores the result, and returns it.
// Use this when the cache does not implement WithFetch.
// Note: does not provide stampede protection under concurrent load.
func GetWithFetch[T any](
	ctx context.Context,
	c Cache[T],
	key string,
	ttl time.Duration,
	fetchFunc func(ctx context.Context, key string) (T, error),
) (T, error) {
	// Fast path: serve from cache. Any Get failure (miss or otherwise)
	// falls through to the fetch below.
	cached, getErr := c.Get(ctx, key)
	if getErr == nil {
		return cached, nil
	}

	fetched, fetchErr := fetchFunc(ctx, key)
	if fetchErr != nil {
		var zero T
		return zero, fetchErr
	}

	// Best-effort write-back: a failed Set only means a refetch next time,
	// so its error is intentionally ignored.
	_ = c.Set(ctx, key, fetched, ttl)
	return fetched, nil
}
Loading
Loading