Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions router-tests/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4864,6 +4864,134 @@ func TestFlakyPrometheusRouterConnectionMetrics(t *testing.T) {
})
})

t.Run("validate recording connection stats for subscriptions", func(t *testing.T) {
t.Parallel()

promRegistry := prometheus.NewRegistry()
metricReader := metric.NewManualReader()
testenv.Run(t, &testenv.Config{
MetricReader: metricReader,
PrometheusRegistry: promRegistry,
MetricOptions: testenv.MetricOptions{
EnablePrometheusConnectionMetrics: true,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil)
defer func() {
_ = conn.Close()
}()

err := conn.WriteJSON(&testenv.WebSocketMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { countEmp2(max: 2, intervalMilliseconds: 100) }"}`),
})
require.NoError(t, err)

var msg testenv.WebSocketMessage
err = conn.ReadJSON(&msg)
require.NoError(t, err)
require.Equal(t, "1", msg.ID)
require.Equal(t, "next", msg.Type)
require.Equal(t, `{"data":{"countEmp2":0}}`, string(msg.Payload))

rm := metricdata.ResourceMetrics{}
err = metricReader.Collect(context.Background(), &rm)
require.NoError(t, err)

mf, err := promRegistry.Gather()
require.NoError(t, err)

t.Run("verify max connections", func(t *testing.T) {
metricFamily := findMetricFamilyByName(mf, "router_http_client_max_connections")

metrics := metricFamily.GetMetric()
require.Len(t, metrics, 1)

connectionTotal := metrics[0]

require.Equal(t, 100.0, *connectionTotal.Gauge.Value)

expected := []*io_prometheus_client.LabelPair{
{
Name: PointerOf("otel_scope_name"),
Value: PointerOf("cosmo.router.connections.prometheus"),
},
{
Name: PointerOf("otel_scope_version"),
Value: PointerOf("0.0.1"),
},
}
require.Equal(t, expected, connectionTotal.Label)

})

t.Run("verify connections active", func(t *testing.T) {
metricFamily := findMetricFamilyByName(mf, "router_http_client_active_connections")
metrics := metricFamily.GetMetric()
require.Len(t, metrics, 1)

metricDataPoint1 := metrics[0]
require.Greater(t, *metricDataPoint1.Gauge.Value, 0.0)
expected1 := []*io_prometheus_client.LabelPair{
{
Name: PointerOf("otel_scope_name"),
Value: PointerOf("cosmo.router.connections.prometheus"),
},
{
Name: PointerOf("otel_scope_version"),
Value: PointerOf("0.0.1"),
},
{
Name: PointerOf("server_address"),
Value: PointerOf("127.0.0.1"),
},
{
Name: PointerOf("server_port"),
Value: PointerOf(getPort(metricDataPoint1)),
},
}
require.Equal(t, expected1, metricDataPoint1.Label)
})

t.Run("verify connection total duration", func(t *testing.T) {
metricFamily := findMetricFamilyByName(mf, "router_http_client_connection_acquire_duration")
metrics := metricFamily.GetMetric()
require.Len(t, metrics, 1)

metricDataPoint1 := metrics[0]
require.Greater(t, *metricDataPoint1.Histogram.SampleSum, 0.0)
expected1 := []*io_prometheus_client.LabelPair{
{
Name: PointerOf("otel_scope_name"),
Value: PointerOf("cosmo.router.connections.prometheus"),
},
{
Name: PointerOf("otel_scope_version"),
Value: PointerOf("0.0.1"),
},
{
Name: PointerOf("server_address"),
Value: PointerOf("127.0.0.1"),
},
{
Name: PointerOf("server_port"),
Value: PointerOf(getPort(metricDataPoint1)),
},
{
Name: PointerOf("wg_http_client_reused_connection"),
Value: PointerOf("false"),
},
{
Name: PointerOf("wg_subgraph_name"),
Value: PointerOf("employees"),
},
}
require.Equal(t, expected1, metricDataPoint1.Label)
})
})
})

}

func TestExcludeAttributesWithCustomExporterPrometheus(t *testing.T) {
Expand Down
105 changes: 105 additions & 0 deletions router-tests/telemetry/connection_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,110 @@ func TestConnectionMetrics(t *testing.T) {
})
})
})

