Skip to content

Commit 50f602e

Browse files
committed
Make DatasetRecords input_data type more strict, add tests
1 parent a8db6bd commit 50f602e

11 files changed

+399
-24
lines changed

ddtrace/llmobs/_constants.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -96,4 +96,3 @@
9696
PROXY_REQUEST = "llmobs.proxy_request"
9797

9898
EXPERIMENT_ID_KEY = "_ml_obs.experiment_id"
99-
EXPERIMENT_EXPECTED_OUTPUT_KEY = "_ml_obs.meta.input.expected_output"

ddtrace/llmobs/_experiment.py

Lines changed: 27 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,8 @@
1-
from typing import TYPE_CHECKING
21
from concurrent.futures import ThreadPoolExecutor
32
from copy import deepcopy
43
import sys
54
import time
6-
import traceback
5+
from typing import TYPE_CHECKING
76
from typing import Any
87
from typing import Callable
98
from typing import Dict
@@ -19,7 +18,6 @@
1918
from ddtrace.constants import ERROR_MSG
2019
from ddtrace.constants import ERROR_STACK
2120
from ddtrace.constants import ERROR_TYPE
22-
from ddtrace.llmobs._constants import EXPERIMENT_EXPECTED_OUTPUT_KEY
2321

2422

2523
if TYPE_CHECKING:
@@ -29,12 +27,10 @@
2927

3028
JSONType = Union[str, int, float, bool, None, List["JSONType"], Dict[str, "JSONType"]]
3129
NonNoneJSONType = Union[str, int, float, bool, List[JSONType], Dict[str, JSONType]]
32-
API_PROCESSING_TIME_SLEEP = 6 # median events processor processing time in seconds
33-
FLUSH_EVERY = 10 # default number of records to process before flushing
3430

3531

