diff --git a/controlplane/clickhouse/migrations/20250101213018_operation_planning_metrics_5_30_mv.sql b/controlplane/clickhouse/migrations/20250101213018_operation_planning_metrics_5_30_mv.sql
index e76d689954..7d71283ff0 100644
--- a/controlplane/clickhouse/migrations/20250101213018_operation_planning_metrics_5_30_mv.sql
+++ b/controlplane/clickhouse/migrations/20250101213018_operation_planning_metrics_5_30_mv.sql
@@ -24,7 +24,7 @@ SELECT
     maxSimpleState(Max) AS MaxDuration
 FROM otel_metrics_histogram
 -- Only works with the same bounds for all buckets. If bounds are different, we can't add them together
-WHERE ScopeName = 'cosmo.router' AND ScopeVersion = '0.0.1' AND MetricName = 'router.graphql.operation.planning_time' AND OrganizationID != '' AND FederatedGraphID != ''
+WHERE ScopeName = 'cosmo.router' AND ScopeVersion = '0.0.1' AND MetricName = 'router.graphql.operation.planning_time' AND Attributes['wg.engine.plan_cache_hit'] == 'false' AND OrganizationID != '' AND FederatedGraphID != ''
 GROUP BY
     OperationName,
     OperationHash,
diff --git a/controlplane/clickhouse/migrations/20250122211233_planning_time_exclude_cache_hits.sql b/controlplane/clickhouse/migrations/20250122211233_planning_time_exclude_cache_hits.sql
new file mode 100644
index 0000000000..ddf9453660
--- /dev/null
+++ b/controlplane/clickhouse/migrations/20250122211233_planning_time_exclude_cache_hits.sql
@@ -0,0 +1,79 @@
+-- migrate:up
+
+ALTER TABLE cosmo.operation_planning_metrics_5_30_mv MODIFY QUERY
+  SELECT
+    toStartOfFiveMinute(TimeUnix) as Timestamp,
+    toLowCardinality(Attributes [ 'wg.operation.name' ]) as OperationName,
+    Attributes [ 'wg.operation.hash' ] as OperationHash,
+    toLowCardinality(Attributes [ 'wg.operation.type' ]) as OperationType,
+    Attributes [ 'wg.operation.persisted_id' ] as OperationPersistedID,
+    toLowCardinality(Attributes [ 'wg.router.config.version']) as RouterConfigVersion,
+    toLowCardinality(Attributes [ 'wg.federated_graph.id']) as FederatedGraphID,
+    toLowCardinality(Attributes [ 'wg.organization.id' ]) as OrganizationID,
+    toLowCardinality(Attributes [ 'wg.client.name' ]) as ClientName,
+    toLowCardinality(Attributes [ 'wg.client.version' ]) as ClientVersion,
+    -- Sum up the bucket counts on the same index which produces the overall count of samples of the histogram
+    sumForEachState(BucketCounts) as BucketCounts,
+    -- Populate the bounds so we have a base value for quantile calculations
+    ExplicitBounds,
+    sumSimpleState(Sum) AS Sum,
+    sumSimpleState(Count) AS Count,
+    minSimpleState(Min) AS MinDuration,
+    maxSimpleState(Max) AS MaxDuration
+  FROM otel_metrics_histogram
+  -- Only works with the same bounds for all buckets. If bounds are different, we can't add them together
+  WHERE ScopeName = 'cosmo.router' AND ScopeVersion = '0.0.1' AND MetricName = 'router.graphql.operation.planning_time' AND Attributes['wg.engine.plan_cache_hit'] == 'false' AND OrganizationID != '' AND FederatedGraphID != ''
+  GROUP BY
+    OperationName,
+    OperationHash,
+    OperationPersistedID,
+    FederatedGraphID,
+    RouterConfigVersion,
+    OrganizationID,
+    OperationType,
+    Timestamp,
+    ClientName,
+    ClientVersion,
+    ExplicitBounds
+  ORDER BY
+    Timestamp;
+
+-- migrate:down
+
+ALTER TABLE cosmo.operation_planning_metrics_5_30_mv MODIFY QUERY
+  SELECT
+    toStartOfFiveMinute(TimeUnix) as Timestamp,
+    toLowCardinality(Attributes [ 'wg.operation.name' ]) as OperationName,
+    Attributes [ 'wg.operation.hash' ] as OperationHash,
+    toLowCardinality(Attributes [ 'wg.operation.type' ]) as OperationType,
+    Attributes [ 'wg.operation.persisted_id' ] as OperationPersistedID,
+    toLowCardinality(Attributes [ 'wg.router.config.version']) as RouterConfigVersion,
+    toLowCardinality(Attributes [ 'wg.federated_graph.id']) as FederatedGraphID,
+    toLowCardinality(Attributes [ 'wg.organization.id' ]) as OrganizationID,
+    toLowCardinality(Attributes [ 'wg.client.name' ]) as ClientName,
+    toLowCardinality(Attributes [ 'wg.client.version' ]) as ClientVersion,
+    -- Sum up the bucket counts on the same index which produces the overall count of samples of the histogram
+    sumForEachState(BucketCounts) as BucketCounts,
+    -- Populate the bounds so we have a base value for quantile calculations
+    ExplicitBounds,
+    sumSimpleState(Sum) AS Sum,
+    sumSimpleState(Count) AS Count,
+    minSimpleState(Min) AS MinDuration,
+    maxSimpleState(Max) AS MaxDuration
+  FROM otel_metrics_histogram
+  -- Only works with the same bounds for all buckets. If bounds are different, we can't add them together
+  WHERE ScopeName = 'cosmo.router' AND ScopeVersion = '0.0.1' AND MetricName = 'router.graphql.operation.planning_time' AND OrganizationID != '' AND FederatedGraphID != ''
+  GROUP BY
+    OperationName,
+    OperationHash,
+    OperationPersistedID,
+    FederatedGraphID,
+    RouterConfigVersion,
+    OrganizationID,
+    OperationType,
+    Timestamp,
+    ClientName,
+    ClientVersion,
+    ExplicitBounds
+  ORDER BY
+    Timestamp;
diff --git a/controlplane/src/core/bufservices/cache-warmer/pushCacheWarmerOperation.ts b/controlplane/src/core/bufservices/cache-warmer/pushCacheWarmerOperation.ts
index a4b2a5f4e9..4a6fca1098 100644
--- a/controlplane/src/core/bufservices/cache-warmer/pushCacheWarmerOperation.ts
+++ b/controlplane/src/core/bufservices/cache-warmer/pushCacheWarmerOperation.ts
@@ -166,15 +166,15 @@ export function pushCacheWarmerOperation(
     }
 
     await cacheWarmerRepo.addCacheWarmerOperations({
-      federatedGraphId: federatedGraph.id,
-      organizationId: authContext.organizationId,
-      createdById: authContext.userId,
-      isManuallyAdded: true,
       operations: [
         {
-          name: req.operationName,
-          persistedId: req.operationPersistedId,
-          content: req.operationContent,
+          operationName: req.operationName,
+          operationPersistedID: req.operationPersistedId,
+          operationContent: req.operationContent,
+          federatedGraphId: federatedGraph.id,
+          organizationId: authContext.organizationId,
+          createdById: authContext.userId,
+          isManuallyAdded: true,
           clientName,
         },
       ],
diff --git a/controlplane/src/core/repositories/CacheWarmerRepository.ts b/controlplane/src/core/repositories/CacheWarmerRepository.ts
index 61e03178cd..9b4db6b758 100644
--- a/controlplane/src/core/repositories/CacheWarmerRepository.ts
+++ b/controlplane/src/core/repositories/CacheWarmerRepository.ts
@@ -15,6 +15,7 @@ import { DateRange } from '../../types/index.js';
 import { BlobStorage } from '../blobstorage/index.js';
 import { ClickHouseClient } from '../clickhouse/index.js';
 import { S3RouterConfigMetadata } from '../composition/composer.js';
+import { CacheWarmupOperation } from '../../db/models.js';
 import { getDateRange, isoDateRangeToTimestamps } from './analytics/util.js';
 
 interface ComputeCacheWarmerOperationsProps {
@@ -24,16 +25,6 @@ interface ComputeCacheWarmerOperationsProps {
   federatedGraphId: string;
 }
 
-interface DBCacheWarmerOperation {
-  content?: string;
-  hash?: string;
-  name?: string;
-  persistedId?: string;
-  clientName?: string;
-  clientVersion?: string;
-  planningTime?: number;
-}
-
 export class CacheWarmerRepository {
   constructor(
     private client: ClickHouseClient,
@@ -49,13 +40,14 @@ export class CacheWarmerRepository {
     const parsedDateRange = isoDateRangeToTimestamps(dateRange, rangeInHours);
     const [start, end] = getDateRange(parsedDateRange);
     const quantile = 0.9;
-    const minPlanningTimeInMs = 0;
+    const minPlanningTimeInMs = 1;
 
     const query = `
       WITH
         toDateTime('${start}') AS startDate,
        toDateTime('${end}') AS endDate
      SELECT
+       max(MaxDuration) as maxDuration,
        OperationHash as operationHash,
        OperationName as operationName,
        OperationPersistedID as operationPersistedID,
@@ -76,7 +68,7 @@ export class CacheWarmerRepository {
        AND OrganizationID = '${organizationId}'
        AND OperationName != 'IntrospectionQuery'
      GROUP BY OperationHash, OperationName, OperationPersistedID, ClientName, ClientVersion
-      HAVING planningTime > ${minPlanningTimeInMs}
+      HAVING maxDuration >= ${minPlanningTimeInMs}
      ORDER BY planningTime DESC
      LIMIT 100
    `;
@@ -144,7 +136,7 @@ export class CacheWarmerRepository {
     const topOperationsByPlanningTime = await this.getTopOperationsByPlanningTime(props);
 
     const computedOperations: Operation[] = [];
-    const dbCacheWarmerOperations: DBCacheWarmerOperation[] = [];
+    const dbCacheWarmerOperations: CacheWarmupOperation[] = [];
 
     const manuallyAddedOperations = await this.getCacheWarmerOperations({
       organizationId: props.organizationId,
@@ -235,12 +227,15 @@ export class CacheWarmerRepository {
         );
 
         dbCacheWarmerOperations.push({
-          name: operation.operationName,
-          hash: operation.operationHash,
-          persistedId: operation.operationPersistedID,
+          operationName: operation.operationName,
+          operationHash: operation.operationHash,
+          operationPersistedID: operation.operationPersistedID,
           clientName: operation.clientName,
           clientVersion: operation.clientVersion,
           planningTime: operation.planningTime,
+          federatedGraphId: props.federatedGraphId,
+          organizationId: props.organizationId,
+          isManuallyAdded: false,
         });
         continue;
       }
@@ -252,13 +247,16 @@ export class CacheWarmerRepository {
       }
 
       dbCacheWarmerOperations.push({
-        content: operationContent,
-        name: operation.operationName,
-        hash: operation.operationHash,
-        persistedId: operation.operationPersistedID,
+        operationName: operation.operationName,
+        operationHash: operation.operationHash,
+        operationPersistedID: operation.operationPersistedID,
         clientName: operation.clientName,
         clientVersion: operation.clientVersion,
         planningTime: operation.planningTime,
+        federatedGraphId: props.federatedGraphId,
+        organizationId: props.organizationId,
+        operationContent,
+        isManuallyAdded: false,
       });
 
       computedOperations.push(
@@ -285,9 +283,6 @@ export class CacheWarmerRepository {
       });
 
       await cacheWarmerRepo.addCacheWarmerOperations({
-        organizationId: props.organizationId,
-        federatedGraphId: props.federatedGraphId,
-        isManuallyAdded: false,
        operations: dbCacheWarmerOperations,
       });
     });
@@ -328,45 +323,12 @@ export class CacheWarmerRepository {
       .then((res) => res.length > 0);
   }
 
-  public async addCacheWarmerOperations({
-    organizationId,
-    federatedGraphId,
-    isManuallyAdded,
-    operations,
-    createdById,
-  }: {
-    organizationId: string;
-    federatedGraphId: string;
-    isManuallyAdded: boolean;
-    operations: {
-      content?: string;
-      hash?: string;
-      name?: string;
-      persistedId?: string;
-      clientName?: string;
-      clientVersion?: string;
-      planningTime?: number;
-    }[];
-    createdById?: string;
-  }) {
+  public async addCacheWarmerOperations({ operations }: { operations: CacheWarmupOperation[] }) {
     if (!operations || operations.length === 0) {
       return;
     }
-
-    const data = operations.map((operation) => ({
-      federatedGraphId,
-      organizationId,
-      isManuallyAdded,
-      operationContent: operation.content || null,
-      operationHash: operation.hash || null,
-      operationName: operation.name || null,
-      operationPersistedID: operation.persistedId || null,
-      clientName: operation.clientName || null,
-      clientVersion: operation.clientVersion || null,
-      planningTime: operation.planningTime,
-      createdById,
-    }));
-
-    await this.db.insert(cacheWarmerOperations).values(data);
+
+    await this.db.insert(cacheWarmerOperations).values(operations);
   }
 
   public getCacheWarmerOperations({
diff --git a/controlplane/src/db/models.ts b/controlplane/src/db/models.ts
index f073731029..e09248cc64 100644
--- a/controlplane/src/db/models.ts
+++ b/controlplane/src/db/models.ts
@@ -12,6 +12,7 @@ import {
   websocketSubprotocolEnum,
   webhookDeliveries,
   graphPruningRulesEnum,
+  cacheWarmerOperations,
 } from './schema.js';
 
 export type FederatedGraph = typeof federatedGraphs.$inferSelect;
@@ -26,6 +27,7 @@ export type MemberRole = (typeof memberRoleEnum.enumValues)[number];
 export type LintRuleEnum = (typeof lintRulesEnum.enumValues)[number];
 export type GraphPruningRuleEnum = (typeof graphPruningRulesEnum.enumValues)[number];
 export type WebsocketSubprotocol = (typeof websocketSubprotocolEnum.enumValues)[number];
+export type CacheWarmupOperation = typeof cacheWarmerOperations.$inferInsert;
 
 export type WebhookDeliveryInfo = typeof webhookDeliveries.$inferInsert;
diff --git a/router-tests/cache_warmup_test.go b/router-tests/cache_warmup_test.go
index ce70d79e65..aeae4b6ef1 100644
--- a/router-tests/cache_warmup_test.go
+++ b/router-tests/cache_warmup_test.go
@@ -1,6 +1,13 @@
 package integration
 
 import (
+	"context"
+	nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
+	"github.com/wundergraph/cosmo/router/pkg/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 	"net/http"
 	"testing"
 	"time"
@@ -562,3 +569,121 @@ func TestCacheWarmup(t *testing.T) {
 		})
 	})
 }
+
+func TestCacheWarmupMetrics(t *testing.T) {
+	t.Run("should emit planning time metrics during warmup", func(t *testing.T) {
+		t.Parallel()
+
+		metricReader := metric.NewManualReader()
+
+		testenv.Run(t, &testenv.Config{
+			MetricReader: metricReader,
+			RouterOptions: []core.Option{
+				core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{
+					Enabled: true,
+					Source: config.CacheWarmupSource{
+						Filesystem: &config.CacheWarmupFileSystemSource{
+							Path: "testenv/testdata/cache_warmup/single",
+						},
+					},
+				}),
+			},
+			ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) {
+				routerConfig.FeatureFlagConfigs = nil
+			},
+			AssertCacheMetrics: &testenv.CacheMetricsAssertions{
+				BaseGraphAssertions: testenv.CacheMetricsAssertion{
+					QueryNormalizationMisses: 1,
+					QueryNormalizationHits:   2,
+					ValidationMisses:         1,
+					ValidationHits:           2,
+					PlanMisses:               1,
+					PlanHits:                 2,
+				},
+			},
+		}, func(t *testing.T, xEnv *testenv.Environment) {
+			res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
+				Query: `query { employees { id } }`,
+			})
+			require.Equal(t, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"))
+			require.Equal(t, employeesIDData, res.Body)
+			res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
+				Query: `query { employees { id } }`,
+			})
+			require.Equal(t, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"))
+
+			rm := metricdata.ResourceMetrics{}
+			err := metricReader.Collect(context.Background(), &rm)
+
+			require.NoError(t, err)
+			require.Len(t, rm.ScopeMetrics, 2)
+
+			metricScope := GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router")
+			require.NotNil(t, metricScope)
+
+			require.Len(t, metricScope.Metrics, 6)
+
+			operationPlanningTimeMetric := metricdata.Metrics{
+				Name:        "router.graphql.operation.planning_time",
+				Description: "Operation planning time in milliseconds",
+				Unit:        "ms",
+				Data: metricdata.Histogram[float64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[float64]{
+						{
+							Attributes: attribute.NewSet(
+								otel.WgClientName.String(""),
+								otel.WgClientVersion.String(""),
+								// This is a miss, because we just planned it
+								otel.WgEnginePlanCacheHit.Bool(false),
+								otel.WgFeatureFlag.String(""),
+								otel.WgOperationHash.String("1163600561566987607"),
+								otel.WgOperationName.String(""),
+								otel.WgOperationType.String("query"),
+								otel.WgRouterClusterName.String(""),
+								otel.WgFederatedGraphID.String("graph"),
+								otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()),
+								otel.WgRouterVersion.String("dev"),
+							),
+							Count: 1,
+						},
+						{
+							Attributes: attribute.NewSet(
+								otel.WgClientName.String("unknown"),
+								otel.WgClientVersion.String("missing"),
+								// This is a hit, because we planned it for the base graph
+								otel.WgEnginePlanCacheHit.Bool(true),
+								otel.WgOperationHash.String("1163600561566987607"),
+								otel.WgOperationName.String(""),
+								otel.WgOperationProtocol.String("http"),
+								otel.WgOperationType.String("query"),
+								otel.WgRouterClusterName.String(""),
+								otel.WgFederatedGraphID.String("graph"),
+								otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()),
+								otel.WgRouterVersion.String("dev"),
+							),
+							Count: 2,
+						},
+					},
+				},
+			}
+
+			m := *GetMetricByName(metricScope, "router.graphql.operation.planning_time")
+
+			// One when warming up the operation and one when executing the operation
+			require.Len(t, m.Data.(metricdata.Histogram[float64]).DataPoints, 2)
+
+			// Warming up the operation
+			require.Equal(t, uint64(1), m.Data.(metricdata.Histogram[float64]).DataPoints[0].Count)
+
+			// Executing the operation. It is exported as an aggregated value
+			require.Equal(t, uint64(2), m.Data.(metricdata.Histogram[float64]).DataPoints[1].Count)
+
+			// Ensure we collected non-zero planning times
+			require.Greater(t, m.Data.(metricdata.Histogram[float64]).DataPoints[0].Sum, float64(0))
+			require.Greater(t, m.Data.(metricdata.Histogram[float64]).DataPoints[1].Sum, float64(0))
+
+			metricdatatest.AssertEqual(t, operationPlanningTimeMetric, m, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+		})
+	})
+}
diff --git a/router-tests/telemetry/telemetry_test.go b/router-tests/telemetry/telemetry_test.go
index 6682185d0d..3b8c5542eb 100644
--- a/router-tests/telemetry/telemetry_test.go
+++ b/router-tests/telemetry/telemetry_test.go
@@ -387,7 +387,7 @@ func TestEngineStatisticsTelemetry(t *testing.T) {
 				otel.WgRouterVersion.String("dev"),
 			}
 
-			engineScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.engine")
+			engineScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.engine")
 
 			connectionMetrics := metricdata.Metrics{
 				Name:        "router.engine.connections",
 				Description: "Number of connections in the engine. Contains both websocket and http connections",
 				Unit:        "",
 				Data: metricdata.Sum[int64]{
 					Temporality: metricdata.CumulativeTemporality,
 					IsMonotonic: false,
 					DataPoints: []metricdata.DataPoint[int64]{
 						{
 							Attributes: attribute.NewSet(baseAttributes...),
 							Value:      0,
 						},
 					},
@@ -404,7 +404,7 @@ func TestEngineStatisticsTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, connectionMetrics, *getMetricByName(engineScope, "router.engine.connections"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, connectionMetrics, *integration.GetMetricByName(engineScope, "router.engine.connections"), metricdatatest.IgnoreTimestamp())
 
 			subscriptionMetrics := metricdata.Metrics{
 				Name:        "router.engine.subscriptions",
@@ -421,7 +421,7 @@ func TestEngineStatisticsTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, subscriptionMetrics, *getMetricByName(engineScope, "router.engine.subscriptions"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, subscriptionMetrics, *integration.GetMetricByName(engineScope, "router.engine.subscriptions"), metricdatatest.IgnoreTimestamp())
 
 			triggerMetrics := metricdata.Metrics{
 				Name: "router.engine.triggers",
@@ -438,7 +438,7 @@ func TestEngineStatisticsTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, triggerMetrics, *getMetricByName(engineScope, "router.engine.triggers"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, triggerMetrics, *integration.GetMetricByName(engineScope, "router.engine.triggers"), metricdatatest.IgnoreTimestamp())
 
 			messagesSentMetrics := metricdata.Metrics{
 				Name: "router.engine.messages.sent",
@@ -454,7 +454,7 @@ func TestEngineStatisticsTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, messagesSentMetrics, *getMetricByName(engineScope, "router.engine.messages.sent"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, messagesSentMetrics, *integration.GetMetricByName(engineScope, "router.engine.messages.sent"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 		})
 	})
 
@@ -508,7 +508,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, rm.ScopeMetrics, 2)
 
-			cacheScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
+			cacheScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
 			require.NotNil(t, cacheScope)
 
 			require.Len(t, cacheScope.Metrics, 4)
@@ -594,7 +594,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, hitStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, hitStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
 
 			keyStatMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.keys.stats",
@@ -699,7 +699,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, keyStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, keyStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
 
 			costStatsMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.stats",
@@ -772,7 +772,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, costStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, costStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
 
 			maxCostMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.max",
@@ -808,7 +808,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, maxCostMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, maxCostMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
 		})
 	})
 
@@ -887,7 +887,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, rm.ScopeMetrics, 2)
 
-			cacheScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
+			cacheScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
 			require.NotNil(t, cacheScope)
 
 			require.Len(t, cacheScope.Metrics, 4)
@@ -973,7 +973,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, hitStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, hitStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
 
 			keyStatMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.keys.stats",
@@ -1078,7 +1078,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, keyStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, keyStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
 
 			costStatsMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.stats",
@@ -1151,7 +1151,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, costStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, costStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
 
 			maxCostMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.max",
@@ -1187,7 +1187,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, maxCostMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, maxCostMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
 		})
 	})
 
@@ -1230,7 +1230,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, rm.ScopeMetrics, 2)
 
-			cacheScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
+			cacheScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
 			require.NotNil(t, cacheScope)
 
 			require.Len(t, cacheScope.Metrics, 4)
@@ -1316,7 +1316,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, hitStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, hitStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
 
 			keyStatMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.keys.stats",
@@ -1421,7 +1421,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, keyStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, keyStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
 
 			costStatsMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.stats",
@@ -1494,7 +1494,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, costStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, costStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
 
 			maxCostMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.max",
@@ -1530,7 +1530,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, maxCostMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, maxCostMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
 		})
 	})
 
@@ -1575,7 +1575,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, rm.ScopeMetrics, 2)
 
-			cacheScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
+			cacheScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
 			require.NotNil(t, cacheScope)
 
 			require.Len(t, cacheScope.Metrics, 4)
@@ -1661,7 +1661,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, requestStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, requestStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
 
 			keyStatMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.keys.stats",
@@ -1767,7 +1767,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, keyStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, keyStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
 
 			costStatsMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.stats",
@@ -1840,7 +1840,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, costStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, costStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
 
 			maxCostMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.max",
@@ -1876,7 +1876,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, maxCostMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, maxCostMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
 		})
 	})
 
@@ -1941,7 +1941,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, rm.ScopeMetrics, 2)
 
-			cacheScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
+			cacheScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.cache")
 			require.NotNil(t, cacheScope)
 
 			require.Len(t, cacheScope.Metrics, 4)
@@ -2100,7 +2100,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, requestStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, requestStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.requests.stats"), metricdatatest.IgnoreTimestamp())
 
 			keyStatMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.keys.stats",
@@ -2307,7 +2307,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, keyStatMetrics, *getMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, keyStatMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.keys.stats"), metricdatatest.IgnoreTimestamp())
 
 			costStatsMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.stats",
@@ -2449,7 +2449,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, costStatsMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, costStatsMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.stats"), metricdatatest.IgnoreTimestamp())
 
 			maxCostMetrics := metricdata.Metrics{
 				Name: "router.graphql.cache.cost.max",
@@ -2517,7 +2517,7 @@ func TestOperationCacheTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, maxCostMetrics, *getMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, maxCostMetrics, *integration.GetMetricByName(cacheScope, "router.graphql.cache.cost.max"), metricdatatest.IgnoreTimestamp())
 		})
 	})
 }
@@ -2553,11 +2553,11 @@ func TestRuntimeTelemetry(t *testing.T) {
 
 			// Runtime metrics
 
-			runtimeScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.runtime")
+			runtimeScope := integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.runtime")
 			require.NotNil(t, runtimeScope)
 			require.Len(t, runtimeScope.Metrics, 15)
 
-			metricRuntimeUptime := getMetricByName(runtimeScope, "process.uptime")
+			metricRuntimeUptime := integration.GetMetricByName(runtimeScope, "process.uptime")
 			require.NotNil(t, metricRuntimeUptime)
 			metricRuntimeUptimeDataType := metricRuntimeUptime.Data.(metricdata.Gauge[int64])
 			require.Len(t, metricRuntimeUptimeDataType.DataPoints, 1)
@@ -2601,9 +2601,9 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processCpuUsageMetric, *getMetricByName(runtimeScope, "process.cpu.usage"), metricdatatest.IgnoreTimestamp())
+			metricdatatest.AssertEqual(t, processCpuUsageMetric, *integration.GetMetricByName(runtimeScope, "process.cpu.usage"), metricdatatest.IgnoreTimestamp())
 
-			metricServerUptime := getMetricByName(runtimeScope, "server.uptime")
+			metricServerUptime := integration.GetMetricByName(runtimeScope, "server.uptime")
 			require.NotNil(t, metricServerUptime)
 			metricServerUptimeDataType := metricServerUptime.Data.(metricdata.Gauge[int64])
 			require.Len(t, metricServerUptimeDataType.DataPoints, 1)
@@ -2649,7 +2649,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapAllocMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_alloc"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapAllocMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.heap_alloc"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoMemHeapIdleMetric := metricdata.Metrics{
 				Name: "process.runtime.go.mem.heap_idle",
@@ -2672,7 +2672,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapIdleMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_idle"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapIdleMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.heap_idle"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoMemHeapInUseMetric := metricdata.Metrics{
 				Name: "process.runtime.go.mem.heap_inuse",
@@ -2695,7 +2695,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapInUseMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_inuse"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapInUseMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.heap_inuse"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoMemHeapObjectsMetric := metricdata.Metrics{
 				Name: "process.runtime.go.mem.heap_objects",
@@ -2718,7 +2718,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapObjectsMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_objects"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapObjectsMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.heap_objects"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoMemHeapReleasedMetric := metricdata.Metrics{
 				Name: "process.runtime.go.mem.heap_released",
@@ -2741,7 +2741,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapReleasedMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_released"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapReleasedMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.heap_released"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoMemHeapSysMetric := metricdata.Metrics{
 				Name: "process.runtime.go.mem.heap_sys",
@@ -2764,7 +2764,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapSysMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_sys"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemHeapSysMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.heap_sys"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoMemLiveObjectsMetric := metricdata.Metrics{
 				Name: "process.runtime.go.mem.live_objects",
@@ -2787,7 +2787,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoMemLiveObjectsMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.live_objects"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoMemLiveObjectsMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.mem.live_objects"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoGcCountMetric := metricdata.Metrics{
 				Name: "process.runtime.go.gc.count",
@@ -2810,7 +2810,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoGcCountMetric, *getMetricByName(runtimeScope, "process.runtime.go.gc.count"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoGcCountMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.gc.count"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoGoRoutinesCountMetric := metricdata.Metrics{
 				Name: "process.runtime.go.goroutines.count",
@@ -2833,7 +2833,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoGoRoutinesCountMetric, *getMetricByName(runtimeScope, "process.runtime.go.goroutines.count"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoGoRoutinesCountMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.goroutines.count"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoInfoMetric := metricdata.Metrics{
 				Name: "process.runtime.go.info",
@@ -2857,7 +2857,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoInfoMetric, *getMetricByName(runtimeScope, "process.runtime.go.info"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoInfoMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.info"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoGcPauseTotalMetric := metricdata.Metrics{
 				Name: "process.runtime.go.gc.pause_total",
@@ -2880,7 +2880,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoGcPauseTotalMetric, *getMetricByName(runtimeScope, "process.runtime.go.gc.pause_total"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoGcPauseTotalMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.gc.pause_total"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 			processRuntimeGoGcPauseMetric := metricdata.Metrics{
 				Name: "process.runtime.go.gc.pause",
@@ -2894,7 +2894,7 @@ func TestRuntimeTelemetry(t *testing.T) {
 				},
 			}
 
-			metricdatatest.AssertEqual(t, processRuntimeGoGcPauseMetric, *getMetricByName(runtimeScope, "process.runtime.go.gc.pause"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
+			metricdatatest.AssertEqual(t, processRuntimeGoGcPauseMetric, *integration.GetMetricByName(runtimeScope, "process.runtime.go.gc.pause"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 		})
 	})
 }
@@ -7291,26 +7291,26 @@ func TestTelemetry(t *testing.T) {
 			rdFiltered, ok := rmFiltered.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
 			require.True(t, ok)
 
-			assertAttributeNotInSet(t, rdFiltered.DataPoints[0].Attributes, otel.WgClientName.String("unknown"))
-			assertAttributeNotInSet(t, rdFiltered.DataPoints[1].Attributes, otel.WgClientName.String("unknown"))
-			assertAttributeNotInSet(t, rdFiltered.DataPoints[0].Attributes, otel.WgOperationName.String(""))
-			assertAttributeNotInSet(t, rdFiltered.DataPoints[1].Attributes, otel.WgOperationName.String(""))
+			integration.AssertAttributeNotInSet(t, rdFiltered.DataPoints[0].Attributes, otel.WgClientName.String("unknown"))
+			integration.AssertAttributeNotInSet(t, rdFiltered.DataPoints[1].Attributes, otel.WgClientName.String("unknown"))
+			integration.AssertAttributeNotInSet(t, rdFiltered.DataPoints[0].Attributes, otel.WgOperationName.String(""))
+			integration.AssertAttributeNotInSet(t, rdFiltered.DataPoints[1].Attributes, otel.WgOperationName.String(""))
 
 			rclFiltered, ok := rmFiltered.ScopeMetrics[0].Metrics[1].Data.(metricdata.Sum[int64])
 			require.True(t, ok)
 
-			assertAttributeNotInSet(t, rclFiltered.DataPoints[0].Attributes, otel.WgClientName.String("unknown"))
-			assertAttributeNotInSet(t, rclFiltered.DataPoints[1].Attributes, otel.WgClientName.String("unknown"))
-			assertAttributeNotInSet(t, rclFiltered.DataPoints[0].Attributes, otel.WgOperationName.String(""))
-			assertAttributeNotInSet(t, rclFiltered.DataPoints[1].Attributes, otel.WgOperationName.String(""))
+			integration.AssertAttributeNotInSet(t, rclFiltered.DataPoints[0].Attributes, otel.WgClientName.String("unknown"))
+			integration.AssertAttributeNotInSet(t, rclFiltered.DataPoints[1].Attributes, otel.WgClientName.String("unknown"))
+			integration.AssertAttributeNotInSet(t, rclFiltered.DataPoints[0].Attributes, otel.WgOperationName.String(""))
+			integration.AssertAttributeNotInSet(t, rclFiltered.DataPoints[1].Attributes, otel.WgOperationName.String(""))
 
 			resClFiltered, ok := rmFiltered.ScopeMetrics[0].Metrics[2].Data.(metricdata.Sum[int64])
 			require.True(t, ok)
 
-			assertAttributeNotInSet(t, resClFiltered.DataPoints[0].Attributes, otel.WgClientName.String("unknown"))
-			assertAttributeNotInSet(t, resClFiltered.DataPoints[1].Attributes, otel.WgClientName.String("unknown"))
-			assertAttributeNotInSet(t, resClFiltered.DataPoints[0].Attributes, otel.WgOperationName.String(""))
-			assertAttributeNotInSet(t, resClFiltered.DataPoints[1].Attributes, otel.WgOperationName.String(""))
+			integration.AssertAttributeNotInSet(t, resClFiltered.DataPoints[0].Attributes, otel.WgClientName.String("unknown"))
+			integration.AssertAttributeNotInSet(t, resClFiltered.DataPoints[1].Attributes, otel.WgClientName.String("unknown"))
+			integration.AssertAttributeNotInSet(t, resClFiltered.DataPoints[0].Attributes, otel.WgOperationName.String(""))
+			integration.AssertAttributeNotInSet(t, resClFiltered.DataPoints[1].Attributes, otel.WgOperationName.String(""))
 		})
 	})
 
@@ -8740,29 +8740,5 @@ func TestTelemetry(t *testing.T) {
 			})
 		})
 	})
-}
-
-func assertAttributeNotInSet(t *testing.T, set attribute.Set, attr attribute.KeyValue) {
-	t.Helper()
-
-	_, ok := set.Value(attr.Key)
-	require.False(t, ok)
-}
-
-func getMetricByName(scopeMetric *metricdata.ScopeMetrics, name string) *metricdata.Metrics {
-	for _, m := range scopeMetric.Metrics {
-		if m.Name == name {
-			return &m
-		}
-	}
-	return nil
-}
-func getMetricScopeByName(metrics []metricdata.ScopeMetrics, name string) *metricdata.ScopeMetrics {
-	for _, m := range metrics {
-		if m.Scope.Name == name {
-			return &m
-		}
-	}
-	return nil
 }
diff --git a/router-tests/testenv/testdata/cache_warmup/single/employees.gql b/router-tests/testenv/testdata/cache_warmup/single/employees.gql
new file mode 100644
index 0000000000..0c8b1f0f55
--- /dev/null
+++ b/router-tests/testenv/testdata/cache_warmup/single/employees.gql
@@ -0,0 +1 @@
+query { employees { id } }
\ No newline at end of file
diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go
index 39fd18fb31..011095b77a 100644
--- a/router-tests/testenv/testenv.go
+++ b/router-tests/testenv/testenv.go
@@ -319,7 +319,6 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
 	t.Helper()
 
 	var (
-		metricReader     *metric.ManualReader
 		kafkaAdminClient *kadm.Client
 		kafkaStarted     sync.WaitGroup
 		kafkaClient      *kgo.Client
@@ -370,8 +369,9 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
 	}
 
 	if cfg.AssertCacheMetrics != nil {
-		metricReader = metric.NewManualReader()
-		cfg.MetricReader = metricReader
+		if cfg.MetricReader == nil {
+			cfg.MetricReader = metric.NewManualReader()
+		}
 		cfg.MetricOptions.EnableOTLPRouterCache = true
 	}
 
@@ -685,7 +685,7 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
 		shutdown:      atomic.NewBool(false),
 		logObserver:   logObserver,
 		getPubSubName: getPubSubName,
-		metricReader:  metricReader,
+		metricReader:  cfg.MetricReader,
 		Servers: []*httptest.Server{
 			employeesServer,
 			familyServer,
@@ -1061,7 +1061,7 @@ type Environment struct {
 	routerConfigVersionMain string
 	routerConfigVersionMyFF string
 
-	metricReader *metric.ManualReader
+	metricReader metric.Reader
 }
 
 func GetPubSubNameFn(prefix string) func(name string) string {
diff --git a/router-tests/utils.go b/router-tests/utils.go
index fa06efc2ab..9fda3aebce 100644
--- a/router-tests/utils.go
+++ b/router-tests/utils.go
@@ -5,6 +5,8 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/wundergraph/cosmo/router-tests/jwks"
 	"github.com/wundergraph/cosmo/router/pkg/authentication"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/trace"
 	tracetest2 "go.opentelemetry.io/otel/sdk/trace/tracetest"
 	"go.uber.org/zap"
@@ -53,3 +55,28 @@ func configureAuth(t *testing.T) ([]authentication.Authenticator, *jwks.Server)
 	require.NoError(t, err)
 	return []authentication.Authenticator{authenticator}, authServer
 }
+
+func AssertAttributeNotInSet(t *testing.T, set attribute.Set, attr attribute.KeyValue) {
+	t.Helper()
+
+	_, ok := set.Value(attr.Key)
+	require.False(t, ok)
+}
+
+func GetMetricByName(scopeMetric *metricdata.ScopeMetrics, name string) *metricdata.Metrics {
+	for _, m := range scopeMetric.Metrics {
+		if m.Name == name {
+			return &m
+		}
+	}
+	return nil
+}
+
+func GetMetricScopeByName(metrics []metricdata.ScopeMetrics, name string) *metricdata.ScopeMetrics {
+	for _, m := range metrics {
+		if m.Scope.Name == name {
+			return &m
+		}
+	}
+	return nil
+}
diff --git a/router/core/cache_warmup.go b/router/core/cache_warmup.go
index d7621728b2..e21e0b571a 100644
--- a/router/core/cache_warmup.go
+++ b/router/core/cache_warmup.go
@@ -3,6 +3,7 @@ package core
 import (
 	"context"
 	"errors"
+	"strconv"
 	"time"
 
 	"go.uber.org/ratelimit"
@@ -27,7 +28,7 @@ type CacheWarmupSource interface {
 }
 
 type CacheWarmupProcessor interface {
-	ProcessOperation(ctx context.Context, item *nodev1.Operation) error
+	ProcessOperation(ctx context.Context, item *nodev1.Operation) (*CacheWarmupOperationPlanResult, error)
 }
 
 type CacheWarmupConfig struct {
@@ -37,6 +38,7 @@ type CacheWarmupConfig struct {
 	ItemsPerSecond int
 	Timeout        time.Duration
 	Processor      CacheWarmupProcessor
+	AfterOperation func(item *CacheWarmupOperationPlanResult)
 }
 
 func WarmupCaches(ctx context.Context, cfg *CacheWarmupConfig) (err error) {
@@ -47,6 +49,7 @@ func WarmupCaches(ctx context.Context, cfg *CacheWarmupConfig) (err error) {
 		itemsPerSecond: cfg.ItemsPerSecond,
 		timeout:        cfg.Timeout,
 		processor:      cfg.Processor,
+		afterOperation: cfg.AfterOperation,
 	}
 	if cfg.Workers < 1 {
 		w.workers = 4
@@ -93,6 +96,7 @@ type cacheWarmup struct {
 	itemsPerSecond int
 	timeout        time.Duration
 	processor      CacheWarmupProcessor
+	afterOperation func(item *CacheWarmupOperationPlanResult)
 }
 
 func (w *cacheWarmup) run(ctx context.Context) (int, error) {
@@ -150,9 +154,10 @@ func (w *cacheWarmup) run(ctx context.Context) (int, error) {
 				}
 				rl.Take()
 				item := items[idx]
-				err := w.processor.ProcessOperation(ctx, item)
+
+				res, err := w.processor.ProcessOperation(ctx, item)
 				if err != nil {
-					w.log.Error("Failed to process operation, skipping",
+					w.log.Warn("Failed to process operation, skipping",
 						zap.Error(err),
 						zap.String("client_name", item.Client.Name),
 						zap.String("client_version", item.Client.Version),
@@ -160,6 +165,11 @@ func (w *cacheWarmup) run(ctx context.Context) (int, error) {
 						zap.String("operation_name", item.Request.OperationName),
 					)
 				}
+
+				if err == nil && w.afterOperation != nil {
+					w.afterOperation(res)
+				}
+
 				select {
 				case <-done:
 					return
@@ -205,6 +215,15 @@ func NewCacheWarmupPlanningProcessor(options *CacheWarmupPlanningProcessorOption
 	}
 }
 
+type CacheWarmupOperationPlanResult struct {
+	OperationHash string
+	OperationName string
+	OperationType string
+	ClientName    string
+	ClientVersion string
+	PlanningTime  time.Duration
+}
+
 type CacheWarmupPlanningProcessor struct {
 	operationProcessor *OperationProcessor
 	operationPlanner   *OperationPlanner
@@ -213,7 +232,7 @@ type CacheWarmupPlanningProcessor struct {
 	trackSchemaUsage   bool
 }
 
-func (c *CacheWarmupPlanningProcessor) ProcessOperation(ctx context.Context, operation *nodev1.Operation) error {
+func (c *CacheWarmupPlanningProcessor) ProcessOperation(ctx context.Context, operation *nodev1.Operation) (*CacheWarmupOperationPlanResult, error) {
 	var (
 		isAPQ bool
 	)
 
 	k, err := c.operationProcessor.NewIndependentKit()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	var s []byte
 	if operation.Request.GetExtensions() != nil {
 		s, err = protojson.Marshal(operation.Request.GetExtensions())
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
@@ -248,44 +267,44 @@ func (c *CacheWarmupPlanningProcessor) ProcessOperation(ctx context.Context, ope
 	err = k.unmarshalOperation()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	err = k.ComputeOperationSha256()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if k.parsedOperation.IsPersistedOperation {
 		_, isAPQ, err = k.FetchPersistedOperation(ctx, item.Client)
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
 	err = k.Parse()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	_, err = k.NormalizeOperation(item.Client.Name, isAPQ)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	err = k.NormalizeVariables()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	err = k.RemapVariables()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	_, err = k.Validate(true, k.parsedOperation.RemapVariables)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if c.complexityLimits != nil {
@@ -316,13 +335,22 @@ func (c *CacheWarmupPlanningProcessor) ProcessOperation(ctx context.Context, ope
 	opContext.variables, err = astjson.ParseBytes(k.parsedOperation.Request.Variables)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	planningStart := time.Now()
+
 	err = c.operationPlanner.plan(opContext, planOptions)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	return nil
+	return &CacheWarmupOperationPlanResult{
+		OperationHash: strconv.FormatUint(k.parsedOperation.ID, 10),
+		OperationName: k.parsedOperation.Request.OperationName,
+		OperationType: k.parsedOperation.Type,
+		ClientName:    item.Client.Name,
+		ClientVersion: item.Client.Version,
+		PlanningTime:  time.Since(planningStart),
+	}, nil
 }
diff --git a/router/core/cache_warmup_test.go b/router/core/cache_warmup_test.go
index e213ee085d..0b33866c2e 100644
--- a/router/core/cache_warmup_test.go
+++ b/router/core/cache_warmup_test.go
@@ -26,14 +26,21 @@ type CacheWarmupMockProcessor struct {
 	mux            sync.Mutex
 }
 
-func (c *CacheWarmupMockProcessor) ProcessOperation(ctx context.Context, item *nodev1.Operation) error {
+func (c *CacheWarmupMockProcessor) ProcessOperation(ctx context.Context, item *nodev1.Operation) (*CacheWarmupOperationPlanResult, error) {
 	if c.err != nil {
-		return c.err
+		return nil, c.err
 	}
 	c.mux.Lock()
 	defer c.mux.Unlock()
 	c.processedItems = append(c.processedItems, item)
-	return nil
+	return &CacheWarmupOperationPlanResult{
+		OperationHash: "",
+		OperationName: "",
+		OperationType: "",
+		ClientName:    item.GetClient().Name,
+		ClientVersion: item.GetClient().Version,
+		PlanningTime:  0,
+	}, nil
 }
 
 type CacheWarmupProcessorError struct{}
diff --git a/router/core/graph_server.go b/router/core/graph_server.go
index a406006274..35236bfcdd 100644
--- a/router/core/graph_server.go
+++ b/router/core/graph_server.go
@@ -5,6 +5,7 @@ import (
 	"crypto/ecdsa"
 	"errors"
 	"fmt"
+	otelmetric "go.opentelemetry.io/otel/metric"
 	"net/http"
 	"net/url"
 	"strings"
@@ -914,6 +915,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
 			RouterSchema:     executor.RouterSchema,
 			TrackSchemaUsage: s.graphqlMetricsConfig.Enabled,
 		})
+
 		warmupConfig := &CacheWarmupConfig{
 			Log:            s.logger,
 			Processor:      processor,
@@ -922,6 +924,24 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
 			Timeout:        s.Config.cacheWarmup.Timeout,
 		}
 
+		warmupConfig.AfterOperation = func(item *CacheWarmupOperationPlanResult) {
+			gm.metricStore.MeasureOperationPlanningTime(ctx,
+				item.PlanningTime,
+				nil,
+				otelmetric.WithAttributes(
+					append([]attribute.KeyValue{
+						otel.WgOperationName.String(item.OperationName),
+						otel.WgClientName.String(item.ClientName),
+						otel.WgClientVersion.String(item.ClientVersion),
+						otel.WgFeatureFlag.String(featureFlagName),
+						otel.WgOperationHash.String(item.OperationHash),
+						otel.WgOperationType.String(item.OperationType),
+						otel.WgEnginePlanCacheHit.Bool(false),
+					}, baseMetricAttributes...)...,
+				),
+			)
+		}
+
 		if s.Config.cacheWarmup.Source.Filesystem != nil {
 			warmupConfig.Source = NewFileSystemSource(&FileSystemSourceConfig{
 				RootPath: s.Config.cacheWarmup.Source.Filesystem.Path,
diff --git a/router/core/graphql_prehandler.go b/router/core/graphql_prehandler.go
index 946adaef37..9eca52b126 100644
--- a/router/core/graphql_prehandler.go
+++ b/router/core/graphql_prehandler.go
@@ -737,7 +737,6 @@ func (h *PreHandler) handleOperation(req *http.Request, variablesParser *astjson
 			httpOperation.requestLogger.Error("failed to plan operation", zap.Error(err))
 			rtrace.AttachErrToSpan(enginePlanSpan, err)
 
-			requestContext.operation.planningTime = time.Since(startPlanning)
 			if !requestContext.operation.traceOptions.ExcludePlannerStats {
 				httpOperation.traceTimings.EndPlanning()
 			}
diff --git a/studio/src/components/cache/operations-table.tsx b/studio/src/components/cache/operations-table.tsx
index 1b69555029..72fab4c0d8 100644
--- a/studio/src/components/cache/operations-table.tsx
+++ b/studio/src/components/cache/operations-table.tsx
@@ -14,9 +14,10 @@ import {
   TableWrapper,
 } from "../ui/table";
 import Link from "next/link";
-import { use, useContext } from "react";
+import { useContext } from "react";
 import { GraphContext } from "../layout/graph-layout";
 import { useUser } from "@/hooks/use-user";
+import { nanoTimestampToTime } from "@/components/analytics/charts";
 
 export const CacheOperationsTable = ({
   operations,
@@ -46,7 +47,7 @@ export const CacheOperationsTable = ({
           Actor
           Is Persisted
           Is Manually Added
-          Planning Time P90 (ms)
+          Planning Time P90
           Details
 
@@ -94,7 +95,9 @@ export const CacheOperationsTable = ({
 
-              {planningTime || "-"}
+              {planningTime
+                ? nanoTimestampToTime(planningTime * 1000000)
+                : "-"}
 
 {
-        {lastComputedAt && (
-          {`Last computed ${formatDistanceToNow(
-            new Date(lastComputedAt),
-            {
-              addSuffix: true,
-            },
-          )}`}
-        )}
 {
+
+
+
+        Total Items
+
+
+
+        {data.totalCount}
+
+
+
+
+
+        Last Computed
+
+
+
+
+        {" "}
+        {lastComputedAt && (
+          {formatDistanceToNow(new Date(lastComputedAt), {
+            addSuffix: true,
+          })}
+        )}
+
+
+
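Reviewer note: the MODIFY QUERY migration above only changes how rows are aggregated from the moment it runs; cache-hit samples that were already rolled up stay in the table. A quick way to sanity-check the new `wg.engine.plan_cache_hit` filter after deploying is to compare raw cache-hit histogram rows against what the view keeps aggregating. This is a minimal verification sketch, not part of the change set — it assumes the view's target table is cosmo.operation_planning_metrics_5_30, which this diff does not show:

-- Raw planning-time samples that were plan-cache hits in the last hour:
SELECT count() AS cache_hit_rows
FROM cosmo.otel_metrics_histogram
WHERE MetricName = 'router.graphql.operation.planning_time'
  AND Attributes['wg.engine.plan_cache_hit'] = 'true'
  AND TimeUnix >= now() - INTERVAL 1 HOUR;

-- After the migration, the rows counted above should no longer contribute here:
SELECT sum(Count) AS aggregated_samples
FROM cosmo.operation_planning_metrics_5_30
WHERE Timestamp >= now() - INTERVAL 1 HOUR;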