t.Run("validate recording connection stats for subscriptions", func(t *testing.T) {
t.Parallel()

metricReader := metric.NewManualReader()
testenv.Run(t, &testenv.Config{
MetricReader: metricReader,
MetricOptions: testenv.MetricOptions{
EnableOTLPConnectionMetrics: true,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil)
defer func() {
_ = conn.Close()
}()

err := conn.WriteJSON(&testenv.WebSocketMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { countEmp2(max: 2, intervalMilliseconds: 100) }"}`),
})
require.NoError(t, err)

var msg testenv.WebSocketMessage
err = conn.ReadJSON(&msg)
require.NoError(t, err)
require.Equal(t, "1", msg.ID)
require.Equal(t, "next", msg.Type)
require.Equal(t, `{"data":{"countEmp2":0}}`, string(msg.Payload))

rm := metricdata.ResourceMetrics{}
err = metricReader.Collect(context.Background(), &rm)
require.NoError(t, err)

scopeMetric := *integration.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router.connections")
excludePortFromMetrics(t, rm.ScopeMetrics)

require.Len(t, scopeMetric.Metrics, 3)

t.Run("verify max connections", func(t *testing.T) {
expected := metricdata.Metrics{
Name: "router.http.client.max_connections",
Description: "Total number of max connections per subgraph",
Unit: "",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{},
},
},
}
metricdatatest.AssertEqual(t, expected, scopeMetric.Metrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
})

t.Run("verify connections active", func(t *testing.T) {
metrics := scopeMetric.Metrics[2]

expected := metricdata.Metrics{
Name: "router.http.client.active_connections",
Description: "Connections active",
Unit: "",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otel.ServerAddress.String("127.0.0.1"),
),
Value: 1,
},
},
},
}

metricdatatest.AssertEqual(t, expected, metrics, metricdatatest.IgnoreTimestamp())
})

t.Run("verify connection total duration", func(t *testing.T) {
metrics := scopeMetric.Metrics[1]

actualHistogram, ok := metrics.Data.(metricdata.Histogram[float64])
require.True(t, ok)
require.Greater(t, actualHistogram.DataPoints[0].Sum, 0.0)

expected := metricdata.Metrics{
Name: "router.http.client.connection.acquire_duration",
Description: "Total connection acquire duration",
Unit: "ms",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
otel.ServerAddress.String("127.0.0.1"),
otel.WgClientReusedConnection.Bool(false),
otel.WgSubgraphName.String("employees"),
),
},
},
},
}

metricdatatest.AssertEqual(t, expected, metrics, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
})
})
})
}

// Checking for the port introduced flakiness in the tests, as we cannot really map the correct port to the datapoint.
Expand Down Expand Up @@ -270,4 +374,5 @@ func excludePortFromMetrics(t *testing.T, scopeMetrics []metricdata.ScopeMetrics
}
}
}

}
2 changes: 2 additions & 0 deletions router/core/router_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (c *Config) Usage() map[string]any {
usage["metrics_otel_engine_stats_enabled"] = c.metricConfig.OpenTelemetry.EngineStats.Enabled()
usage["metrics_otel_graphql_cache"] = c.metricConfig.OpenTelemetry.GraphqlCache
usage["metrics_otel_router_runtime"] = c.metricConfig.OpenTelemetry.RouterRuntime
usage["metrics_otel_connection_stats"] = c.metricConfig.OpenTelemetry.ConnectionStats
}
usage["metrics_prometheus_enabled"] = c.metricConfig.Prometheus.Enabled
if c.metricConfig.Prometheus.Enabled {
Expand All @@ -195,6 +196,7 @@ func (c *Config) Usage() map[string]any {
usage["metrics_prometheus_exclude_metrics_labels"] = c.metricConfig.Prometheus.ExcludeMetricLabels
usage["metrics_prometheus_exclude_scope_info"] = c.metricConfig.Prometheus.ExcludeScopeInfo
usage["metrics_prometheus_schema_field_usage_enabled"] = c.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled
usage["metrics_prometheus_connection_stats"] = c.metricConfig.Prometheus.ConnectionStats
}
}

Expand Down
50 changes: 43 additions & 7 deletions router/core/trace_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package core

