44 "context"
55 "errors"
66 "fmt"
7+ "hash/fnv"
78 "sort"
89 "strconv"
910 "strings"
@@ -48,20 +49,20 @@ type MetricsService struct {
4849 conn clickhouse.Conn
4950
5051 // opGuardCache is used to prevent duplicate writes of the same operation
51- opGuardCache * ristretto.Cache [string , struct {}]
52+ opGuardCache * ristretto.Cache [uint64 , struct {}]
5253
5354 processor * batchprocessor.BatchProcessor [SchemaUsageRequestItem ]
5455}
5556
5657// NewMetricsService creates a new metrics service
5758func NewMetricsService (logger * zap.Logger , chConn clickhouse.Conn , processorConfig ProcessorConfig ) * MetricsService {
58- cacheConfig := & ristretto.Config [string , struct {}]{
59+ cacheConfig := & ristretto.Config [uint64 , struct {}]{
5960 MaxCost : 50_000 ,
6061 NumCounters : 50_000 * 10 ,
6162 BufferItems : 64 ,
6263 IgnoreInternalCost : true ,
6364 }
64- opGuardCache , err := ristretto.NewCache [string , struct {}](cacheConfig )
65+ opGuardCache , err := ristretto.NewCache [uint64 , struct {}](cacheConfig )
6566 if err != nil {
6667 panic (err )
6768 }
@@ -160,23 +161,45 @@ func (s *MetricsService) Shutdown(timeout time.Duration) {
160161 }
161162}
162163
164+ // buildOperationCacheKey creates a composite key that uniquely identifies an operation
165+ // across organizations, federated graphs, and operation details to prevent collisions.
166+ // Matches the primary key used in the operation storage database (clickhouse).
167+ // Uses FNV-1a hash for efficient cache lookups with minimal memory overhead.
168+ func buildOperationCacheKey (federatedGraphID , organizationID , operationHash , operationName , operationType string ) uint64 {
169+ h := fnv .New64a ()
170+ // Write all components to the hash
171+ // No need to check errors as hash.Hash.Write never returns an error
172+ _ , _ = h .Write ([]byte (federatedGraphID ))
173+ _ , _ = h .Write ([]byte (":" ))
174+ _ , _ = h .Write ([]byte (organizationID ))
175+ _ , _ = h .Write ([]byte (":" ))
176+ _ , _ = h .Write ([]byte (operationHash ))
177+ _ , _ = h .Write ([]byte (":" ))
178+ _ , _ = h .Write ([]byte (operationName ))
179+ _ , _ = h .Write ([]byte (":" ))
180+ _ , _ = h .Write ([]byte (operationType ))
181+ return h .Sum64 ()
182+ }
183+
163184// prepareClickhouseBatches prepares the clickhouse batches for the given batch
164- // of schema usage items. It returns the operation, metric and request count batches.
185+ // of schema usage items. It returns the operation, metric and request count batches,
186+ // along with a slice of cache keys for operations that were added to the batch.
165187// If there is nothing to be processed, the corresponding batch will be nil.
166188func (s * MetricsService ) prepareClickhouseBatches (
167189 ctx context.Context , insertTime time.Time , batch []SchemaUsageRequestItem ,
168- ) (driver.Batch , driver.Batch , driver.Batch ) {
190+ ) (driver.Batch , driver.Batch , driver.Batch , [] uint64 ) {
169191 var (
170192 err error
171193 operationBatch driver.Batch
172194 metricBatch driver.Batch
173195 requestCountBatch driver.Batch
174196
175- hasMetrics = false
197+ hasMetrics = false
198+ opCacheKeys []uint64
176199 )
177200
178201 if len (batch ) == 0 {
179- return nil , nil , nil
202+ return nil , nil , nil , nil
180203 }
181204
182205 if requestCountBatch , err = s .conn .PrepareBatch (ctx , `INSERT INTO gql_metrics_router_requests` ); err != nil {
@@ -203,7 +226,15 @@ func (s *MetricsService) prepareClickhouseBatches(
203226 hasMetrics = true
204227 }
205228
206- if _ , exists := s .opGuardCache .Get (su .OperationInfo .Hash ); su .RequestDocument == "" || exists {
229+ opCacheKey := buildOperationCacheKey (
230+ item .Claims .FederatedGraphID ,
231+ item .Claims .OrganizationID ,
232+ su .OperationInfo .Hash ,
233+ su .OperationInfo .Name ,
234+ strings .ToLower (su .OperationInfo .Type .String ()),
235+ )
236+
237+ if _ , exists := s .opGuardCache .Get (opCacheKey ); su .RequestDocument == "" || exists {
207238 continue
208239 }
209240
@@ -234,12 +265,15 @@ func (s *MetricsService) prepareClickhouseBatches(
234265 s .logger .Error ("Failed to append operation to batch" , zap .Error (err ))
235266 continue
236267 }
268+
269+ // Store the cache key to avoid recomputing it later
270+ opCacheKeys = append (opCacheKeys , opCacheKey )
237271 }
238272 }
239273
240274 // If we do not have any metrics to process, we can return early.
241275 if ! hasMetrics {
242- return operationBatch , nil , requestCountBatch
276+ return operationBatch , nil , requestCountBatch , opCacheKeys
243277 }
244278
245279 for _ , item := range batch {
@@ -265,7 +299,7 @@ func (s *MetricsService) prepareClickhouseBatches(
265299 }
266300 }
267301
268- return operationBatch , metricBatch , requestCountBatch
302+ return operationBatch , metricBatch , requestCountBatch , opCacheKeys
269303}
270304
271305func (s * MetricsService ) appendUsageMetrics (
@@ -381,7 +415,7 @@ func (s *MetricsService) processBatch(_ context.Context, batch []SchemaUsageRequ
381415 insertTime := time .Now ()
382416 insertCtx := context .Background ()
383417
384- operationsBatch , metricsBatch , requestCountBatch := s .prepareClickhouseBatches (insertCtx , insertTime , batch )
418+ operationsBatch , metricsBatch , requestCountBatch , opCacheKeys := s .prepareClickhouseBatches (insertCtx , insertTime , batch )
385419
386420 var wg sync.WaitGroup
387421
@@ -398,13 +432,11 @@ func (s *MetricsService) processBatch(_ context.Context, batch []SchemaUsageRequ
398432 return fmt .Errorf ("failed to send operation batch: %w" , err )
399433 }
400434
401- for _ , item := range batch {
402- for _ , su := range item .SchemaUsage {
403- // Add the operation to the cache once it has been written
404- // We use a TTL of 30 days to prevent caching of operations that are no in our database
405- // due to storage retention policies
406- s .opGuardCache .SetWithTTL (su .OperationInfo .Hash , struct {}{}, 1 , 30 * 24 * time .Hour )
407- }
435+ // Add all operations to the cache once they have been written
436+ // We use a TTL of 30 days to prevent caching of operations that are not in our database
437+ // due to storage retention policies
438+ for _ , opCacheKey := range opCacheKeys {
439+ s .opGuardCache .SetWithTTL (opCacheKey , struct {}{}, 1 , 30 * 24 * time .Hour )
408440 }
409441
410442 s .opGuardCache .Wait ()
0 commit comments