Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cmd/synthetic-monitoring-agent/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"flag"
"strings"
)

type StringList []string

var _ flag.Value = (*StringList)(nil)

func (l *StringList) String() string {
if l == nil || len(*l) == 0 {
return ""
}

return strings.Join(*l, ", ")
}

func (l *StringList) Set(value string) error {
for v := range strings.SplitSeq(value, ",") {
*l = append(*l, strings.TrimSpace(v))
}

return nil
}
49 changes: 49 additions & 0 deletions cmd/synthetic-monitoring-agent/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestStringList(t *testing.T) {
{
var sl StringList
require.Equal(t, "", sl.String())
}

{
var sl StringList
require.NoError(t, sl.Set("a"))
require.Equal(t, []string{"a"}, []string(sl))
require.Equal(t, "a", sl.String())
}

{
var sl StringList
require.NoError(t, sl.Set("a,b"))
require.Equal(t, []string{"a", "b"}, []string(sl))
require.Equal(t, "a, b", sl.String())
}

{
var sl StringList
require.NoError(t, sl.Set("a,b,c"))
require.Equal(t, []string{"a", "b", "c"}, []string(sl))
require.Equal(t, "a, b, c", sl.String())
}

{
var sl StringList
require.NoError(t, sl.Set("a, b"))
require.Equal(t, []string{"a", "b"}, []string(sl))
require.Equal(t, "a, b", sl.String())
}

{
var sl StringList
require.NoError(t, sl.Set(" a, b "))
require.Equal(t, []string{"a", "b"}, []string(sl))
require.Equal(t, "a, b", sl.String())
}
}
118 changes: 110 additions & 8 deletions cmd/synthetic-monitoring-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/grpclog"

"github.com/grafana/synthetic-monitoring-agent/internal/adhoc"
"github.com/grafana/synthetic-monitoring-agent/internal/cache"
"github.com/grafana/synthetic-monitoring-agent/internal/checks"
"github.com/grafana/synthetic-monitoring-agent/internal/feature"
"github.com/grafana/synthetic-monitoring-agent/internal/http"
Expand Down Expand Up @@ -76,15 +77,22 @@ func run(args []string, stdout io.Writer) error {
MemLimitRatio float64
DisableK6 bool
DisableUsageReports bool
CacheType string
CacheLocalCapacity int
CacheLocalTTL time.Duration
MemcachedServers StringList
}{
GrpcApiServerAddr: "localhost:4031",
HttpListenAddr: "localhost:4050",
K6URI: "sm-k6",
K6BlacklistedIP: "10.0.0.0/8",
SelectedPublisher: pusherV2.Name,
TelemetryTimeSpan: defTelemetryTimeSpan,
AutoMemLimit: true,
MemLimitRatio: 0.9,
GrpcApiServerAddr: "localhost:4031",
HttpListenAddr: "localhost:4050",
K6URI: "sm-k6",
K6BlacklistedIP: "10.0.0.0/8",
SelectedPublisher: pusherV2.Name,
TelemetryTimeSpan: defTelemetryTimeSpan,
AutoMemLimit: true,
MemLimitRatio: 0.9,
CacheType: "auto",
CacheLocalCapacity: 10000,
CacheLocalTTL: 5 * time.Minute,
}
)

