Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw an error when multiple instruments are registered by the same name #1438

Merged
merged 22 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e564154
Throw an error when multiple instruments are registered by same name
srikanthccv Nov 30, 2020
2ceca73
Add tests for same name instrumentation exception
srikanthccv Nov 30, 2020
6a4d6d6
Remove observer name from set when unregistered
srikanthccv Dec 1, 2020
9e7af7c
Fix opentelemetry-instrumentation tests
srikanthccv Dec 1, 2020
a8b76aa
Fix opencensus exporter tests
srikanthccv Dec 1, 2020
ecb773e
Add test for unregistering and re-registering
srikanthccv Dec 1, 2020
a584e55
Update opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
srikanthccv Dec 1, 2020
16824c6
Update method name
srikanthccv Dec 1, 2020
588970d
appropiate naming: instrumentation -> instrument
srikanthccv Dec 1, 2020
00d02c3
Review changes
srikanthccv Dec 2, 2020
018cea1
Update .gitignore
srikanthccv Dec 2, 2020
978a2a4
Small review changes
srikanthccv Dec 3, 2020
1a5e2c4
Remove unused import Union
srikanthccv Dec 3, 2020
89ce5a5
Use one container for both metrics and observers
srikanthccv Dec 3, 2020
9d291fc
Update contrib repo SHA
srikanthccv Dec 4, 2020
3b9fdc6
Merge branch 'master' into ot-1201
srikanthccv Dec 4, 2020
d534d60
Resolve conflicts
srikanthccv Dec 4, 2020
052dfb9
Update metrics/__init__.py
srikanthccv Dec 4, 2020
72ca4be
Merge branch 'ot-1201' of https://github.com/lonewolf3739/opentelemet…
srikanthccv Dec 4, 2020
f69e12f
Merge branch 'master' into ot-1201
srikanthccv Dec 5, 2020
9131317
Update contrib repo SHA
srikanthccv Dec 5, 2020
8e5456d
Merge branch 'ot-1201' of https://github.com/lonewolf3739/opentelemet…
srikanthccv Dec 5, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ env:
# Otherwise, set variable to the commit of your branch on
# opentelemetry-python-contrib which is compatible with these Core repo
# changes.
CONTRIB_REPO_SHA: fd12b1d624fe44ca17d2c88c0ace39dc80db85df
CONTRIB_REPO_SHA: b37945bdeaf49822b240281d493d053995cc2b7b

jobs:
build:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ var
sdist
develop-eggs
.installed.cfg
pyvenv.cfg
lib
share/
lib64
__pycache__
venv*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ def test_get_collector_metric_type(self):
def test_get_collector_point(self):
aggregator = aggregate.SumAggregator()
int_counter = self._meter.create_counter(
"testName", "testDescription", "unit", int,
"testNameIntCounter", "testDescription", "unit", int,
)
float_counter = self._meter.create_counter(
"testName", "testDescription", "unit", float,
"testNameFloatCounter", "testDescription", "unit", float,
)
valuerecorder = self._meter.create_valuerecorder(
"testName", "testDescription", "unit", float,
"testNameValueRecorder", "testDescription", "unit", float,
)
result = metrics_exporter.get_collector_point(
ExportRecord(
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_export(self):

def test_translate_to_collector(self):
test_metric = self._meter.create_counter(
"testname", "testdesc", "unit", int, self._labels.keys()
"testcollector", "testdesc", "unit", int, self._labels.keys()
)
aggregator = aggregate.SumAggregator()
aggregator.update(123)
Expand All @@ -185,7 +185,9 @@ def test_translate_to_collector(self):
)
self.assertEqual(len(output_metrics), 1)
self.assertIsInstance(output_metrics[0], metrics_pb2.Metric)
self.assertEqual(output_metrics[0].metric_descriptor.name, "testname")
self.assertEqual(
output_metrics[0].metric_descriptor.name, "testcollector"
)
self.assertEqual(
output_metrics[0].metric_descriptor.description, "testdesc"
)
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-instrumentation/tests/test_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_ctor(self):
"measures the duration of the outbound HTTP request",
)

def test_ctor_types(self):
def test_ctor_type_client(self):
meter = metrics_api.get_meter(__name__)
recorder = HTTPMetricRecorder(meter, HTTPMetricType.CLIENT)
self.assertEqual(recorder._http_type, HTTPMetricType.CLIENT)
Expand All @@ -81,13 +81,17 @@ def test_ctor_types(self):
)
self.assertIsNone(recorder._server_duration)

