@@ -48,9 +48,9 @@ def _instrument(self, **kwargs: Any) -> None:

self._original_pipeline_run = haystack.Pipeline.run
wrap_function_wrapper(
module="haystack.core.pipeline.pipeline",
name="Pipeline.run",
wrapper=_PipelineWrapper(tracer=self._tracer),
"haystack.core.pipeline.pipeline",
"Pipeline.run",
_PipelineWrapper(tracer=self._tracer),
)
from haystack.core.pipeline.pipeline import Pipeline

@@ -64,9 +64,9 @@ def wrap_component_run_method(
if component_cls not in self._original_component_run_methods:
self._original_component_run_methods[component_cls] = run_method
wrap_function_wrapper(
module=component_cls.__module__,
name=f"{component_cls.__name__}.run",
wrapper=_ComponentRunWrapper(tracer=self._tracer),
component_cls.__module__,
f"{component_cls.__name__}.run",
_ComponentRunWrapper(tracer=self._tracer),
)

wrap_function_wrapper(
@@ -38,6 +38,7 @@
if TYPE_CHECKING:
from haystack import Document, Pipeline
from haystack.core.component import Component
from openinference.instrumentation import OITracer


class _PipelineRunComponentWrapper:
@@ -60,7 +61,7 @@ def _run_component(

def __init__(
self,
tracer: trace_api.Tracer,
tracer: "trace_api.Tracer | OITracer",
wrap_component_run_method: Callable[[type[Any], Callable[..., Any]], None],
) -> None:
self._tracer = tracer
@@ -83,7 +84,7 @@ def __call__(


class _ComponentRunWrapper:
def __init__(self, tracer: trace_api.Tracer) -> None:
def __init__(self, tracer: "trace_api.Tracer | OITracer") -> None:
self._tracer = tracer

def __call__(
@@ -105,7 +106,10 @@ def __call__(
name=_get_component_span_name(component_class_name)
) as span:
span.set_attributes(
{**dict(get_attributes_from_context()), **dict(_get_input_attributes(arguments))}
{
**dict(get_attributes_from_context()),
**dict(_get_input_attributes(arguments)),
}
)
if (component_type := _get_component_type(component)) is ComponentType.GENERATOR:
span.set_attributes(
@@ -163,7 +167,12 @@ def __call__(
}
)
elif component_type is ComponentType.EMBEDDER:
span.set_attributes(dict(_get_embedding_attributes(arguments, response)))
span.set_attributes(
{
**dict(_get_embedding_attributes(arguments, response)),
**dict(_get_embedding_token_count_attributes(response)),
}
)

Review comment:

Bug: Model Name Overwritten by Metadata

The EMBEDDING_MODEL_NAME attribute is set twice for embedder components: once from the component's configuration and again from the response metadata. The response metadata value overwrites the component's configured value, potentially causing inconsistent model name reporting.

Author reply:

I configured the response metadata as primary and the component config as fallback. Is this acceptable?
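
To illustrate the ordering being discussed, here is a minimal sketch of plain Python dict-merge semantics, using made-up keys and values rather than the real helper functions: because later entries win when dicts are merged, placing the response-metadata attributes after the component-config attributes makes the metadata value primary, and the config value only survives when the metadata yields no model name.

# Minimal sketch of the merge precedence (hypothetical keys and values).
config_attrs = {"embedding.model_name": "model-from-component-config"}
meta_attrs = {"embedding.model_name": "model-from-response-meta"}

# Later entries win in a dict merge, so the response metadata is "primary" ...
assert {**config_attrs, **meta_attrs}["embedding.model_name"] == "model-from-response-meta"

# ... and the component config acts as the fallback when the metadata yields nothing.
assert {**config_attrs}["embedding.model_name"] == "model-from-component-config"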

elif component_type is ComponentType.RANKER:
span.set_attributes(dict(_get_reranker_response_attributes(response)))
elif component_type is ComponentType.RETRIEVER:
@@ -184,7 +193,7 @@ class _PipelineWrapper:
Captures all calls to the pipeline
"""

def __init__(self, tracer: trace_api.Tracer) -> None:
def __init__(self, tracer: "trace_api.Tracer | OITracer") -> None:
self._tracer = tracer

def __call__(
@@ -288,7 +297,9 @@ def _get_component_run_method(component: "Component") -> Optional[Callable[...,
return None


def _get_run_method_output_types(run_method: Callable[..., Any]) -> Optional[Dict[str, type]]:
def _get_run_method_output_types(
run_method: Callable[..., Any],
) -> Optional[Dict[str, type]]:
"""
Haystack components are decorated with an `output_type` decorator that is
useful for inferring the component type.
@@ -301,7 +312,9 @@ def _get_run_method_output_types(run_method: Callable[..., Any]) -> Optional[Dic
return None


def _get_run_method_input_types(run_method: Callable[..., Any]) -> Optional[Dict[str, type]]:
def _get_run_method_input_types(
run_method: Callable[..., Any],
) -> Optional[Dict[str, type]]:
"""
Gets input types of parameters to the `run` method.
"""
@@ -410,7 +423,9 @@ def _get_output_attributes(response: Mapping[str, Any]) -> Iterator[Tuple[str, A
yield OUTPUT_VALUE, safe_json_dumps(masked_response)


def _get_llm_input_message_attributes(arguments: Mapping[str, Any]) -> Iterator[Tuple[str, Any]]:
def _get_llm_input_message_attributes(
arguments: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts input messages.
"""
@@ -431,7 +446,9 @@ def _get_llm_input_message_attributes(arguments: Mapping[str, Any]) -> Iterator[
yield f"{LLM_INPUT_MESSAGES}.0.{MESSAGE_ROLE}", USER


def _get_llm_output_message_attributes(response: Mapping[str, Any]) -> Iterator[Tuple[str, Any]]:
def _get_llm_output_message_attributes(
response: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts output messages.
"""
@@ -489,7 +506,9 @@ def _get_llm_model_attributes(response: Mapping[str, Any]) -> Iterator[Tuple[str
yield LLM_MODEL_NAME, model


def _get_llm_token_count_attributes(response: Mapping[str, Any]) -> Iterator[Tuple[str, Any]]:
def _get_llm_token_count_attributes(
response: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts token counts from response.
"""
@@ -519,6 +538,36 @@ def _get_llm_token_count_attributes(response: Mapping[str, Any]) -> Iterator[Tup
yield LLM_TOKEN_COUNT_TOTAL, total_tokens


def _get_embedding_token_count_attributes(
response: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts token counts and model from embedder response.
Supports both standard usage format and custom components that put usage in meta.
"""
token_usage = None
# Try to get usage from response root
if isinstance(usage := response.get("usage"), dict):
token_usage = usage
# Also check in meta for custom components that put usage there
elif isinstance(meta := response.get("meta"), dict):
if isinstance(usage := meta.get("usage"), dict):
token_usage = usage

# Extract model from meta for cost calculation
if isinstance(meta := response.get("meta"), dict):
if isinstance(model := meta.get("model"), str):
yield LLM_MODEL_NAME, model
yield EMBEDDING_MODEL_NAME, model

if token_usage is not None:
if (prompt_tokens := token_usage.get("prompt_tokens")) is not None:
yield LLM_TOKEN_COUNT_PROMPT, prompt_tokens
if (total_tokens := token_usage.get("total_tokens")) is not None:
yield LLM_TOKEN_COUNT_TOTAL, total_tokens
# Note: completion_tokens not tracked for embeddings (no text generation)
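
For reference, a quick illustration of the two response shapes this helper accepts. The dicts below are made-up examples (the values mirror the OpenAI document-embedder test further down), and the yielded pairs follow directly from the code above.

# Hypothetical embedder responses covering both supported shapes.
response_with_root_usage = {
    "usage": {"prompt_tokens": 20, "total_tokens": 20},
    "meta": {"model": "text-embedding-3-small"},
}
response_with_meta_usage = {
    "meta": {
        "model": "text-embedding-3-small",
        "usage": {"prompt_tokens": 20, "total_tokens": 20},
    },
}
# For either shape, _get_embedding_token_count_attributes yields:
#   (LLM_MODEL_NAME, "text-embedding-3-small")
#   (EMBEDDING_MODEL_NAME, "text-embedding-3-small")
#   (LLM_TOKEN_COUNT_PROMPT, 20)
#   (LLM_TOKEN_COUNT_TOTAL, 20)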


def _get_llm_prompt_template_attributes_from_prompt_builder(
component: "Component", run_bound_args: BoundArguments
) -> Iterator[Tuple[str, str]]:
Expand Down Expand Up @@ -572,7 +621,9 @@ def _get_reranker_model_attributes(component: "Component") -> Iterator[Tuple[str
yield RERANKER_MODEL_NAME, model_name


def _get_reranker_request_attributes(arguments: Mapping[str, Any]) -> Iterator[Tuple[str, Any]]:
def _get_reranker_request_attributes(
arguments: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts re-ranker attributes from arguments.
"""
@@ -588,7 +639,9 @@ def _get_reranker_request_attributes(arguments: Mapping[str, Any]) -> Iterator[T
yield f"{RERANKER_INPUT_DOCUMENTS}.{doc_index}.{DOCUMENT_CONTENT}", content


def _get_reranker_response_attributes(response: Mapping[str, Any]) -> Iterator[Tuple[str, Any]]:
def _get_reranker_response_attributes(
response: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts re-ranker attributes from response.
"""
@@ -602,7 +655,9 @@ def _get_reranker_response_attributes(response: Mapping[str, Any]) -> Iterator[T
yield f"{RERANKER_OUTPUT_DOCUMENTS}.{doc_index}.{DOCUMENT_SCORE}", score


def _get_retriever_response_attributes(response: Mapping[str, Any]) -> Iterator[Tuple[str, Any]]:
def _get_retriever_response_attributes(
response: Mapping[str, Any],
) -> Iterator[Tuple[str, Any]]:
"""
Extracts retriever-related attributes from the response.
"""
@@ -628,7 +683,9 @@ def _get_retriever_response_attributes(response: Mapping[str, Any]) -> Iterator[
)


def _get_embedding_model_attributes(component: "Component") -> Iterator[Tuple[str, Any]]:
def _get_embedding_model_attributes(
component: "Component",
) -> Iterator[Tuple[str, Any]]:
"""
Yields attributes for embedding model.
"""
@@ -738,6 +738,7 @@ def test_openai_document_embedder_embedding_span_has_expected_attributes(
assert "Argentina won the World Cup in 2022." in output_documents[0]
assert "France won the World Cup in 2018." in output_documents[1]
assert attributes.pop(EMBEDDING_MODEL_NAME) == "text-embedding-3-small"
assert attributes.pop(LLM_MODEL_NAME) == "text-embedding-3-small"
assert (
attributes.pop(f"{EMBEDDING_EMBEDDINGS}.0.{EMBEDDING_TEXT}")
== "Argentina won the World Cup in 2022."
@@ -748,6 +749,10 @@
== "France won the World Cup in 2018."
)
assert _is_vector(attributes.pop(f"{EMBEDDING_EMBEDDINGS}.1.{EMBEDDING_VECTOR}"))
assert isinstance(prompt_tokens := attributes.pop(LLM_TOKEN_COUNT_PROMPT), int)
assert prompt_tokens == 20
assert isinstance(total_tokens := attributes.pop(LLM_TOKEN_COUNT_TOTAL), int)
assert total_tokens == 20
assert not attributes