Expand All @@ -109,6 +117,10 @@ func run(args []string, stdout io.Writer) error {
flags.BoolVar(&config.DisableUsageReports, "disable-usage-reports", config.DisableUsageReports, "Disable anonymous usage reports")
flags.Float64Var(&config.MemLimitRatio, "memlimit-ratio", config.MemLimitRatio, "fraction of available memory to use")
flags.Var(&features, "features", "optional feature flags")
flags.StringVar(&config.CacheType, "cache-type", config.CacheType, "cache type: auto (memcached if servers provided, else local), memcached, local, or noop")
flags.IntVar(&config.CacheLocalCapacity, "cache-local-capacity", config.CacheLocalCapacity, "maximum number of items in local cache")
flags.DurationVar(&config.CacheLocalTTL, "cache-local-ttl", config.CacheLocalTTL, "default TTL for local cache items")
flags.Var(&config.MemcachedServers, "memcached-servers", "memcached servers")

if err := flags.Parse(args[1:]); err != nil {
return err
Expand Down Expand Up @@ -233,6 +245,15 @@ func run(args []string, stdout io.Writer) error {
return err
}

// Initialize cache client (always non-nil, with fallback chain: memcached → local → noop)
cacheClient := setupCache(
config.CacheType,
config.MemcachedServers,
config.CacheLocalCapacity,
config.CacheLocalTTL,
&zl,
)

// to know if probe is connected to API
readynessHandler := NewReadynessHandler()

Expand Down Expand Up @@ -300,6 +321,7 @@ func run(args []string, stdout io.Writer) error {
synthetic_monitoring.NewTenantsClient(conn),
tenantCh,
tenants.DefaultCacheTimeout,
cacheClient,
zl.With().Str("subsystem", "tenant_manager").Logger(),
)

Expand Down Expand Up @@ -469,3 +491,83 @@ func setupGoMemLimit(ratio float64) error {

return nil
}

func setupCache(cacheType string, memcachedServers []string, localCapacity int, localTTL time.Duration, logger *zerolog.Logger) cache.Cache {
// Determine effective cache type with auto mode logic:
// auto + servers provided -> memcached -> local -> noop
// auto + no servers -> local -> noop
effectiveType := cacheType
if cacheType == "auto" {
if len(memcachedServers) > 0 {
effectiveType = "memcached"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may make sense as a typed string in the cache package, so the valid values are defined upfront and we don't need to handle the literals.

e.g:

package cache

type Type string

const (
	TypeMemcached Type = "memcached"
	TypeLocal     Type = "local"
	TypeNoop      Type = "noop"
)

} else {
effectiveType = "local"
}
}

switch effectiveType {
case "memcached":
if len(memcachedServers) == 0 {
logger.Warn().Msg("memcached type selected but no servers configured, falling back to local cache")
return setupLocalCache(localCapacity, localTTL, logger)
}

cacheConfig := cache.MemcachedConfig{
Servers: memcachedServers,
Logger: logger.With().Str("subsystem", "cache").Logger(),
Timeout: 100 * time.Millisecond,
}

cacheClient, err := cache.NewMemcachedClient(cacheConfig)
if err != nil {
logger.Warn().
Err(err).
Strs("servers", memcachedServers).
Msg("failed to initialize memcached cache, falling back to local cache")
return setupLocalCache(localCapacity, localTTL, logger)
}

logger.Info().
Strs("servers", memcachedServers).
Msg("memcached cache initialized")
return cacheClient

case "local":
return setupLocalCache(localCapacity, localTTL, logger)

case "noop":
logger.Debug().Msg("noop cache selected")
return cache.NewNoop(logger.With().Str("subsystem", "cache").Logger())

default:
logger.Warn().
Str("type", effectiveType).
Msg("unknown cache type, falling back to local cache")
return setupLocalCache(localCapacity, localTTL, logger)
}
}

func setupLocalCache(capacity int, ttl time.Duration, logger *zerolog.Logger) cache.Cache {
localConfig := cache.LocalConfig{
MaxCapacity: capacity,
InitialCapacity: capacity / 10, // 10% of max as initial capacity
DefaultTTL: ttl,
Logger: logger.With().Str("subsystem", "cache").Logger(),
}

localCache, err := cache.NewLocal(localConfig)
if err != nil {
logger.Warn().
Err(err).
Int("capacity", capacity).
Dur("ttl", ttl).
Msg("failed to initialize local cache, using noop cache")
return cache.NewNoop(logger.With().Str("subsystem", "cache").Logger())
}

logger.Info().
Int("capacity", capacity).
Dur("ttl", ttl).
Msg("local cache initialized")
return localCache
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ require (
require (
github.com/KimMachineGun/automemlimit v0.7.5
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b
github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf
github.com/felixge/httpsnoop v1.0.4
github.com/go-kit/log v0.2.1
github.com/gogo/status v1.1.1
github.com/grafana/gsm-api-go-client v0.2.1
github.com/grafana/loki/pkg/push v0.0.0-20250903135404-0b2d0b070e96
github.com/jpillora/backoff v1.0.0
github.com/maypok86/otter/v2 v2.2.1
github.com/mccutchen/go-httpbin/v2 v2.19.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus-community/pro-bing v0.7.0
github.com/puzpuzpuz/xsync/v4 v4.2.0
github.com/quasilyte/go-ruleguard/dsl v0.3.23
github.com/spf13/afero v1.15.0
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjk
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf h1:TqhNAT4zKbTdLa62d2HDBFdvgSbIGB3eJE8HqhgiL9I=
github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY=
github.com/buger/goterm v1.0.4/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE=
Expand Down Expand Up @@ -140,6 +142,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/maypok86/otter/v2 v2.2.1 h1:hnGssisMFkdisYcvQ8L019zpYQcdtPse+g0ps2i7cfI=
github.com/maypok86/otter/v2 v2.2.1/go.mod h1:1NKY9bY+kB5jwCXBJfE59u+zAwOt6C7ni1FTlFFMqVs=
github.com/mccutchen/go-httpbin/v2 v2.19.0 h1:bU3S6EH9Mr94sUPk1sdFLULlz9TagwZvmj+rpfilkd0=
github.com/mccutchen/go-httpbin/v2 v2.19.0/go.mod h1:GBy5I7XwZ4ZLhT3hcq39I4ikwN9x4QUt6EAxNiR8Jus=
github.com/miekg/dns v1.1.68 h1:jsSRkNozw7G/mnmXULynzMNIsgY2dHC8LO6U6Ij2JEA=
Expand Down Expand Up @@ -182,6 +186,8 @@ github.com/prometheus/prometheus v0.305.0 h1:UO/LsM32/E9yBDtvQj8tN+WwhbyWKR10lO3
github.com/prometheus/prometheus v0.305.0/go.mod h1:JG+jKIDUJ9Bn97anZiCjwCxRyAx+lpcEQ0QnZlUlbwY=
github.com/prometheus/sigv4 v0.2.0 h1:qDFKnHYFswJxdzGeRP63c4HlH3Vbn1Yf/Ao2zabtVXk=
github.com/prometheus/sigv4 v0.2.0/go.mod h1:D04rqmAaPPEUkjRQxGqjoxdyJuyCh6E0M18fZr0zBiE=
github.com/puzpuzpuz/xsync/v4 v4.2.0 h1:dlxm77dZj2c3rxq0/XNvvUKISAmovoXF4a4qM6Wvkr0=
github.com/puzpuzpuz/xsync/v4 v4.2.0/go.mod h1:VJDmTCJMBt8igNxnkQd86r+8KUeN1quSfNKu5bLYFQo=
github.com/quasilyte/go-ruleguard/dsl v0.3.23 h1:lxjt5B6ZCiBeeNO8/oQsegE6fLeCzuMRoVWSkXC4uvY=
github.com/quasilyte/go-ruleguard/dsl v0.3.23/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
Expand Down
81 changes: 81 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Package cache provides an interface-based caching system with multiple implementations.
//
// The package defines a Cache interface that all implementations must satisfy,
// allowing the agent to use different cache backends interchangeably. This design
// eliminates the need for nil checks throughout the codebase.
//
// Available implementations:
// - Memcached: Distributed cache using memcached servers
// - Noop: No-op cache for testing and fallback (always returns ErrCacheMiss)
//
// The cache supports multiple memcached servers, custom expiration times,
// and automatic serialization/deserialization of Go values using encoding/gob.
//
// Example usage with memcached:
//
// cc, err := cache.New(cache.Config{
// Servers: []string{"localhost:11211", "cache1:11211"},
// Logger: logger,
// })
// if err != nil {
// // Fall back to noop cache
// cc = cache.NewNoop(logger)
// }
//
// // Store a struct
// err = cc.Set(ctx, "user:123", user, 5*time.Minute)
//
// // Retrieve a struct
// var user User
// err = cc.Get(ctx, "user:123", &user)
// if err == cache.ErrCacheMiss {
// // Key not found, fetch from source
// } else if err != nil {
// // Other error (network, deserialization, etc.)
// log.Warn("cache error: %v", err)
// }
//
// Example usage with noop cache (for testing):
//
// cc := cache.NewNoop(logger)
// // All Get operations return ErrCacheMiss
// // All Set/Delete/Flush operations succeed but do nothing
package cache

import (
"context"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/error_types"
)

const (
// ErrCacheMiss is returned when a key is not found in the cache.
// This is a sentinel error that allows callers to distinguish between
// a cache miss and other errors (network issues, serialization failures, etc.).
ErrCacheMiss = error_types.BasicError("cache miss")
)

// Cache defines the interface for cache operations.
// All cache implementations (memcached, noop, local, etc.) must implement this interface.
//
// This interface allows the agent to use different cache backends interchangeably,
// including a no-op implementation that eliminates the need for nil checks in client code.
type Cache interface {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are three implementations of this interface.

A memcached one. This is the one I would like to use eventually.

A local one, based on Otter.

A noop one, which is a fallback in case the other two are not available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did consider a multi-tiered implementation, too, using L1 with Otter and L2 with memcached. I would prefer to get some numbers before going that way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I initially expected a multi-tiered implementation. It's not something we need to implement immediately, I'm fine with running this using one or the other first and getting some data out of it.

// Set stores a value in the cache with the specified expiration time.
// Returns an error if the operation fails.
Set(ctx context.Context, key string, value any, expiration time.Duration) error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the bit where I struggled the most.

Otter uses generics to have a more strongly-typed cache. Since the goal is to use memcached, which does not have a strongly-typed API, I thought about multiple clients, one per type, but it was weird.


// Get retrieves a value from the cache and deserializes it into dest.
// Returns ErrCacheMiss if the key is not found.
// Returns an error for other failures (network, serialization, etc.).
Get(ctx context.Context, key string, dest any) error

// Delete removes a key from the cache.
// Returns nil if the key doesn't exist (idempotent).
Delete(ctx context.Context, key string) error

// Flush removes all items from the cache.
// Use this operation cautiously as it affects all cache users.
Flush(ctx context.Context) error
}
Loading
Loading