chore(llmobs): support running experiment evals #13994

Status: Draft
Wants to merge 5 commits into base: main
3 changes: 3 additions & 0 deletions ddtrace/llmobs/_constants.py
@@ -94,3 +94,6 @@
LITELLM_ROUTER_INSTANCE_KEY = "_dd.router_instance"

PROXY_REQUEST = "llmobs.proxy_request"

EXPERIMENT_ID_KEY = "_ml_obs.experiment_id"
EXPERIMENT_EXPECTED_OUTPUT_KEY = "_ml_obs.meta.input.expected_output"
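
For orientation, a minimal sketch (not part of this diff) of how these two keys are used by the rest of the PR: the experiment ID is carried as trace baggage so child spans inherit it, while the expected output is stored as a span context item and later copied into the span event's meta. `_set_ctx_item`/`_get_ctx_item` are ddtrace-internal span helpers.

```python
from ddtrace import tracer
from ddtrace.llmobs._constants import EXPERIMENT_EXPECTED_OUTPUT_KEY
from ddtrace.llmobs._constants import EXPERIMENT_ID_KEY

with tracer.trace("experiment-task") as span:
    # The experiment ID rides in baggage so that child spans can be attributed
    # to the same experiment run.
    span.context.set_baggage_item(EXPERIMENT_ID_KEY, "exp-123")
    # The expected output is attached to this span only, via a context item.
    span._set_ctx_item(EXPERIMENT_EXPECTED_OUTPUT_KEY, {"answer": "42"})

    # When the span event is built (see _llmobs_span_event below), the baggage
    # item marks the span as experiment-scoped and the expected output is
    # surfaced under meta["expected_output"].
    if span.context.get_baggage_item(EXPERIMENT_ID_KEY):
        expected = span._get_ctx_item(EXPERIMENT_EXPECTED_OUTPUT_KEY)
        print(expected)
```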
164 changes: 151 additions & 13 deletions ddtrace/llmobs/_experiment.py
@@ -1,27 +1,40 @@
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
import sys
import traceback
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypedDict
from typing import Union
from typing import cast

from typing_extensions import NotRequired

from ddtrace.constants import ERROR_MSG
from ddtrace.constants import ERROR_STACK
from ddtrace.constants import ERROR_TYPE
from ddtrace.internal.logger import get_logger


if TYPE_CHECKING:
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._writer import LLMObsExperimentsClient


logger = get_logger(__name__)

JSONType = Union[str, int, float, bool, None, List["JSONType"], Dict[str, "JSONType"]]
NonNoneJSONType = Union[str, int, float, bool, List[JSONType], Dict[str, JSONType]]


class DatasetRecord(TypedDict):
input_data: NonNoneJSONType
input_data: Dict[str, NonNoneJSONType]
expected_output: JSONType
metadata: Dict[str, Any]
record_id: NotRequired[Optional[str]]
@@ -77,13 +90,13 @@ class Experiment:
def __init__(
self,
name: str,
task: Callable[[Dict[str, NonNoneJSONType]], JSONType],
task: Callable[[Dict[str, NonNoneJSONType], Optional[Dict[str, JSONType]]], JSONType],
dataset: Dataset,
evaluators: List[Callable[[NonNoneJSONType, JSONType, JSONType], JSONType]],
evaluators: List[Callable[[Dict[str, NonNoneJSONType], JSONType, JSONType], JSONType]],
project_name: str,
description: str = "",
tags: Optional[List[str]] = None,
config: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, JSONType]] = None,
_llmobs_instance: Optional["LLMObs"] = None,
) -> None:
self.name = name
@@ -92,7 +105,7 @@ def __init__(
self._evaluators = evaluators
self._description = description
self._tags = tags or []
self._config: Dict[str, Any] = config or {}
self._config: Dict[str, JSONType] = config or {}
self._llmobs_instance = _llmobs_instance

if not project_name:
@@ -107,12 +120,18 @@ def __init__(
self._id: Optional[str] = None
self._run_name: Optional[str] = None

def run(self, jobs: int = 1, raise_errors: bool = False, sample_size: Optional[int] = None) -> None:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
def run(self, jobs: int = 1, raise_errors: bool = False, sample_size: Optional[int] = None) -> List[Dict[str, Any]]:
if not self._llmobs_instance:
raise ValueError(
"LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)` "
"and create the experiment via `LLMObs.experiment(...)` before running the experiment."
)
if not self._llmobs_instance.enabled:
logger.warning(
"Skipping experiment as LLMObs is not enabled. "
"Ensure LLM Observability is enabled via `LLMObs.enable(...)` or set `DD_LLMOBS_ENABLED=1`."
)
return []
experiment_id, experiment_run_name = self._llmobs_instance._create_experiment(
name=self.name,
dataset_id=self._dataset._id,
@@ -125,11 +144,130 @@ def run(self, jobs: int = 1, raise_errors: bool = False, sample_size: Optional[i
self._id = experiment_id
self._run_name = experiment_run_name
task_results = self._run_task(jobs, raise_errors, sample_size)
self._run_evaluators(task_results, raise_errors=raise_errors)
return
evaluations = self._run_evaluators(task_results, raise_errors=raise_errors)
experiment_results = self._merge_results(task_results, evaluations)
return experiment_results

def _process_record(self, idx_record: Tuple[int, DatasetRecord]) -> Dict[str, JSONType]:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
return {}
idx, record = idx_record
with self._llmobs_instance._experiment(name=self._task.__name__, experiment_id=self._id) as span:
span_context = self._llmobs_instance.export_span(span=span)
if span_context:
span_id = span_context.get("span_id", "")
trace_id = span_context.get("trace_id", "")
else:
span_id, trace_id = "", ""
input_data = record["input_data"]
record_id = record.get("record_id", "")
tags = {"dataset_id": self._dataset._id, "dataset_record_id": record_id, "experiment_id": self._id}
output_data = None
try:
output_data = self._task(input_data, self._config)
except Exception:
span.set_exc_info(*sys.exc_info())
self._llmobs_instance.annotate(span, input_data=input_data, output_data=output_data, tags=tags)
return {
"idx": idx,
"output": output_data,
"metadata": {
"timestamp": span.start_ns,
"duration": span.duration_ns,
"dataset_record_index": idx,
"experiment_name": self.name,
"dataset_name": self._dataset.name,
"span_id": span_id,
"trace_id": trace_id,
},
"error": {
"message": span.get_tag(ERROR_MSG),
"stack": span.get_tag(ERROR_STACK),
"type": span.get_tag(ERROR_TYPE),
},
}

def _run_task(
self, jobs: int, raise_errors: bool = False, sample_size: Optional[int] = None
) -> List[Dict[str, JSONType]]:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
return []
if sample_size is not None and sample_size < len(self._dataset):
subset_records = [deepcopy(record) for record in self._dataset._records[:sample_size]]
subset_name = "[Test subset of {} records] {}".format(sample_size, self._dataset.name)
subset_dataset = Dataset(
name=subset_name,
dataset_id=self._dataset._id,
records=subset_records,
description=self._dataset.description,
version=self._dataset._version,
)
else:
subset_dataset = self._dataset
task_results = []
with ThreadPoolExecutor(max_workers=jobs) as executor:
for result in executor.map(self._process_record, enumerate(subset_dataset)):
task_results.append(result)
err_dict = result.get("error") or {}
err_msg = err_dict.get("message") if isinstance(err_dict, dict) else None
if raise_errors and err_msg:
raise RuntimeError("Error on record {}: {}".format(result["idx"], err_msg))
self._llmobs_instance.flush() # Ensure spans get submitted in serverless environments
return task_results

def _run_evaluators(
self, task_results: List[Dict[str, JSONType]], raise_errors: bool = False
) -> List[Dict[str, JSONType]]:
if not task_results:
raise ValueError("No task results to evaluate.")
evaluations = []
for idx, task_result in enumerate(task_results):
output_data = task_result["output"]
record = cast(DatasetRecord, self._dataset[idx])
input_data = record["input_data"]
expected_output = record["expected_output"]
evals_dict = {}
for evaluator in self._evaluators:
eval_result, eval_err = None, None
try:
eval_result = evaluator(input_data, output_data, expected_output)
except Exception as e:
exc_type, exc_value, exc_tb = sys.exc_info()
exc_type_name = exc_type.__name__ if exc_type is not None else "Unknown Exception"
exc_stack = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
eval_err = {"message": str(exc_value), "type": exc_type_name, "stack": exc_stack}
if raise_errors:
raise RuntimeError(f"Evaluator {evaluator.__name__} failed on row {idx}")
evals_dict[evaluator.__name__] = {"value": eval_result, "error": eval_err}
evaluation = cast(Dict[str, JSONType], {"idx": idx, "evaluations": evals_dict})
evaluations.append(evaluation)
return evaluations

def _merge_results(self, task_results: List[Dict[str, JSONType]], evaluations: List[Dict[str, JSONType]]):
experiment_results = []
for idx, task_result in enumerate(task_results):
output_data = task_result["output"]
evals = evaluations[idx]["evaluations"]
record = cast(DatasetRecord, self._dataset[idx])

def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional[int] = None) -> List[Any]:
return []
err: Optional[JSONType] = None
metadata: Dict[str, JSONType] = {}
if isinstance(output_data, dict):
_metadata = output_data.get("metadata", {})
err = output_data.get("error")
if isinstance(_metadata, dict):
metadata = _metadata
metadata["tags"] = cast(List[JSONType], self._tags)

def _run_evaluators(self, task_results, raise_errors: bool = False) -> None:
pass
exp_result = {
"idx": idx,
"record_id": record.get("record_id", ""),
"input": record["input_data"],
"expected_output": record["expected_output"],
"output": output_data,
"evaluations": evals,
"metadata": metadata,
"error": err
}
experiment_results.append(exp_result)
return experiment_results
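
A minimal end-to-end sketch of the updated experiment workflow, assuming LLMObs is already enabled and `dataset` is an existing LLMObs `Dataset`; the task and evaluator names here are hypothetical:

```python
from typing import Dict, Optional

from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._experiment import NonNoneJSONType


def answer_task(input_data: Dict[str, NonNoneJSONType], config: Optional[Dict[str, JSONType]]) -> JSONType:
    # Tasks now take the experiment config as a second parameter.
    question = input_data["question"]
    model = (config or {}).get("model", "stub-model")
    return "{}: the answer to {!r} is 42".format(model, question)  # stand-in for a real model call


def exact_match(input_data: Dict[str, NonNoneJSONType], output_data: JSONType, expected_output: JSONType) -> JSONType:
    # Evaluators receive the record input, the task output, and the expected output.
    return output_data == expected_output


experiment = LLMObs.experiment(
    name="qa-accuracy",
    task=answer_task,
    dataset=dataset,  # noqa: F821 - placeholder for an existing LLMObs Dataset
    evaluators=[exact_match],
    project_name="my-project",
)
results = experiment.run(jobs=4, raise_errors=True)
# Each result row carries idx, record_id, input, expected_output, output,
# evaluations, metadata (including tags), and error, per _merge_results above.
```

Note that `run()` now returns the merged per-record results instead of `None`, so callers can inspect task outputs and evaluations directly.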
54 changes: 45 additions & 9 deletions ddtrace/llmobs/_llmobs.py
@@ -48,6 +48,8 @@
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_TOOL_CHOICE
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL_OUTPUT_USED
from ddtrace.llmobs._constants import EXPERIMENT_EXPECTED_OUTPUT_KEY
from ddtrace.llmobs._constants import EXPERIMENT_ID_KEY
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_PROMPT
@@ -238,6 +240,11 @@ def _llmobs_span_event(self, span: Span) -> LLMObsSpanEvent:
raise KeyError("Span kind not found in span context")

llmobs_span = LLMObsSpan()
_dd_attrs = {
"span_id": str(span.span_id),
"trace_id": format_trace_id(span.trace_id),
"apm_trace_id": format_trace_id(span.trace_id),
}

meta: Dict[str, Any] = {"span.kind": span_kind, "input": {}, "output": {}}
if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None:
@@ -258,6 +265,12 @@ def _llmobs_span_event(self, span: Span) -> LLMObsSpanEvent:
input_type = "messages"
llmobs_span.input = cast(List[LLMObsSpan.Message], enforce_message_role(input_messages))

if span.context.get_baggage_item(EXPERIMENT_ID_KEY):
expected_output = span._get_ctx_item(EXPERIMENT_EXPECTED_OUTPUT_KEY)
if span_kind == "experiment" and expected_output:
meta["expected_output"] = expected_output
_dd_attrs["scope"] = "experiments"

if span._get_ctx_item(OUTPUT_VALUE) is not None:
output_type = "value"
llmobs_span.output = [
@@ -346,11 +359,7 @@ def _llmobs_span_event(self, span: Span) -> LLMObsSpanEvent:
"meta": meta,
"metrics": metrics,
"tags": [],
"_dd": {
"span_id": str(span.span_id),
"trace_id": format_trace_id(span.trace_id),
"apm_trace_id": format_trace_id(span.trace_id),
},
"_dd": _dd_attrs,
}
session_id = _get_session_id(span)
if session_id is not None:
@@ -605,9 +614,9 @@ def _create_experiment(
def experiment(
cls,
name: str,
task: Callable[[Dict[str, NonNoneJSONType]], JSONType],
task: Callable[[Dict[str, NonNoneJSONType], Optional[Dict[str, JSONType]]], JSONType],
dataset: Dataset,
evaluators: List[Callable[[NonNoneJSONType, JSONType, JSONType], JSONType]],
evaluators: List[Callable[[Dict[str, NonNoneJSONType], JSONType, JSONType], JSONType]],
description: str = "",
project_name: Optional[str] = None,
tags: Optional[List[str]] = None,
@@ -629,8 +638,8 @@ def experiment(
raise TypeError("task must be a callable function.")
sig = inspect.signature(task)
params = sig.parameters
if "input_data" not in params:
raise TypeError("Task function must have an 'input_data' parameter.")
if "input_data" not in params or "config" not in params:
raise TypeError("Task function must have 'input_data' and 'config' parameters.")
if not isinstance(dataset, Dataset):
raise TypeError("Dataset must be an LLMObs Dataset object.")
if not evaluators or not all(callable(evaluator) for evaluator in evaluators):
@@ -1137,6 +1146,33 @@ def retrieval(
"retrieval", name=name, session_id=session_id, ml_app=ml_app, _decorator=_decorator
)

@classmethod
def _experiment(
cls,
name: Optional[str] = None,
session_id: Optional[str] = None,
ml_app: Optional[str] = None,
experiment_id: Optional[str] = None,
) -> Span:
"""
Trace an LLM experiment, only used internally by the experiments SDK.
:param str name: The name of the traced operation. If not provided, a default value of "experiment" will be set.
:param str session_id: The ID of the underlying user session. Required for tracking sessions.
:param str ml_app: The name of the ML application that the experiment belongs to. If not provided, the default
value will be set to the value of `DD_LLMOBS_ML_APP`.
:param str experiment_id: The ID of the experiment to associate with this span and its children.
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
span = cls._instance._start_span("experiment", name=name, session_id=session_id, ml_app=ml_app)

# Set experiment_id in baggage if provided
if experiment_id:
span.context.set_baggage_item(EXPERIMENT_ID_KEY, experiment_id)

return span

@classmethod
def annotate(
cls,
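
To make the stricter task validation concrete, here is a small stand-alone sketch of the `inspect.signature` check that `LLMObs.experiment()` now applies (the function names are hypothetical):

```python
import inspect
from typing import Any, Dict, Optional


def valid_task(input_data: Dict[str, Any], config: Optional[Dict[str, Any]]) -> Any:
    return input_data


def legacy_task(input_data: Dict[str, Any]) -> Any:  # missing `config`
    return input_data


for task in (valid_task, legacy_task):
    params = inspect.signature(task).parameters
    if "input_data" not in params or "config" not in params:
        print("{}: would raise TypeError in LLMObs.experiment()".format(task.__name__))
    else:
        print("{}: accepted".format(task.__name__))
```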
10 changes: 8 additions & 2 deletions ddtrace/llmobs/_writer.py
@@ -346,7 +346,7 @@ def dataset_create_with_records(self, name: str, description: str, records: List
def dataset_batch_update(self, dataset_id: str, records: List[DatasetRecord]) -> int:
rs: JSONType = [
{
"input": r["input_data"],
"input": cast(Dict[str, JSONType], r["input_data"]),
"expected_output": r["expected_output"],
"metadata": r.get("metadata", {}),
"record_id": r.get("record_id", None),
@@ -486,13 +486,19 @@ def enqueue(self, event: LLMObsSpanEvent) -> None:
truncated_event_size = len(safe_json(event))
telemetry.record_span_event_raw_size(event, raw_event_size)
telemetry.record_span_event_size(event, truncated_event_size or raw_event_size)
scope = event["_dd"].pop("scope", None)
if scope == "experiments":
self.periodic()
self._enqueue(event, truncated_event_size or raw_event_size)
self.periodic() # This only works for non-distributed cases though
self._enqueue(event, truncated_event_size or raw_event_size)

def _data(self, events: List[LLMObsSpanEvent]) -> List[Dict[str, Any]]:
return [
payload = [
{"_dd.stage": "raw", "_dd.tracer_version": ddtrace.__version__, "event_type": "span", "spans": [event]}
for event in events
]
return payload


def _truncate_span_event(event: LLMObsSpanEvent) -> LLMObsSpanEvent:
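
For reference, a short sketch (stand-in data, not the PR's code) of the payload shape that `_data()` builds: each span event is wrapped in its own envelope rather than being batched into a single `spans` list.

```python
import json

# Stand-ins for LLMObsSpanEvent dicts; real events also carry meta, metrics, tags, etc.
events = [
    {"span_id": "1", "name": "task-a"},
    {"span_id": "2", "name": "task-b"},
]

payload = [
    {"_dd.stage": "raw", "_dd.tracer_version": "3.x", "event_type": "span", "spans": [event]}
    for event in events
]
print(json.dumps(payload, indent=2))
```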