Skip to content

fix: propagate client name and version modifications through telemetry #7369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changesets/fix_caroline_client_header.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Propagate client name and version modifications through telemetry ([PR #7369](https://github.com/apollographql/router/pull/7369))

The router accepts modifications to the client name and version (`apollo::telemetry::client_name` and `apollo::telemetry::client_version`), but those modifications are not currently propagated through the telemetry layers to update spans and traces.

This PR moves where the client name and version are bound to the span, so that the modifications from plugins **on the `router` service** are propagated.

By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7369
32 changes: 21 additions & 11 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,17 +437,13 @@ impl PluginPrivate for Telemetry {
.attributes
.on_request(request);

custom_attributes.extend([
KeyValue::new(CLIENT_NAME_KEY, client_name.unwrap_or("").to_string()),
KeyValue::new(CLIENT_VERSION_KEY, client_version.unwrap_or("").to_string()),
KeyValue::new(
Key::from_static_str("apollo_private.http.request_headers"),
filter_headers(
request.router_request.headers(),
&config_request.apollo.send_headers,
),
custom_attributes.push(KeyValue::new(
Key::from_static_str("apollo_private.http.request_headers"),
filter_headers(
request.router_request.headers(),
&config_request.apollo.send_headers,
),
]);
));

let custom_instruments: RouterInstruments = config_request
.instrumentation
Expand All @@ -466,7 +462,7 @@ impl PluginPrivate for Telemetry {
request.context.clone(),
)
},
move |(custom_attributes, custom_instruments, mut custom_events, ctx): (
move |(mut custom_attributes, custom_instruments, mut custom_events, ctx): (
Vec<KeyValue>,
RouterInstruments,
RouterEvents,
Expand All @@ -480,6 +476,20 @@ impl PluginPrivate for Telemetry {
Self::plugin_metrics(&config);

async move {
// NB: client name and version must be picked up here, rather than in the
// `req_fn` of this `map_future_with_request_data` call, to allow plugins
// at the router service to modify the name and version.
let get_from_context =
|ctx: &Context, key| ctx.get::<&str, String>(key).ok().flatten();
let client_name = get_from_context(&ctx, CLIENT_NAME)
.or_else(|| get_from_context(&ctx, DEPRECATED_CLIENT_NAME));
let client_version = get_from_context(&ctx, CLIENT_VERSION)
.or_else(|| get_from_context(&ctx, DEPRECATED_CLIENT_VERSION));
custom_attributes.extend([
KeyValue::new(CLIENT_NAME_KEY, client_name.unwrap_or_default()),
KeyValue::new(CLIENT_VERSION_KEY, client_version.unwrap_or_default()),
]);

let span = Span::current();
span.set_span_dyn_attributes(custom_attributes);
let response: Result<router::Response, BoxError> = fut.await;
Expand Down
47 changes: 46 additions & 1 deletion apollo-router/tests/integration/telemetry/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,36 @@ async fn test_resources() -> Result<(), BoxError> {
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_attributes() -> Result<(), BoxError> {
if !graph_os_enabled() {
return Ok(());
}
let mut router = IntegrationTest::builder()
.telemetry(Telemetry::Datadog)
.config(include_str!("fixtures/datadog.router.yaml"))
.build()
.await;

router.start().await;
router.assert_started().await;

TraceSpec::builder()
.services(["client", "router", "subgraph"].into())
.attribute("client.name", "foo")
.build()
.validate_datadog_trace(
&mut router,
Query::builder()
.traced(true)
.header("apollographql-client-name", "foo")
.build(),
)
.await?;
router.graceful_shutdown().await;
Ok(())
}

struct DatadogTraceSpec {
trace_spec: TraceSpec,
}
Expand Down Expand Up @@ -1162,7 +1192,22 @@ impl Verifier for DatadogTraceSpec {
Ok(())
}

fn verify_span_attributes(&self, _trace: &Value) -> Result<(), BoxError> {
fn verify_span_attributes(&self, trace: &Value) -> Result<(), BoxError> {
for (key, value) in self.attributes.iter() {
// extracts a list of span attribute values with the provided key
let binding = trace.select_path(&format!("$..meta..['{key}']"))?;
let matches_value = binding.iter().any(|v| match v {
Value::Bool(v) => (*v).to_string() == *value,
Value::Number(n) => (*n).to_string() == *value,
Value::String(s) => s == value,
_ => false,
});
if !matches_value {
return Err(BoxError::from(format!(
"unexpected attribute values for key `{key}`, expected value `{value}` but got {binding:?}"
)));
}
}
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
rhai:
scripts: "tests/integration/telemetry/fixtures"
main: "override_client_name.rhai"
telemetry:
instrumentation:
spans:
mode: spec_compliant
events:
router:
request: info
response: info
error: info
exporters:
tracing:
common:
service_name: router
otlp:
enabled: true
protocol: http
endpoint: <otel-collector-endpoint>
batch_processor:
scheduled_delay: 10ms
metrics:
common:
service_name: router
otlp:
enabled: true
endpoint: <otel-collector-endpoint>/metrics
protocol: http
batch_processor:
scheduled_delay: 10ms
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fn router_service(service) {
const request_callback = Fn("process_request");
service.map_request(request_callback);
}

fn process_request(request) {
request.context["apollo::telemetry::client_name"] = "foo";
}
3 changes: 3 additions & 0 deletions apollo-router/tests/integration/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct TraceSpec {
subgraph_sampled: Option<bool>,
trace_id: Option<String>,
resources: HashMap<&'static str, &'static str>,
attributes: HashMap<&'static str, &'static str>,
}

#[buildstructor::buildstructor]
Expand All @@ -41,6 +42,7 @@ impl TraceSpec {
subgraph_sampled: Option<bool>,
trace_id: Option<String>,
resources: HashMap<&'static str, &'static str>,
attributes: HashMap<&'static str, &'static str>,
) -> Self {
Self {
operation_name,
Expand All @@ -53,6 +55,7 @@ impl TraceSpec {
subgraph_sampled,
trace_id,
resources,
attributes,
}
}
}
Expand Down
102 changes: 98 additions & 4 deletions apollo-router/tests/integration/telemetry/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,83 @@ async fn test_priority_sampling_no_parent_propagated() -> Result<(), BoxError> {
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_attributes() -> Result<(), BoxError> {
if !graph_os_enabled() {
return Ok(());
}
let mock_server = mock_otlp_server(1..).await;
let config = include_str!("fixtures/otlp.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
let mut router = IntegrationTest::builder()
.telemetry(Telemetry::Otlp {
endpoint: Some(format!("{}/v1/traces", mock_server.uri())),
})
.config(config)
.build()
.await;

router.start().await;
router.assert_started().await;

TraceSpec::builder()
.services(["client", "router", "subgraph"].into())
.attribute("client.name", "foobar")
.build()
.validate_otlp_trace(
&mut router,
&mock_server,
Query::builder()
.traced(true)
.header("apollographql-client-name", "foobar")
.build(),
)
.await?;
router.graceful_shutdown().await;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_plugin_overridden_client_name_is_included_in_telemetry() -> Result<(), BoxError> {
if !graph_os_enabled() {
return Ok(());
}
let mock_server = mock_otlp_server(1..).await;
let config = include_str!("fixtures/otlp_override_client_name.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
let mut router = IntegrationTest::builder()
.telemetry(Telemetry::Otlp {
endpoint: Some(format!("{}/v1/traces", mock_server.uri())),
})
.config(config)
.build()
.await;

router.start().await;
router.assert_started().await;

// rhai script overrides client.name - no matter what client name we pass via headers, it should
// end up equalling the value set in the script (`foo`)
for header_value in [None, Some(""), Some("foo"), Some("bar")] {
let mut headers = HashMap::default();
if let Some(value) = header_value {
headers.insert("apollographql-client-name".to_string(), value.to_string());
}

let query = Query::builder().traced(true).headers(headers).build();
TraceSpec::builder()
.services(["client", "router", "subgraph"].into())
.attribute("client.name", "foo")
.build()
.validate_otlp_trace(&mut router, &mock_server, query)
.await
.unwrap_or_else(|_| panic!("Failed with header value {header_value:?}"));
}

router.graceful_shutdown().await;
Ok(())
}

struct OtlpTraceSpec<'a> {
trace_spec: TraceSpec,
mock_server: &'a MockServer,
Expand All @@ -711,10 +788,6 @@ impl Deref for OtlpTraceSpec<'_> {
}

impl Verifier for OtlpTraceSpec<'_> {
fn verify_span_attributes(&self, _span: &Value) -> Result<(), BoxError> {
// TODO
Ok(())
}
fn spec(&self) -> &TraceSpec {
&self.trace_spec
}
Expand Down Expand Up @@ -947,6 +1020,27 @@ impl Verifier for OtlpTraceSpec<'_> {
}
Ok(())
}

fn verify_span_attributes(&self, trace: &Value) -> Result<(), BoxError> {
for (key, value) in self.attributes.iter() {
// extracts a list of span attribute values with the provided key
let binding = trace.select_path(&format!(
"$..spans..attributes..[?(@.key == '{key}')].value.*"
))?;
let matches_value = binding.iter().any(|v| match v {
Value::Bool(v) => (*v).to_string() == *value,
Value::Number(n) => (*n).to_string() == *value,
Value::String(s) => s == value,
_ => false,
});
if !matches_value {
return Err(BoxError::from(format!(
"unexpected attribute values for key `{key}`, expected value `{value}` but got {binding:?}"
)));
}
}
Ok(())
}
}

async fn mock_otlp_server_delayed() -> MockServer {
Expand Down