3632
class DatasetRecord(TypedDict):
37-
input_data: NonNoneJSONType
33+
input_data: Dict[str, NonNoneJSONType]
3834
expected_output: JSONType
3935
metadata: Dict[str, Any]
4036
record_id: NotRequired[Optional[str]]
@@ -96,7 +92,7 @@ def __init__(
9692
project_name: str,
9793
description: str = "",
9894
tags: Optional[List[str]] = None,
99-
config: Optional[Dict[str, Any]] = None,
95+
config: Optional[Dict[str, JSONType]] = None,
10096
_llmobs_instance: Optional["LLMObs"] = None,
10197
) -> None:
10298
self.name = name
@@ -105,7 +101,7 @@ def __init__(
105101
self._evaluators = evaluators
106102
self._description = description
107103
self._tags = tags or []
108-
self._config: Dict[str, Any] = config or {}
104+
self._config: Dict[str, JSONType] = config or {}
109105
self._llmobs_instance = _llmobs_instance
110106

111107
if not project_name:
@@ -141,24 +137,28 @@ def run(self, jobs: int = 1, raise_errors: bool = False, sample_size: Optional[i
141137
self._run_evaluators(task_results, raise_errors=raise_errors)
142138
return
143139

144-
def _process_record(self, idx_record: Tuple[int, DatasetRecord]) -> Dict[str, Any]:
140+
def _process_record(self, idx_record: Tuple[int, DatasetRecord]) -> Dict[str, JSONType]:
141+
if not self._llmobs_instance or not self._llmobs_instance.enabled:
142+
return {}
145143
idx, record = idx_record
146144
start_ns = time.time_ns()
147145
with self._llmobs_instance._experiment(name=self._task.__name__, experiment_id=self._id) as span:
148146
span_context = self._llmobs_instance.export_span(span=span)
149-
span_id = span_context.get("span_id", "")
150-
trace_id = span_context.get("trace_id", "")
147+
if span_context:
148+
span_id = span_context.get("span_id", "")
149+
trace_id = span_context.get("trace_id", "")
150+
else:
151+
span_id, trace_id = "", ""
151152
input_data = record["input_data"]
152153
record_id = record.get("record_id", "")
153154
expected_output = record["expected_output"]
154155
tags = {"dataset_id": self._dataset._id, "dataset_record_id": record_id, "experiment_id": self._id}
155156
output_data = None
156157
try:
157-
output_data = self._task(input_data) # FIXME: support config?
158+
output_data = self._task(input_data, self._config)
158159
except Exception:
159160
span.set_exc_info(*sys.exc_info())
160161
self._llmobs_instance.annotate(span, input_data=input_data, output_data=output_data, tags=tags)
161-
span._set_ctx_item(EXPERIMENT_EXPECTED_OUTPUT_KEY, expected_output) # FIXME: should we be doing this here?
162162
return {
163163
"idx": idx,
164164
"output": output_data,
@@ -178,11 +178,21 @@ def _process_record(self, idx_record: Tuple[int, DatasetRecord]) -> Dict[str, An
178178
},
179179
}
180180

181-
def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional[int] = None) -> List[Any]:
181+
def _run_task(
182+
self, jobs: int, raise_errors: bool = False, sample_size: Optional[int] = None
183+
) -> List[Dict[str, JSONType]]:
184+
if not self._llmobs_instance or not self._llmobs_instance.enabled:
185+
return []
182186
if sample_size is not None and sample_size < len(self._dataset):
183-
subset_data = [deepcopy(record) for record in self._dataset._data[:sample_size]]
187+
subset_records = [deepcopy(record) for record in self._dataset._records[:sample_size]]
184188
subset_name = "[Test subset of {} records] {}".format(sample_size, self._dataset.name)
185-
subset_dataset = Dataset(name=subset_name, dataset_id=self._dataset._id, data=subset_data)
189+
subset_dataset = Dataset(
190+
name=subset_name,
191+
dataset_id=self._dataset._id,
192+
records=subset_records,
193+
description=self._dataset.description,
194+
version=self._dataset._version,
195+
)
186196
else:
187197
subset_dataset = self._dataset
188198
task_results = []
@@ -191,8 +201,7 @@ def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional
191201
task_results.append(result)
192202
if raise_errors and result["error"]["message"]:
193203
raise RuntimeError("Error on record {}: {}".format(result["idx"], result["error"]["message"]))
194-
self._llmobs_instance.flush()
195-
time.sleep(API_PROCESSING_TIME_SLEEP)
204+
self._llmobs_instance.flush() # Ensure spans get submitted in serverless environments
196205
return task_results
197206

198207
def _run_evaluators(self, task_results, raise_errors: bool = False) -> None:

ddtrace/llmobs/_llmobs.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -630,8 +630,8 @@ def experiment(
630630
raise TypeError("task must be a callable function.")
631631
sig = inspect.signature(task)
632632
params = sig.parameters
633-
if "input_data" not in params:
634-
raise TypeError("Task function must have an 'input_data' parameter.")
633+
if "input_data" not in params or "config" not in params:
634+
raise TypeError("Task function must have 'input_data' and 'config' parameters.")
635635
if not isinstance(dataset, Dataset):
636636
raise TypeError("Dataset must be an LLMObs Dataset object.")
637637
if not evaluators or not all(callable(evaluator) for evaluator in evaluators):

ddtrace/llmobs/_writer.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -346,7 +346,7 @@ def dataset_create_with_records(self, name: str, description: str, records: List
346346
def dataset_batch_update(self, dataset_id: str, records: List[DatasetRecord]) -> int:
347347
rs: JSONType = [
348348
{
349-
"input": r["input_data"],
349+
"input": cast(Dict[str, JSONType], r["input_data"]),
350350
"expected_output": r["expected_output"],
351351
"metadata": r.get("metadata", {}),
352352
"record_id": r.get("record_id", None),
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,50 @@
1+
interactions:
2+
- request:
3+
body: '{"data": {"type": "datasets", "attributes": {"insert_records": [{"input":
4+
{"prompt": "What is the capital of France?"}, "expected_output": {"answer":
5+
"Paris"}, "metadata": {}, "record_id": null}, {"input": {"prompt": "What is
6+
the capital of Canada?"}, "expected_output": {"answer": "Ottawa"}, "metadata":
7+
{}, "record_id": null}]}}}'
8+
headers:
9+
Accept:
10+
- '*/*'
11+
? !!python/object/apply:multidict._multidict.istr
12+
- Accept-Encoding
13+
: - identity
14+
Connection:
15+
- keep-alive
16+
Content-Length:
17+
- '331'
18+
? !!python/object/apply:multidict._multidict.istr
19+
- Content-Type
20+
: - application/json
21+
User-Agent:
22+
- python-requests/2.32.4
23+
method: POST
24+
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/3bf4897d-e6aa-43a3-8d9c-5097b1f85177/batch_update
25+
response:
26+
body:
27+
string: '{"data":[{"id":"3eda96b0-5590-4886-8633-34154e381dc3","type":"datasets","attributes":{"author":{"id":"df7d11c9-da50-11ed-af19-2e9f609a4672"},"created_at":"2025-07-14T23:18:56.69063271Z","dataset_id":"3bf4897d-e6aa-43a3-8d9c-5097b1f85177","expected_output":{"answer":"Paris"},"input":{"prompt":"What
28+
is the capital of France?"},"metadata":{},"updated_at":"2025-07-14T23:18:56.69063271Z","version":1}},{"id":"54fd2188-bdae-47d5-bf76-4c5d8c9fba9f","type":"datasets","attributes":{"author":{"id":"df7d11c9-da50-11ed-af19-2e9f609a4672"},"created_at":"2025-07-14T23:18:56.69063271Z","dataset_id":"3bf4897d-e6aa-43a3-8d9c-5097b1f85177","expected_output":{"answer":"Ottawa"},"input":{"prompt":"What
29+
is the capital of Canada?"},"metadata":{},"updated_at":"2025-07-14T23:18:56.69063271Z","version":1}}]}'
30+
headers:
31+
content-length:
32+
- '793'
33+
content-security-policy:
34+
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
35+
content-type:
36+
- application/vnd.api+json
37+
date:
38+
- Mon, 14 Jul 2025 23:18:56 GMT
39+
strict-transport-security:
40+
- max-age=31536000; includeSubDomains; preload
41+
vary:
42+
- Accept-Encoding
43+
x-content-type-options:
44+
- nosniff
45+
x-frame-options:
46+
- SAMEORIGIN
47+
status:
48+
code: 200
49+
message: OK
50+
version: 1
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,47 @@
1+
interactions:
2+
- request:
3+
body: '{"data": {"type": "datasets", "attributes": {"insert_records": [{"input":
4+
{"prompt": "What is the capital of France?"}, "expected_output": {"answer":
5+
"Paris"}, "metadata": {}, "record_id": null}]}}}'
6+
headers:
7+
Accept:
8+
- '*/*'
9+
? !!python/object/apply:multidict._multidict.istr
10+
- Accept-Encoding
11+
: - identity
12+
Connection:
13+
- keep-alive
14+
Content-Length:
15+
- '198'
16+
? !!python/object/apply:multidict._multidict.istr
17+
- Content-Type
18+
: - application/json
19+
User-Agent:
20+
- python-requests/2.32.4
21+
method: POST
22+
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/67c7b6cc-ce98-481e-ab9b-e4925564826c/batch_update
23+
response:
24+
body:
25+
string: '{"data":[{"id":"ddffedb3-cfa9-459c-80d1-cdfcb7062ec9","type":"datasets","attributes":{"author":{"id":"df7d11c9-da50-11ed-af19-2e9f609a4672"},"created_at":"2025-07-14T23:19:03.665305678Z","dataset_id":"67c7b6cc-ce98-481e-ab9b-e4925564826c","expected_output":{"answer":"Paris"},"input":{"prompt":"What
26+
is the capital of France?"},"metadata":{},"updated_at":"2025-07-14T23:19:03.665305678Z","version":1}}]}'
27+
headers:
28+
content-length:
29+
- '403'
30+
content-security-policy:
31+
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
32+
content-type:
33+
- application/vnd.api+json
34+
date:
35+
- Mon, 14 Jul 2025 23:19:03 GMT
36+
strict-transport-security:
37+
- max-age=31536000; includeSubDomains; preload
38+
vary:
39+
- Accept-Encoding
40+
x-content-type-options:
41+
- nosniff
42+
x-frame-options:
43+
- SAMEORIGIN
44+
status:
45+
code: 200
46+
message: OK
47+
version: 1
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,46 @@
1+
interactions:
2+
- request:
3+
body: '{"data": {"type": "datasets", "attributes": {"type": "soft", "dataset_ids":
4+
["67c7b6cc-ce98-481e-ab9b-e4925564826c"]}}}'
5+
headers:
6+
Accept:
7+
- '*/*'
8+
? !!python/object/apply:multidict._multidict.istr
9+
- Accept-Encoding
10+
: - identity
11+
Connection:
12+
- keep-alive
13+
Content-Length:
14+
- '119'
15+
? !!python/object/apply:multidict._multidict.istr
16+
- Content-Type
17+
: - application/json
18+
User-Agent:
19+
- python-requests/2.32.4
20+
method: POST
21+
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/delete
22+
response:
23+
body:
24+
string: '{"data":[{"id":"67c7b6cc-ce98-481e-ab9b-e4925564826c","type":"datasets","attributes":{"author":{"id":"df7d11c9-da50-11ed-af19-2e9f609a4672"},"created_at":"2025-07-14T23:19:03.530317Z","current_version":1,"deleted_at":"2025-07-14T23:19:09.996669Z","description":"A
25+
test dataset","name":"test-dataset-test_experiment_run_task_error[test_dataset_records0]","updated_at":"2025-07-14T23:19:03.816944Z"}}]}'
26+
headers:
27+
content-length:
28+
- '400'
29+
content-security-policy:
30+
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
31+
content-type:
32+
- application/vnd.api+json
33+
date:
34+
- Mon, 14 Jul 2025 23:19:10 GMT
35+
strict-transport-security:
36+
- max-age=31536000; includeSubDomains; preload
37+
vary:
38+
- Accept-Encoding
39+
x-content-type-options:
40+
- nosniff
41+
x-frame-options:
42+
- SAMEORIGIN
43+
status:
44+
code: 200
45+
message: OK
46+
version: 1
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,46 @@
1+
interactions:
2+
- request:
3+
body: '{"data": {"type": "datasets", "attributes": {"type": "soft", "dataset_ids":
4+
["3bf4897d-e6aa-43a3-8d9c-5097b1f85177"]}}}'
5+
headers:
6+
Accept:
7+
- '*/*'
8+
? !!python/object/apply:multidict._multidict.istr
9+
- Accept-Encoding
10+
: - identity
11+
Connection:
12+
- keep-alive
13+
Content-Length:
14+
- '119'
15+
? !!python/object/apply:multidict._multidict.istr
16+
- Content-Type
17+
: - application/json
18+
User-Agent:
19+
- python-requests/2.32.4
20+
method: POST
21+
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets/delete
22+
response:
23+
body:
24+
string: '{"data":[{"id":"3bf4897d-e6aa-43a3-8d9c-5097b1f85177","type":"datasets","attributes":{"author":{"id":"df7d11c9-da50-11ed-af19-2e9f609a4672"},"created_at":"2025-07-14T23:18:56.592831Z","current_version":1,"deleted_at":"2025-07-14T23:19:03.156151Z","description":"A
25+
test dataset","name":"test-dataset-test_experiment_run_task[test_dataset_records0]","updated_at":"2025-07-14T23:18:56.822333Z"}}]}'
26+
headers:
27+
content-length:
28+
- '394'
29+
content-security-policy:
30+
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
31+
content-type:
32+
- application/vnd.api+json
33+
date:
34+
- Mon, 14 Jul 2025 23:19:03 GMT
35+
strict-transport-security:
36+
- max-age=31536000; includeSubDomains; preload
37+
vary:
38+
- Accept-Encoding
39+
x-content-type-options:
40+
- nosniff
41+
x-frame-options:
42+
- SAMEORIGIN
43+
status:
44+
code: 200
45+
message: OK
46+
version: 1
Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,46 @@
1+
interactions:
2+
- request:
3+
body: '{"data": {"type": "datasets", "attributes": {"name": "test-dataset-test_experiment_run_task[test_dataset_records0]",
4+
"description": "A test dataset"}}}'
5+
headers:
6+
Accept:
7+
- '*/*'
8+
? !!python/object/apply:multidict._multidict.istr
9+
- Accept-Encoding
10+
: - identity
11+
Connection:
12+
- keep-alive
13+
Content-Length:
14+
- '151'
15+
? !!python/object/apply:multidict._multidict.istr
16+
- Content-Type
17+
: - application/json
18+
User-Agent:
19+
- python-requests/2.32.4
20+
method: POST
21+
uri: https://api.datadoghq.com/api/unstable/llm-obs/v1/datasets
22+
response:
23+
body:
24+
string: '{"data":{"id":"3bf4897d-e6aa-43a3-8d9c-5097b1f85177","type":"datasets","attributes":{"author":{"id":"df7d11c9-da50-11ed-af19-2e9f609a4672"},"created_at":"2025-07-14T23:18:56.592831479Z","current_version":0,"description":"A
25+
test dataset","name":"test-dataset-test_experiment_run_task[test_dataset_records0]","updated_at":"2025-07-14T23:18:56.592831479Z"}}}'
26+
headers:
27+
content-length:
28+
- '355'
29+
content-security-policy:
30+
- frame-ancestors 'self'; report-uri https://logs.browser-intake-datadoghq.com/api/v2/logs?dd-api-key=pube4f163c23bbf91c16b8f57f56af9fc58&dd-evp-origin=content-security-policy&ddsource=csp-report&ddtags=site%3Adatadoghq.com
31+
content-type:
32+
- application/vnd.api+json
33+
date:
34+
- Mon, 14 Jul 2025 23:18:56 GMT
35+
strict-transport-security:
36+
- max-age=31536000; includeSubDomains; preload
37+
vary:
38+
- Accept-Encoding
39+
x-content-type-options:
40+
- nosniff
41+
x-frame-options:
42+
- SAMEORIGIN
43+
status:
44+
code: 200
45+
message: OK
46+
version: 1

0 commit comments

Comments
 (0)