chore(llmobs): dac strip io from OpenAI #13791

Merged: 27 commits, Jul 17, 2025
Commits
74f4c4b  remove io from open ai integration (jsimpher, Jun 26, 2025)
1f3327d  add release note (jsimpher, Jun 26, 2025)
dae8d4a  remove some parameters and metrics (jsimpher, Jun 26, 2025)
4fe6df8  remove sampling/truncation tests for removed fields (jsimpher, Jun 26, 2025)
7d864a4  remove sampling/truncation tests for removed fields (jsimpher, Jun 26, 2025)
6b6e51d  a few more kwargs gone (jsimpher, Jun 26, 2025)
dce8767  one more update (jsimpher, Jun 26, 2025)
b24fdf2  update snapshots (jsimpher, Jun 26, 2025)
e91cd0e  removed another test (jsimpher, Jun 26, 2025)
225115c  updated snapshots (jsimpher, Jun 26, 2025)
06e2b01  ruff (jsimpher, Jun 26, 2025)
6797069  feedback (jsimpher, Jul 10, 2025)
06b1be0  feedback (jsimpher, Jul 10, 2025)
bfedc74  readd user param (jsimpher, Jul 10, 2025)
37fd773  Update releasenotes/notes/remove-io-data-from-apm-span-openai-integra… (jsimpher, Jul 11, 2025)
9b3788e  move token metrics to llmobs (jsimpher, Jul 11, 2025)
9f9b27d  request arg params (jsimpher, Jul 11, 2025)
bc14f4e  black ruff (jsimpher, Jul 11, 2025)
50ba313  resolve conflicts (jsimpher, Jul 11, 2025)
5209989  remove redundant .usage check (jsimpher, Jul 14, 2025)
368c969  black (jsimpher, Jul 14, 2025)
8f0b57d  remove import (jsimpher, Jul 14, 2025)
5af8477  removed some more stuff (jsimpher, Jul 14, 2025)
0ad531a  ruff (jsimpher, Jul 14, 2025)
ae0269f  Update releasenotes/notes/remove-io-data-from-apm-span-openai-integra… (jsimpher, Jul 15, 2025)
72ff4e7  stream token stuff only if no usage found, even on workflows (jsimpher, Jul 15, 2025)
94ab642  move workflow check to top of metric extraction (jsimpher, Jul 15, 2025)
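
Taken together, the commits strip prompt/completion payloads, most per-request parameters, and token metrics from the OpenAI APM spans, leaving LLM Observability as the sole sink for I/O data. A hedged before/after illustration (tag names taken from the deleted lines in the diff below):

# Before: an APM span for client.chat.completions.create(...) carried, e.g.:
#   openai.request.messages.0.content, openai.request.temperature,
#   openai.response.choices.0.message.content, openai.user.api_key,
#   plus token usage metrics via integration.record_usage().
# After: the APM span keeps metadata such as openai.request.model, while
#   prompts, completions, and token counts flow only through
#   integration.llmobs_set_tags() to LLM Observability.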
133 changes: 11 additions & 122 deletions ddtrace/contrib/internal/openai/_endpoint_hooks.py
@@ -2,12 +2,10 @@

from ddtrace.contrib.internal.openai.utils import TracedOpenAIAsyncStream
from ddtrace.contrib.internal.openai.utils import TracedOpenAIStream
from ddtrace.contrib.internal.openai.utils import _format_openai_api_key
from ddtrace.contrib.internal.openai.utils import _is_async_generator
from ddtrace.contrib.internal.openai.utils import _is_generator
from ddtrace.contrib.internal.openai.utils import _loop_handler
from ddtrace.contrib.internal.openai.utils import _process_finished_stream
from ddtrace.contrib.internal.openai.utils import _tag_tool_calls
from ddtrace.internal.utils.version import parse_version


@@ -57,22 +55,15 @@ def _record_request(self, pin, integration, instance, span, args, kwargs):
continue
if arg in self._base_level_tag_args:
span.set_tag_str("openai.%s" % arg, str(args[idx]))
elif arg == "organization":
span.set_tag_str("openai.organization.id", args[idx])
elif arg == "api_key":
span.set_tag_str("openai.user.api_key", _format_openai_api_key(args[idx]))
else:
span.set_tag_str("openai.request.%s" % arg, str(args[idx]))
for kw_attr in self._request_kwarg_params:
if kw_attr not in kwargs:
continue

if isinstance(kwargs[kw_attr], dict):
for k, v in kwargs[kw_attr].items():
span.set_tag_str("openai.request.%s.%s" % (kw_attr, k), str(v))
elif kw_attr == "engine": # Azure OpenAI requires using "engine" instead of "model"
elif kw_attr == "engine" or kw_attr == "model": # Azure OpenAI requires using "engine" instead of "model"
span.set_tag_str("openai.request.model", str(kwargs[kw_attr]))
else:
span.set_tag_str("openai.request.%s" % kw_attr, str(kwargs[kw_attr]))

def handle_request(self, pin, integration, instance, span, args, kwargs):
self._record_request(pin, integration, instance, span, args, kwargs)
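
A minimal sketch (not part of the diff) of the retained kwarg-tagging loop above; DummySpan is a stand-in for ddtrace's span object, of which only set_tag_str is assumed:

class DummySpan:
    def __init__(self):
        self.tags = {}

    def set_tag_str(self, key, value):
        self.tags[key] = value

def tag_request_kwargs(span, kwargs, request_kwarg_params):
    for kw_attr in request_kwarg_params:
        if kw_attr not in kwargs:
            continue
        if isinstance(kwargs[kw_attr], dict):
            for k, v in kwargs[kw_attr].items():
                span.set_tag_str("openai.request.%s.%s" % (kw_attr, k), str(v))
        elif kw_attr == "engine" or kw_attr == "model":
            # Azure OpenAI sends "engine"; both now normalize to openai.request.model
            span.set_tag_str("openai.request.model", str(kwargs[kw_attr]))
        else:
            span.set_tag_str("openai.request.%s" % kw_attr, str(kwargs[kw_attr]))

span = DummySpan()
tag_request_kwargs(span, {"engine": "gpt-35-turbo", "temperature": 0.2}, ("model", "engine"))
assert span.tags == {"openai.request.model": "gpt-35-turbo"}  # temperature is no longer tagged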
@@ -92,7 +83,7 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):


class _BaseCompletionHook(_EndpointHook):
_request_arg_params = ("api_key", "api_base", "api_type", "request_id", "api_version", "organization")
_request_arg_params = ()

def _handle_streamed_response(self, integration, span, kwargs, resp, operation_type=""):
"""Handle streamed response objects returned from completions/chat/response endpoint calls.
@@ -166,34 +157,12 @@ class _CompletionHook(_BaseCompletionHook):
"model",
"engine",
"suffix",
"max_tokens",
"temperature",
"top_p",
"n",
"stream",
"logprobs",
"echo",
"stop",
"presence_penalty",
"frequency_penalty",
"best_of",
"logit_bias",
"user",
)
_response_attrs = ("created", "id", "model")
_response_attrs = ("model",)
ENDPOINT_NAME = "completions"
HTTP_METHOD_TYPE = "POST"
OPERATION_ID = "createCompletion"

def _record_request(self, pin, integration, instance, span, args, kwargs):
super()._record_request(pin, integration, instance, span, args, kwargs)
if integration.is_pc_sampled_span(span):
prompt = kwargs.get("prompt", "")
if isinstance(prompt, str):
prompt = [prompt]
for idx, p in enumerate(prompt):
span.set_tag_str("openai.request.prompt.%d" % idx, integration.trunc(str(p)))

def _record_response(self, pin, integration, span, args, kwargs, resp, error):
resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
if not resp:
@@ -202,13 +171,6 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
if kwargs.get("stream") and error is None:
return self._handle_streamed_response(integration, span, kwargs, resp, operation_type="completion")
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="completion")
if not resp:
return
for choice in resp.choices:
span.set_tag_str("openai.response.choices.%d.finish_reason" % choice.index, str(choice.finish_reason))
if integration.is_pc_sampled_span(span):
span.set_tag_str("openai.response.choices.%d.text" % choice.index, integration.trunc(choice.text))
integration.record_usage(span, resp.usage)
return resp
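
A sketch of the completion hook's APM tags after these two hunks, reusing the DummySpan helper above; the retained parameter tuple ("model", "engine", "suffix") is inferred from the surviving context lines. Prompt and choice text no longer touch the span at all:

span = DummySpan()
kwargs = {"model": "gpt-3.5-turbo-instruct", "prompt": "hello", "max_tokens": 5}
tag_request_kwargs(span, kwargs, ("model", "engine", "suffix"))
assert "openai.request.prompt.0" not in span.tags   # was set via integration.trunc()
assert "openai.request.max_tokens" not in span.tags
assert span.tags == {"openai.request.model": "gpt-3.5-turbo-instruct"}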


@@ -217,40 +179,18 @@ class _CompletionWithRawResponseHook(_CompletionHook):


class _ChatCompletionHook(_BaseCompletionHook):
_request_arg_params = ("api_key", "api_base", "api_type", "request_id", "api_version", "organization")
_request_arg_params = ()
_request_kwarg_params = (
"model",
"engine",
"temperature",
"top_p",
"n",
"stream",
"stop",
"max_tokens",
"presence_penalty",
"frequency_penalty",
"logit_bias",
"user",
)
_response_attrs = ("created", "id", "model")
_response_attrs = ("model",)
ENDPOINT_NAME = "chat/completions"
HTTP_METHOD_TYPE = "POST"
OPERATION_ID = "createChatCompletion"

def _record_request(self, pin, integration, instance, span, args, kwargs):
super()._record_request(pin, integration, instance, span, args, kwargs)
for idx, m in enumerate(kwargs.get("messages", [])):
role = getattr(m, "role", "")
name = getattr(m, "name", "")
content = getattr(m, "content", "")
if isinstance(m, dict):
content = m.get("content", "")
role = m.get("role", "")
name = m.get("name", "")
if integration.is_pc_sampled_span(span):
span.set_tag_str("openai.request.messages.%d.content" % idx, integration.trunc(str(content)))
span.set_tag_str("openai.request.messages.%d.role" % idx, str(role))
span.set_tag_str("openai.request.messages.%d.name" % idx, str(name))
if parse_version(OPENAI_VERSION) >= (1, 26) and kwargs.get("stream"):
stream_options = kwargs.get("stream_options", {})
if not isinstance(stream_options, dict):
@@ -270,21 +210,6 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
if kwargs.get("stream") and error is None:
return self._handle_streamed_response(integration, span, kwargs, resp, operation_type="chat")
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="chat")
for choice in resp.choices:
idx = choice.index
finish_reason = getattr(choice, "finish_reason", None)
message = choice.message
span.set_tag_str("openai.response.choices.%d.finish_reason" % idx, str(finish_reason))
span.set_tag_str("openai.response.choices.%d.message.role" % idx, choice.message.role)
if integration.is_pc_sampled_span(span):
span.set_tag_str(
"openai.response.choices.%d.message.content" % idx, integration.trunc(message.content or "")
)
if getattr(message, "function_call", None):
_tag_tool_calls(integration, span, [message.function_call], idx)
if getattr(message, "tool_calls", None):
_tag_tool_calls(integration, span, message.tool_calls, idx)
integration.record_usage(span, resp.usage)
return resp
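
The chat hook still routes streamed responses through _handle_streamed_response, and its _record_request keeps the stream_options check for OpenAI >= 1.26. A sketch of what the truncated branch presumably does, an assumption based on OpenAI's include_usage option rather than on lines shown in this diff:

if parse_version(OPENAI_VERSION) >= (1, 26) and kwargs.get("stream"):
    stream_options = kwargs.get("stream_options", {})
    if not isinstance(stream_options, dict):
        stream_options = {}
    if "include_usage" not in stream_options:
        # Assumption: default include_usage on so token counts still arrive
        # on streamed responses even though the span no longer records them.
        stream_options["include_usage"] = True
        kwargs["stream_options"] = stream_options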


@@ -293,34 +218,18 @@ class _ChatCompletionWithRawResponseHook(_ChatCompletionHook):


class _EmbeddingHook(_EndpointHook):
_request_arg_params = ("api_key", "api_base", "api_type", "request_id", "api_version", "organization")
_request_kwarg_params = ("model", "engine", "user")
_request_arg_params = ()
_request_kwarg_params = ("model", "engine")
_response_attrs = ("model",)
ENDPOINT_NAME = "embeddings"
HTTP_METHOD_TYPE = "POST"
OPERATION_ID = "createEmbedding"

def _record_request(self, pin, integration, instance, span, args, kwargs):
"""
Embedding endpoint allows multiple inputs, each of which we specify a request tag for, so have to
manually set them in _pre_response().
"""
super()._record_request(pin, integration, instance, span, args, kwargs)
embedding_input = kwargs.get("input", "")
if integration.is_pc_sampled_span(span):
if isinstance(embedding_input, str) or isinstance(embedding_input[0], int):
embedding_input = [embedding_input]
for idx, inp in enumerate(embedding_input):
span.set_tag_str("openai.request.input.%d" % idx, integration.trunc(str(inp)))

def _record_response(self, pin, integration, span, args, kwargs, resp, error):
resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="embedding")
if not resp:
return
span.set_metric("openai.response.embeddings_count", len(resp.data))
span.set_metric("openai.response.embedding-length", len(resp.data[0].embedding))
integration.record_usage(span, resp.usage)
return resp
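
After the deletions (per-input tags, embedding count/length metrics, record_usage), the embedding hook's _record_response reduces to the surviving lines:

def _record_response(self, pin, integration, span, args, kwargs, resp, error):
    resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
    integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="embedding")
    return resp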


@@ -329,7 +238,7 @@ class _ListHook(_EndpointHook):
Hook for openai.ListableAPIResource, which is used by Model.list, File.list, and FineTune.list.
"""

_request_arg_params = ("api_key", "request_id", "api_version", "organization", "api_base", "api_type")
_request_arg_params = ("api_base", "api_version")
_request_kwarg_params = ("user",)
ENDPOINT_NAME = None
HTTP_METHOD_TYPE = "GET"
@@ -372,7 +281,7 @@ class _FileListHook(_ListHook):
class _RetrieveHook(_EndpointHook):
"""Hook for openai.APIResource, which is used by Model.retrieve, File.retrieve, and FineTune.retrieve."""

_request_arg_params = (None, "api_key", "request_id", "request_timeout")
_request_arg_params = (None, "request_id", "request_timeout")
_request_kwarg_params = ("user",)
_response_attrs = (
"id",
@@ -726,26 +635,7 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
class _ResponseHook(_BaseCompletionHook):
_request_arg_params = ()
# Collecting all kwargs for responses
_request_kwarg_params = (
"model",
"include",
"instructions",
"max_output_tokens",
"metadata",
"parallel_tool_calls",
"previous_response_id",
"reasoning",
"service_tier",
"store",
"stream",
"temperature",
"text",
"tool_choice",
"tools",
"top_p",
"truncation",
"user",
)
_request_kwarg_params = ("model",)
_response_attrs = ("model",)
ENDPOINT_NAME = "responses"
HTTP_METHOD_TYPE = "POST"
@@ -759,5 +649,4 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
if kwargs.get("stream") and error is None:
return self._handle_streamed_response(integration, span, kwargs, resp, operation_type="response")
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="response")
integration.record_usage(span, resp.usage)
return resp
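
The Responses hook now tags only the model request kwarg, and its final record_usage call is gone too. A hedged sketch of where the token metrics went, per the "move token metrics to llmobs" commit; the attribute lookups follow the OpenAI SDK's usage shapes, but the destination dict is an assumption, since that code is not in this diff:

def extract_token_metrics(resp):
    # Token counts are now read during LLM Obs tagging rather than set as APM
    # span metrics. Covers both the completions (prompt_tokens/completion_tokens)
    # and responses (input_tokens/output_tokens) usage shapes.
    usage = getattr(resp, "usage", None)
    if usage is None:
        return {}
    return {
        "input_tokens": getattr(usage, "prompt_tokens", None) or getattr(usage, "input_tokens", None),
        "output_tokens": getattr(usage, "completion_tokens", None) or getattr(usage, "output_tokens", None),
        "total_tokens": getattr(usage, "total_tokens", None),
    }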
5 changes: 0 additions & 5 deletions ddtrace/contrib/internal/openai/patch.py
@@ -6,7 +6,6 @@

from ddtrace import config
from ddtrace.contrib.internal.openai import _endpoint_hooks
from ddtrace.contrib.internal.openai.utils import _format_openai_api_key
from ddtrace.contrib.trace_utils import unwrap
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.contrib.trace_utils import wrap
@@ -220,11 +219,7 @@ def patched_completions_with_raw_response_init(openai, pin, func, instance, args

def _traced_endpoint(endpoint_hook, integration, instance, pin, args, kwargs):
span = integration.trace(pin, endpoint_hook.OPERATION_ID, instance=instance)
openai_api_key = _format_openai_api_key(kwargs.get("api_key"))
resp, err = None, None
if openai_api_key:
# API key can either be set on the import or per request
span.set_tag_str("openai.user.api_key", openai_api_key)
try:
# Start the hook
hook = endpoint_hook().handle_request(pin, integration, instance, span, args, kwargs)
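
With the api_key branch removed, the surviving _traced_endpoint begins as follows (reconstructed from the context lines above; the rest of the function is truncated in the diff):

def _traced_endpoint(endpoint_hook, integration, instance, pin, args, kwargs):
    span = integration.trace(pin, endpoint_hook.OPERATION_ID, instance=instance)
    resp, err = None, None
    try:
        # Start the hook
        hook = endpoint_hook().handle_request(pin, integration, instance, span, args, kwargs)
        ...  # remainder truncated in the diff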