Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SDK MeterProvider #2401

Merged
merged 4 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util._once import Once

_logger = getLogger(__name__)

Expand Down Expand Up @@ -171,30 +172,30 @@ def __init__(
self._metric_readers = metric_readers

for metric_reader in self._sdk_config.metric_readers:
metric_reader._register_measurement_consumer(self)
metric_reader._set_collect_callback(
self._measurement_consumer.collect
)

self._shutdown_once = Once()
self._shutdown = False

def force_flush(self) -> bool:

# FIXME implement a timeout

metric_reader_result = True

for metric_reader in self._sdk_config.metric_readers:
metric_reader_result = (
metric_reader_result and metric_reader.force_flush()
)

if not metric_reader_result:
lzchen marked this conversation as resolved.
Show resolved Hide resolved
_logger.warning("Unable to force flush all metric readers")

return metric_reader_result
metric_reader.collect()
return True
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

def shutdown(self):
# FIXME implement a timeout

if self._shutdown:
def _shutdown():
self._shutdown = True
aabmass marked this conversation as resolved.
Show resolved Hide resolved

did_shutdown = self._shutdown_once.do_once(_shutdown)

if not did_shutdown:
_logger.warning("shutdown can only be called once")
return False

Expand All @@ -210,8 +211,6 @@ def shutdown(self):

overall_result = overall_result and metric_reader_result

self._shutdown = True

if self._atexit_handler is not None:
unregister(self._atexit_handler)
self._atexit_handler = None
Expand Down
41 changes: 40 additions & 1 deletion opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,24 @@
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk.resources import Resource
from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc


class TestMeterProvider(TestCase):
class DummyMetricReader(MetricReader):
def __init__(self):
super().__init__(AggregationTemporality.CUMULATIVE)

def _receive_metrics(self, metrics):
pass

def shutdown(self):
return True


class TestMeterProvider(ConcurrencyTestBase):
def test_resource(self):
"""
`MeterProvider` provides a way to allow a `Resource` to be specified.
Expand Down Expand Up @@ -139,6 +153,31 @@ def test_shutdown_subsequent_calls(self):
with self.assertLogs(level=WARNING):
meter_provider.shutdown()

@patch("opentelemetry.sdk._metrics._logger")
def test_shutdown_race(self, mock_logger):
mock_logger.warning = MockFunc()
meter_provider = MeterProvider()
num_threads = 70
self.run_with_many_threads(
meter_provider.shutdown, num_threads=num_threads
)
self.assertEqual(mock_logger.warning.call_count, num_threads - 1)

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
def test_measurement_collect_callback(
self, mock_sync_measurement_consumer
):
metric_readers = [DummyMetricReader()] * 5
sync_consumer_instance = mock_sync_measurement_consumer()
sync_consumer_instance.collect = MockFunc()
MeterProvider(metric_readers=metric_readers)

for reader in metric_readers:
reader.collect()
self.assertEqual(
sync_consumer_instance.collect.call_count, len(metric_readers)
)

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
def test_creates_sync_measurement_consumer(
self, mock_sync_measurement_consumer
Expand Down