import (
	"context"
	"errors"
	"net"
	"sync"
	"syscall"

	"github.com/wundergraph/cosmo/router/pkg/metric"
)

type TraceDialer struct {
Expand All @@ -16,9 +18,9 @@ func NewTraceDialer() *TraceDialer {
}
}

type dialerFunc func(ctx context.Context, network, address string) (net.Conn, error)
type DialerFunc func(ctx context.Context, network, address string) (net.Conn, error)

func (t *TraceDialer) WrapDial(base dialerFunc, subgraph string) dialerFunc {
func (t *TraceDialer) WrapDial(base DialerFunc, subgraph string) DialerFunc {
return func(ctx context.Context, network, address string) (net.Conn, error) {
key := metric.SubgraphHostKey{
Subgraph: subgraph,
Expand All @@ -34,12 +36,20 @@ func (t *TraceDialer) WrapDial(base dialerFunc, subgraph string) dialerFunc {
return conn, err
}

// wrap conn to decrement on Close
onClose := func() {
counter.Add(-1)
}

if _, ok := conn.(syscall.Conn); ok {
return &trackedConnWithSyscall{
Conn: conn,
onClose: onClose,
}, nil
}

return &trackedConn{
Conn: conn,
onClose: func() {
counter.Add(-1)
},
Conn: conn,
onClose: onClose,
}, nil
}
}
Expand All @@ -56,3 +66,29 @@ func (c *trackedConn) Close() error {
}
return err
}

// trackedConnWithSyscall is the syscall.Conn-capable twin of trackedConn: it
// wraps a net.Conn, runs onClose when the connection is closed, and also
// exposes SyscallConn by delegating to the wrapped connection.
//
// It is a separate type rather than an extra method on trackedConn because
// adding SyscallConn to trackedConn would make every tracked connection pass a
// syscall.Conn type assertion, even when the wrapped connection does not
// implement that interface.
type trackedConnWithSyscall struct {
	net.Conn
	// onClose is the callback run when the connection is closed; it may be nil.
	onClose func()
	// closeOnce ensures onClose runs at most once, so closing the same
	// connection repeatedly cannot run the bookkeeping callback repeatedly.
	closeOnce sync.Once
}

// Close closes the wrapped connection and returns its error unchanged.
//
// The onClose callback runs on the first Close call only, regardless of
// whether the wrapped Close reported an error. Later calls still forward to
// the wrapped connection but do not run the callback again.
func (c *trackedConnWithSyscall) Close() error {
	err := c.Conn.Close()
	if c.onClose != nil {
		c.closeOnce.Do(c.onClose)
	}
	return err
}

// SyscallConn returns the raw connection of the wrapped net.Conn, delegating
// to its syscall.Conn implementation. It returns an error when the wrapped
// connection does not implement syscall.Conn.
func (c *trackedConnWithSyscall) SyscallConn() (syscall.RawConn, error) {
	if sc, ok := c.Conn.(syscall.Conn); ok {
		return sc.SyscallConn()
	}
	// This should not really happen because the type is checked before
	// trackedConnWithSyscall is used.
	return nil, errors.New("underlying conn doesn't implement syscall.Conn")
}
13 changes: 9 additions & 4 deletions router/core/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ func NewCustomTransport(
}

if enableTraceClient {
getExprContext := func(ctx context.Context) *expr.Context {
getValuesFromRequest := func(ctx context.Context, req *http.Request) (*expr.Context, string) {
reqContext := getRequestContext(ctx)
if reqContext == nil {
return &expr.Context{}
return &expr.Context{}, ""
}
return &reqContext.expressionContext

var activeSubgraphName string
if activeSubgraph := reqContext.ActiveSubgraph(req); activeSubgraph != nil {
activeSubgraphName = activeSubgraph.Name
}
return &reqContext.expressionContext, activeSubgraphName
}
baseRoundTripper = traceclient.NewTraceInjectingRoundTripper(baseRoundTripper, connectionMetricStore, getExprContext)
baseRoundTripper = traceclient.NewTraceInjectingRoundTripper(baseRoundTripper, connectionMetricStore, getValuesFromRequest)
}

if breaker.HasCircuits() {
Expand Down
Loading
Loading