Skip to content

Commit a750944

Browse files
authored
fix(milvus): Add metrics support (#3013)
1 parent 8aaec30 commit a750944

File tree

7 files changed

+248
-30
lines changed

7 files changed

+248
-30
lines changed

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/__init__.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66
from typing import Collection
77

88
from opentelemetry.instrumentation.milvus.config import Config
9+
from opentelemetry.metrics import get_meter
910
from opentelemetry.trace import get_tracer
1011
from wrapt import wrap_function_wrapper
1112

13+
from opentelemetry.semconv_ai import Meters
1214
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
1315
from opentelemetry.instrumentation.utils import unwrap
1416

1517
from opentelemetry.instrumentation.milvus.wrapper import _wrap
1618
from opentelemetry.instrumentation.milvus.version import __version__
19+
from opentelemetry.instrumentation.milvus.utils import is_metrics_enabled
1720

1821
logger = logging.getLogger(__name__)
1922

@@ -82,8 +85,39 @@ def instrumentation_dependencies(self) -> Collection[str]:
8285
return _instruments
8386

8487
def _instrument(self, **kwargs):
88+
if is_metrics_enabled():
89+
meter_provider = kwargs.get("meter_provider")
90+
meter = get_meter(__name__, __version__, meter_provider)
91+
92+
query_duration_metric = meter.create_histogram(
93+
Meters.DB_QUERY_DURATION,
94+
"s",
95+
"Duration of query operations",
96+
)
97+
distance_metric = meter.create_histogram(
98+
Meters.DB_SEARCH_DISTANCE,
99+
"",
100+
"Distance between search query vector and matched vectors",
101+
)
102+
insert_units_metric = meter.create_counter(
103+
Meters.DB_USAGE_INSERT_UNITS,
104+
"",
105+
"Number of insert units consumed in serverless calls",
106+
)
107+
upsert_units_metric = meter.create_counter(
108+
Meters.DB_USAGE_UPSERT_UNITS,
109+
"",
110+
"Number of upsert units consumed in serverless calls",
111+
)
112+
delete_units_metric = meter.create_counter(
113+
Meters.DB_USAGE_DELETE_UNITS,
114+
"",
115+
"Number of delete units consumed in serverless calls",
116+
)
117+
85118
tracer_provider = kwargs.get("tracer_provider")
86119
tracer = get_tracer(__name__, __version__, tracer_provider)
120+
87121
for wrapped_method in WRAPPED_METHODS:
88122
wrap_package = wrapped_method.get("package")
89123
wrap_object = wrapped_method.get("object")
@@ -92,7 +126,15 @@ def _instrument(self, **kwargs):
92126
wrap_function_wrapper(
93127
wrap_package,
94128
f"{wrap_object}.{wrap_method}",
95-
_wrap(tracer, wrapped_method),
129+
_wrap(
130+
tracer,
131+
query_duration_metric,
132+
distance_metric,
133+
insert_units_metric,
134+
upsert_units_metric,
135+
delete_units_metric,
136+
wrapped_method
137+
),
96138
)
97139

98140
def _uninstrument(self, **kwargs):

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import traceback
3+
import os
34
from opentelemetry.instrumentation.milvus.config import Config
45

56

@@ -26,3 +27,7 @@ def wrapper(*args, **kwargs):
2627
Config.exception_logger(e)
2728

2829
return wrapper
30+
31+
32+
def is_metrics_enabled() -> bool:
33+
return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/wrapper.py

Lines changed: 100 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from opentelemetry.instrumentation.milvus.utils import dont_throw
23
from opentelemetry.semconv.trace import SpanAttributes
34
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
@@ -23,9 +24,26 @@
2324
def _with_tracer_wrapper(func):
2425
"""Helper for providing tracer for wrapper functions."""
2526

26-
def _with_tracer(tracer, to_wrap):
27+
def _with_tracer(
28+
tracer,
29+
query_duration_metric,
30+
distance_metric,
31+
insert_units_metric,
32+
upsert_units_metric,
33+
delete_units_metric,
34+
to_wrap):
2735
def wrapper(wrapped, instance, args, kwargs):
28-
return func(tracer, to_wrap, wrapped, instance, args, kwargs)
36+
return func(
37+
tracer,
38+
query_duration_metric,
39+
distance_metric,
40+
insert_units_metric,
41+
upsert_units_metric,
42+
delete_units_metric,
43+
to_wrap,
44+
wrapped,
45+
args,
46+
kwargs)
2947

3048
return wrapper
3149

@@ -40,50 +58,82 @@ def _set_span_attribute(span, name, value):
4058

4159

4260
@_with_tracer_wrapper
43-
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
61+
def _wrap(
62+
tracer,
63+
query_duration_metric,
64+
distance_metric,
65+
insert_units_metric,
66+
upsert_units_metric,
67+
delete_units_metric,
68+
to_wrap,
69+
wrapped,
70+
args,
71+
kwargs
72+
):
4473
"""Instruments and calls every function defined in TO_WRAP."""
4574
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
4675
return wrapped(*args, **kwargs)
4776

77+
method = to_wrap.get("method")
4878
name = to_wrap.get("span_name")
4979
with tracer.start_as_current_span(name) as span:
5080
span.set_attribute(SpanAttributes.DB_SYSTEM, "milvus")
5181
span.set_attribute(SpanAttributes.DB_OPERATION, to_wrap.get("method"))
5282

53-
if to_wrap.get("method") == "insert":
83+
if method == "insert":
5484
_set_insert_attributes(span, kwargs)
55-
elif to_wrap.get("method") == "upsert":
85+
elif method == "upsert":
5686
_set_upsert_attributes(span, kwargs)
57-
elif to_wrap.get("method") == "delete":
87+
elif method == "delete":
5888
_set_delete_attributes(span, kwargs)
59-
elif to_wrap.get("method") == "search":
89+
elif method == "search":
6090
_set_search_attributes(span, kwargs)
61-
elif to_wrap.get("method") == "get":
91+
elif method == "get":
6292
_set_get_attributes(span, kwargs)
63-
elif to_wrap.get("method") == "query":
93+
elif method == "query":
6494
_set_query_attributes(span, kwargs)
65-
elif to_wrap.get("method") == "create_collection":
95+
elif method == "create_collection":
6696
_set_create_collection_attributes(span, kwargs)
67-
elif to_wrap.get("method") == "hybrid_search":
97+
elif method == "hybrid_search":
6898
_set_hybrid_search_attributes(span, kwargs)
6999

70100
try:
101+
start_time = time.time()
71102
return_value = wrapped(*args, **kwargs)
72-
if to_wrap.get("method") == "query":
103+
end_time = time.time()
104+
if method == "query":
73105
_add_query_result_events(span, return_value)
74106

75-
if (
76-
to_wrap.get("method") == "search"
77-
or to_wrap.get("method") == "hybrid_search"
78-
):
107+
if method == "search" or method == "hybrid_search":
79108
_add_search_result_events(span, return_value)
109+
80110
except Exception as e:
81111
error_type = code_to_error_type.get(
82112
getattr(e, "code", None), type(e).__name__
83113
)
84114
span.set_attribute(ERROR_TYPE, error_type)
85115
raise
86116

117+
shared_attributes = {
118+
SpanAttributes.DB_SYSTEM: "milvus",
119+
SpanAttributes.DB_OPERATION: method,
120+
}
121+
duration = end_time - start_time
122+
if duration > 0 and query_duration_metric and method == "query":
123+
query_duration_metric.record(duration, shared_attributes)
124+
125+
if return_value:
126+
if method == "search" or method == "hybrid_search":
127+
set_search_response(distance_metric, shared_attributes, return_value)
128+
129+
_set_response_attributes(
130+
insert_units_metric,
131+
upsert_units_metric,
132+
delete_units_metric,
133+
shared_attributes,
134+
return_value,
135+
)
136+
87137
return return_value
88138

89139

@@ -118,6 +168,30 @@ def count_or_none(obj):
118168
return None
119169

120170

171+
@dont_throw
172+
def _set_response_attributes(
173+
insert_units_metric,
174+
upsert_units_metric,
175+
delete_units_metric,
176+
shared_attributes,
177+
response
178+
):
179+
if not isinstance(response, dict):
180+
return
181+
182+
if 'upsert_count' in response:
183+
upsert_count = response['upsert_count'] or 0
184+
upsert_units_metric.add(upsert_count, shared_attributes)
185+
186+
if 'insert_count' in response:
187+
insert_count = response['insert_count'] or 0
188+
insert_units_metric.add(insert_count, shared_attributes)
189+
190+
if 'delete_count' in response:
191+
delete_count = response['delete_count'] or 0
192+
delete_units_metric.add(delete_count, shared_attributes)
193+
194+
121195
@dont_throw
122196
def _set_create_collection_attributes(span, kwargs):
123197
_set_span_attribute(
@@ -443,3 +517,13 @@ def _set_delete_attributes(span, kwargs):
443517
AISpanAttributes.MILVUS_DELETE_FILTER,
444518
_encode_filter(kwargs.get("filter")),
445519
)
520+
521+
522+
@dont_throw
523+
def set_search_response(distance_metric, shared_attributes, response):
524+
for query_result in response:
525+
for match in query_result:
526+
distance = match.get("distance")
527+
528+
if distance_metric and distance is not None:
529+
distance_metric.record(distance, shared_attributes)

packages/opentelemetry-instrumentation-milvus/tests/conftest.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
"""Unit tests configuration module."""
22

33
import pytest
4-
from opentelemetry import trace
4+
from opentelemetry import trace, metrics
5+
from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider
6+
from opentelemetry.sdk.metrics.export import (
7+
AggregationTemporality,
8+
InMemoryMetricReader,
9+
)
10+
from opentelemetry.sdk.resources import Resource
511
from opentelemetry.sdk.trace import TracerProvider
612
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
713
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -19,11 +25,42 @@ def exporter():
1925
provider.add_span_processor(processor)
2026
trace.set_tracer_provider(provider)
2127

22-
MilvusInstrumentor().instrument()
23-
2428
return exporter
2529

2630

2731
@pytest.fixture(autouse=True)
2832
def clear_exporter(exporter):
2933
exporter.clear()
34+
35+
36+
@pytest.fixture(scope="session")
37+
def reader():
38+
reader = InMemoryMetricReader(
39+
{Counter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA}
40+
)
41+
return reader
42+
43+
44+
@pytest.fixture(scope="session")
45+
def meter_provider(reader):
46+
resource = Resource.create()
47+
meter_provider = MeterProvider(metric_readers=[reader], resource=resource)
48+
metrics.set_meter_provider(meter_provider)
49+
50+
return meter_provider
51+
52+
53+
@pytest.fixture(scope="session", autouse=True)
54+
def instrument(exporter, reader, meter_provider):
55+
MilvusInstrumentor().instrument()
56+
yield
57+
58+
exporter.shutdown()
59+
reader.shutdown()
60+
meter_provider.shutdown()
61+
62+
63+
@pytest.fixture(autouse=True)
64+
def clear_exporter_reader(exporter, reader):
65+
exporter.clear()
66+
reader.get_metrics_data()

0 commit comments

Comments
 (0)