def test_ctor_type_server(self):
meter = metrics_api.get_meter(__name__)
recorder = HTTPMetricRecorder(meter, HTTPMetricType.SERVER)
self.assertEqual(recorder._http_type, HTTPMetricType.SERVER)
self.assertTrue(
isinstance(recorder._server_duration, metrics.ValueRecorder)
)
self.assertIsNone(recorder._client_duration)

def test_ctor_type_both(self):
meter = metrics_api.get_meter(__name__)
recorder = HTTPMetricRecorder(meter, HTTPMetricType.BOTH)
self.assertEqual(recorder._http_type, HTTPMetricType.BOTH)
self.assertTrue(
Expand Down
111 changes: 56 additions & 55 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import atexit
import logging
import threading
from typing import Dict, Sequence, Tuple, Type, TypeVar
from typing import Dict, Sequence, Tuple, Type, TypeVar, Union

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export import (
Expand Down Expand Up @@ -356,59 +356,65 @@ def __init__(
):
self.instrumentation_info = instrumentation_info
self.processor = Processor(source.stateful, source.resource)
self.metrics = set()
self.observers = set()
self.metrics_lock = threading.Lock()
self.observers_lock = threading.Lock()
self.instruments = {}
self.instruments_lock = threading.Lock()
self.view_manager = ViewManager()

def _register_instrument(
self, instrument: Union[metrics_api.Metric, metrics_api.Observer]
):
name = instrument.name.strip().lower()
with self.instruments_lock:
if name in self.instruments:
raise ValueError(
"Multiple instruments can't be registered by the same name: ({})".format(
name
)
)
self.instruments[name] = instrument

def collect(self) -> None:
"""Collects all the metrics created with this `Meter` for export.

Utilizes the processor to create checkpoints of the current values in
each aggregator belonging to the metrics that were created with this
meter instance.
"""

self._collect_metrics()
self._collect_observers()

def _collect_metrics(self) -> None:
for metric in self.metrics:
if not metric.enabled:
continue
to_remove = []
with metric.bound_instruments_lock:
for (
labels,
bound_instrument,
) in metric.bound_instruments.items():
for view_data in bound_instrument.view_datas:
with self.instruments_lock:
for instrument in self.instruments.values():
if not instrument.enabled:
continue
if isinstance(instrument, metrics_api.Metric):
to_remove = []
with instrument.bound_instruments_lock:
for (
labels,
bound_instrument,
) in instrument.bound_instruments.items():
for view_data in bound_instrument.view_datas:
accumulation = Accumulation(
instrument,
view_data.labels,
view_data.aggregator,
)
self.processor.process(accumulation)

if bound_instrument.ref_count() == 0:
to_remove.append(labels)

# Remove handles that were released
for labels in to_remove:
del instrument.bound_instruments[labels]
elif isinstance(instrument, metrics_api.Observer):
if not instrument.run():
continue

for labels, aggregator in instrument.aggregators.items():
accumulation = Accumulation(
metric, view_data.labels, view_data.aggregator
instrument, labels, aggregator
)
self.processor.process(accumulation)

if bound_instrument.ref_count() == 0:
to_remove.append(labels)

# Remove handles that were released
for labels in to_remove:
del metric.bound_instruments[labels]

def _collect_observers(self) -> None:
with self.observers_lock:
for observer in self.observers:
if not observer.enabled:
continue

if not observer.run():
continue

for labels, aggregator in observer.aggregators.items():
accumulation = Accumulation(observer, labels, aggregator)
self.processor.process(accumulation)

def record_batch(
self,
labels: Dict[str, str],
Expand All @@ -432,8 +438,7 @@ def create_counter(
counter = Counter(
name, description, unit, value_type, self, enabled=enabled
)
with self.metrics_lock:
self.metrics.add(counter)
self._register_instrument(counter)
return counter

def create_updowncounter(
Expand All @@ -448,8 +453,7 @@ def create_updowncounter(
counter = UpDownCounter(
name, description, unit, value_type, self, enabled=enabled
)
with self.metrics_lock:
self.metrics.add(counter)
self._register_instrument(counter)
return counter

def create_valuerecorder(
Expand All @@ -464,8 +468,7 @@ def create_valuerecorder(
recorder = ValueRecorder(
name, description, unit, value_type, self, enabled=enabled
)
with self.metrics_lock:
self.metrics.add(recorder)
self._register_instrument(recorder)
return recorder

def register_sumobserver(
Expand All @@ -488,8 +491,7 @@ def register_sumobserver(
label_keys,
enabled,
)
with self.observers_lock:
self.observers.add(ob)
self._register_instrument(ob)
return ob

def register_updownsumobserver(
Expand All @@ -512,8 +514,7 @@ def register_updownsumobserver(
label_keys,
enabled,
)
with self.observers_lock:
self.observers.add(ob)
self._register_instrument(ob)
return ob

def register_valueobserver(
Expand All @@ -536,13 +537,13 @@ def register_valueobserver(
label_keys,
enabled,
)
with self.observers_lock:
self.observers.add(ob)
self._register_instrument(ob)
return ob

def unregister_observer(self, observer: metrics_api.Observer) -> None:
with self.observers_lock:
self.observers.remove(observer)
name = observer.name.strip().lower()
with self.instruments_lock:
self.instruments.pop(name)

def register_view(self, view):
self.view_manager.register_view(view)
Expand Down
49 changes: 42 additions & 7 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ def callback(observer):
self.assertIsInstance(observer, metrics_api.Observer)
observer.observe(45, {})

observer = metrics.ValueObserver(
meter.register_valueobserver(
callback, "name", "desc", "unit", int, (), True
)

meter.observers.add(observer)
meter.collect()
self.assertTrue(processor_mock.process.called)

Expand Down Expand Up @@ -164,6 +162,23 @@ def test_create_counter(self):
self.assertIs(meter_provider.resource, resource)
self.assertEqual(counter.meter, meter)

def test_instrument_same_name_error(self):
resource = Mock(spec=resources.Resource)
meter_provider = metrics.MeterProvider(resource=resource)
meter = meter_provider.get_meter(__name__)
counter = meter.create_counter("name", "desc", "unit", int,)
self.assertIsInstance(counter, metrics.Counter)
self.assertEqual(counter.value_type, int)
self.assertEqual(counter.name, "name")
self.assertIs(meter_provider.resource, resource)
self.assertEqual(counter.meter, meter)
with self.assertRaises(ValueError) as ctx:
_ = meter.create_counter("naME", "desc", "unit", int,)
self.assertTrue(
"Multiple instruments can't be registered by the same name: (name)"
in str(ctx.exception)
)

def test_create_updowncounter(self):
meter = metrics.MeterProvider().get_meter(__name__)
updowncounter = meter.create_updowncounter(
Expand Down Expand Up @@ -193,7 +208,7 @@ def test_register_sumobserver(self):
)

self.assertIsInstance(observer, metrics.SumObserver)
self.assertEqual(len(meter.observers), 1)
self.assertEqual(len(meter.instruments), 1)

self.assertEqual(observer.callback, callback)
self.assertEqual(observer.name, "name")
Expand All @@ -213,7 +228,7 @@ def test_register_updownsumobserver(self):
)

self.assertIsInstance(observer, metrics.UpDownSumObserver)
self.assertEqual(len(meter.observers), 1)
self.assertEqual(len(meter.instruments), 1)

self.assertEqual(observer.callback, callback)
self.assertEqual(observer.name, "name")
Expand All @@ -233,7 +248,7 @@ def test_register_valueobserver(self):
)

self.assertIsInstance(observer, metrics.ValueObserver)
self.assertEqual(len(meter.observers), 1)
self.assertEqual(len(meter.instruments), 1)

self.assertEqual(observer.callback, callback)
self.assertEqual(observer.name, "name")
Expand All @@ -253,7 +268,27 @@ def test_unregister_observer(self):
)

meter.unregister_observer(observer)
self.assertEqual(len(meter.observers), 0)
self.assertEqual(len(meter.instruments), 0)

def test_unregister_and_reregister_observer(self):
meter = metrics.MeterProvider().get_meter(__name__)

callback = Mock()

observer = meter.register_valueobserver(
callback,
"nameCaSEinSENsitive",
"desc",
"unit",
int,
metrics.ValueObserver,
)

meter.unregister_observer(observer)
self.assertEqual(len(meter.instruments), 0)
observer = meter.register_valueobserver(
callback, "name", "desc", "unit", int, metrics.ValueObserver
)


class TestMetric(unittest.TestCase):
Expand Down