Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions config/arch_config_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,27 @@ properties:
type: integer
trace_arch_internal:
type: boolean
custom_attributes:
type: array
items:
type: object
properties:
key:
type: string
type:
type: string
enum:
- str
- bool
- float
- int
header:
type: string
additionalProperties: false
required:
- key
- type
- header
additionalProperties: false
mode:
type: string
Expand Down
21 changes: 20 additions & 1 deletion crates/brightstaff/src/handlers/agent_chat_completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;
use std::time::{Instant, SystemTime};

use bytes::Bytes;
use common::configuration::Tracing;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
use hermesllm::apis::OpenAIMessage;
Expand All @@ -18,7 +19,9 @@ use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use crate::tracing::{http, operation_component, OperationNameBuilder};
use crate::tracing::{
extract_custom_trace_attributes, http, operation_component, OperationNameBuilder,
};

/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
Expand All @@ -42,13 +45,15 @@ pub async fn agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
tracing_config: Arc<Option<Tracing>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(
request,
orchestrator_service,
agents_list,
listeners,
trace_collector,
tracing_config,
)
.await
{
Expand Down Expand Up @@ -127,6 +132,7 @@ async fn handle_agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
tracing_config: Arc<Option<Tracing>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(orchestrator_service);
Expand Down Expand Up @@ -176,6 +182,13 @@ async fn handle_agent_chat(

headers
};
let custom_attrs = extract_custom_trace_attributes(
&request_headers,
tracing_config
.as_ref()
.as_ref()
.and_then(|tracing| tracing.custom_attributes.as_deref()),
);

let chat_request_bytes = request.collect().await?.to_bytes();

Expand Down Expand Up @@ -269,6 +282,9 @@ async fn handle_agent_chat(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
);
for (key, value) in &custom_attrs {
selection_span_builder = selection_span_builder.with_attribute(key, value);
}

if !trace_id.is_empty() {
selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone());
Expand Down Expand Up @@ -359,6 +375,9 @@ async fn handle_agent_chat(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
);
for (key, value) in &custom_attrs {
span_builder = span_builder.with_attribute(key, value);
}

if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id.clone());
Expand Down
21 changes: 19 additions & 2 deletions crates/brightstaff/src/handlers/llm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use common::configuration::{LlmProvider, ModelAlias};
use common::configuration::{LlmProvider, ModelAlias, Tracing};
use common::consts::{
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
Expand All @@ -25,25 +25,35 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
use crate::state::{
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
};
use crate::tracing::operation_component;
use crate::tracing::{extract_custom_trace_attributes, operation_component};

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}

// ! we reached the limit of the number of arguments for a function
#[allow(clippy::too_many_arguments)]
pub async fn llm_chat(
request: Request<hyper::body::Incoming>,
router_service: Arc<RouterService>,
full_qualified_llm_provider_url: String,
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<Vec<LlmProvider>>>,
trace_collector: Arc<TraceCollector>,
tracing_config: Arc<Option<Tracing>>, // ! right here
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
let request_headers = request.headers().clone();
let custom_attrs = extract_custom_trace_attributes(
&request_headers,
tracing_config
.as_ref()
.as_ref()
.and_then(|tracing| tracing.custom_attributes.as_deref()),
);
let request_id: String = match request_headers
.get(REQUEST_ID_HEADER)
.and_then(|h| h.to_str().ok())
Expand Down Expand Up @@ -229,6 +239,7 @@ pub async fn llm_chat(
&traceparent,
&request_path,
&request_id,
&custom_attrs,
)
.await
{
Expand Down Expand Up @@ -304,6 +315,7 @@ pub async fn llm_chat(
user_message_preview,
temperature,
&llm_providers,
&custom_attrs,
)
.await;

Expand Down Expand Up @@ -390,6 +402,7 @@ async fn build_llm_span(
user_message_preview: Option<String>,
temperature: Option<f32>,
llm_providers: &Arc<RwLock<Vec<LlmProvider>>>,
custom_attrs: &HashMap<String, String>,
) -> common::traces::Span {
use crate::tracing::{http, llm, OperationNameBuilder};
use common::traces::{parse_traceparent, SpanBuilder, SpanKind};
Expand Down Expand Up @@ -455,6 +468,10 @@ async fn build_llm_span(
span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview);
}

for (key, value) in custom_attrs {
span_builder = span_builder.with_attribute(key, value);
}

span_builder.build()
}

Expand Down
10 changes: 10 additions & 0 deletions crates/brightstaff/src/handlers/router_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn router_chat_get_upstream_model(
traceparent: &str,
request_path: &str,
request_id: &str,
custom_attrs: &HashMap<String, String>,
) -> Result<RoutingResult, RoutingError> {
// Clone metadata for routing before converting (which consumes client_request)
let routing_metadata = client_request.metadata().clone();
Expand Down Expand Up @@ -139,6 +140,9 @@ pub async fn router_chat_get_upstream_model(
// Record successful routing span
let mut attrs: HashMap<String, String> = HashMap::new();
attrs.insert("route.selected_model".to_string(), model_name.clone());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
Expand All @@ -161,6 +165,9 @@ pub async fn router_chat_get_upstream_model(
let default_model = chat_request.model.clone();
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), default_model.clone());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
Expand All @@ -180,6 +187,9 @@ pub async fn router_chat_get_upstream_model(
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), "unknown".to_string());
attrs.insert("error.message".to_string(), err.to_string());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
Expand Down
5 changes: 5 additions & 0 deletions crates/brightstaff/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
));

