Skip to content

feat: add llmobs proxy paths to trace agent #628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 31, 2025
118 changes: 117 additions & 1 deletion bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const DSM_ENDPOINT_PATH: &str = "/api/v0.1/pipeline_stats";
const DSM_AGENT_PATH: &str = "/v0.1/pipeline_stats";
const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
const PROFILING_BACKEND_PATH: &str = "/api/v2/profile";
const LLM_OBS_SPANS_INTAKE_PATH: &str = "/api/v2/llmobs";
const LLM_OBS_EVAL_METRIC_INTAKE_PATH: &str = "/api/intake/llm-obs/v1/eval-metric";
const LLM_OBS_EVAL_METRIC_INTAKE_PATH_V2: &str = "/api/intake/llm-obs/v2/eval-metric";
const LLM_OBS_EVAL_METRIC_ENDPOINT_PATH: &str = "/evp_proxy/v2/api/intake/llm-obs/v1/eval-metric";
const LLM_OBS_EVAL_METRIC_ENDPOINT_PATH_V2: &str =
"/evp_proxy/v2/api/intake/llm-obs/v2/eval-metric";
const LLM_OBS_SPANS_ENDPOINT_PATH: &str = "/evp_proxy/v2/api/v2/llmobs";
const DD_ADDITIONAL_TAGS_HEADER: &str = "X-Datadog-Additional-Tags";
const INFO_ENDPOINT_PATH: &str = "/info";
const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
Expand Down Expand Up @@ -181,6 +188,7 @@ impl TraceAgent {
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::pedantic)]
async fn trace_endpoint_handler(
config: Arc<config::Config>,
req: Request<Body>,
Expand Down Expand Up @@ -257,6 +265,51 @@ impl TraceAgent {
),
}
}
(&Method::POST, LLM_OBS_EVAL_METRIC_ENDPOINT_PATH) => {
match Self::handle_llm_obs_eval_metric_proxy(
config,
tags_provider,
api_key,
client,
req,
)
.await
{
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
&format!("LLM OBS Eval Metric endpoint error: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(&Method::POST, LLM_OBS_EVAL_METRIC_ENDPOINT_PATH_V2) => {
match Self::handle_llm_obs_eval_metric_proxy_v2(
config,
tags_provider,
api_key,
client,
req,
)
.await
{
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
&format!("LLM OBS Eval Metric endpoint error: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(&Method::POST, LLM_OBS_SPANS_ENDPOINT_PATH) => {
match Self::handle_llm_obs_spans_proxy(config, tags_provider, api_key, client, req)
.await
{
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
&format!("LLM OBS Spans endpoint error: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler() {
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
Expand Down Expand Up @@ -354,7 +407,10 @@ impl TraceAgent {
STATS_ENDPOINT_PATH,
DSM_AGENT_PATH,
PROFILING_ENDPOINT_PATH,
INFO_ENDPOINT_PATH
INFO_ENDPOINT_PATH,
LLM_OBS_EVAL_METRIC_ENDPOINT_PATH,
LLM_OBS_EVAL_METRIC_ENDPOINT_PATH_V2,
LLM_OBS_SPANS_ENDPOINT_PATH,
],
"client_drop_p0s": true,
}
Expand Down Expand Up @@ -490,6 +546,66 @@ impl TraceAgent {
.await
}

async fn handle_llm_obs_eval_metric_proxy(
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
api_key: String,
client: reqwest::Client,
req: Request<Body>,
) -> http::Result<Response<Body>> {
Self::handle_proxy(
config,
client,
api_key,
tags_provider,
req,
"api",
LLM_OBS_EVAL_METRIC_INTAKE_PATH,
"llm_obs_eval_metric",
)
.await
}

async fn handle_llm_obs_eval_metric_proxy_v2(
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
api_key: String,
client: reqwest::Client,
req: Request<Body>,
) -> http::Result<Response<Body>> {
Self::handle_proxy(
config,
client,
api_key,
tags_provider,
req,
"api",
LLM_OBS_EVAL_METRIC_INTAKE_PATH_V2,
"llm_obs_eval_metric",
)
.await
}

async fn handle_llm_obs_spans_proxy(
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
api_key: String,
client: reqwest::Client,
req: Request<Body>,
) -> http::Result<Response<Body>> {
Self::handle_proxy(
config,
client,
api_key,
tags_provider,
req,
"llmobs-intake",
LLM_OBS_SPANS_INTAKE_PATH,
"llm_obs_spans",
)
.await
}

#[must_use]
pub fn get_sender_copy(&self) -> Sender<SendData> {
self.tx.clone()
Expand Down
Loading