let model_aliases = Arc::new(arch_config.model_aliases.clone());
let tracing_config = Arc::new(arch_config.tracing.clone());

// Initialize trace collector and start background flusher
// Tracing is enabled if the tracing config is present in arch_config.yaml
Expand Down Expand Up @@ -172,6 +173,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let tracing_config = tracing_config.clone();
let state_storage = state_storage.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
Expand All @@ -183,6 +185,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = agents_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let tracing_config = tracing_config.clone();
let state_storage = state_storage.clone();

async move {
Expand All @@ -203,6 +206,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
agents_list,
listeners,
trace_collector,
tracing_config,
)
.with_context(parent_cx)
.await;
Expand All @@ -221,6 +225,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
model_aliases,
llm_providers,
trace_collector,
tracing_config,
state_storage,
)
.with_context(parent_cx)
Expand Down
119 changes: 119 additions & 0 deletions crates/brightstaff/src/tracing/custom_attributes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::collections::HashMap;

use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType};
use hyper::header::{HeaderMap, HeaderName};

pub fn extract_custom_trace_attributes(
headers: &HeaderMap,
custom_attributes: Option<&[CustomTraceAttribute]>,
) -> HashMap<String, String> {
let mut attributes = HashMap::new();
let Some(custom_attributes) = custom_attributes else {
return attributes;
};

for attribute in custom_attributes {
// Normalize/validate the configured header name; skip invalid names.
let header_name = match HeaderName::from_bytes(attribute.header.as_bytes()) {
Ok(name) => name,
Err(_) => continue,
};

// Extract header value as UTF-8 text; skip missing or invalid values.
let raw_value = match headers
.get(header_name)
.and_then(|value| value.to_str().ok())
{
Some(value) => value.trim(),
None => continue,
};

// Parse the header value according to the configured type.
let parsed_value = match attribute.value_type {
CustomTraceAttributeType::Str => Some(raw_value.to_string()),
CustomTraceAttributeType::Bool => raw_value.parse::<bool>().ok().map(|v| v.to_string()),
CustomTraceAttributeType::Float => raw_value.parse::<f64>().ok().map(|v| v.to_string()),
CustomTraceAttributeType::Int => raw_value.parse::<i64>().ok().map(|v| v.to_string()),
};

// Only include attributes that successfully parsed.
if let Some(value) = parsed_value {
attributes.insert(attribute.key.clone(), value);
}
}

attributes
}

#[cfg(test)]
mod tests {
use super::extract_custom_trace_attributes;
use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType};
use hyper::header::{HeaderMap, HeaderValue};

#[test]
fn extracts_and_parses_custom_headers() {
let mut headers = HeaderMap::new();
headers.insert("x-workspace-id", HeaderValue::from_static("ws_123"));
headers.insert("x-tenant-id", HeaderValue::from_static("ten_456"));
headers.insert("x-user-id", HeaderValue::from_static("usr_789"));
headers.insert("x-admin-level", HeaderValue::from_static("3"));
headers.insert("x-is-internal", HeaderValue::from_static("true"));
headers.insert("x-budget", HeaderValue::from_static("42.5"));
headers.insert("x-bad-int", HeaderValue::from_static("nope"));

let custom_attributes = vec![
CustomTraceAttribute {
key: "workspace.id".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-workspace-id".to_string(),
},
CustomTraceAttribute {
key: "tenant.id".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-tenant-id".to_string(),
},
CustomTraceAttribute {
key: "user.id".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-user-id".to_string(),
},
CustomTraceAttribute {
key: "admin.level".to_string(),
value_type: CustomTraceAttributeType::Int,
header: "x-admin-level".to_string(),
},
CustomTraceAttribute {
key: "is.internal".to_string(),
value_type: CustomTraceAttributeType::Bool,
header: "x-is-internal".to_string(),
},
CustomTraceAttribute {
key: "budget.value".to_string(),
value_type: CustomTraceAttributeType::Float,
header: "x-budget".to_string(),
},
CustomTraceAttribute {
key: "bad.int".to_string(),
value_type: CustomTraceAttributeType::Int,
header: "x-bad-int".to_string(),
},
CustomTraceAttribute {
key: "missing.header".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-missing".to_string(),
},
];

let attrs = extract_custom_trace_attributes(&headers, Some(&custom_attributes));

assert_eq!(attrs.get("workspace.id"), Some(&"ws_123".to_string()));
assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
assert_eq!(attrs.get("admin.level"), Some(&"3".to_string()));
assert_eq!(attrs.get("is.internal"), Some(&"true".to_string()));
assert_eq!(attrs.get("budget.value"), Some(&"42.5".to_string()));
assert!(!attrs.contains_key("bad.int"));
assert!(!attrs.contains_key("missing.header"));
}
}
2 changes: 2 additions & 0 deletions crates/brightstaff/src/tracing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod constants;
mod custom_attributes;

pub use constants::{
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
};
pub use custom_attributes::extract_custom_trace_attributes;
Loading