diff --git a/docs/examples/basic_tracer/README.rst b/docs/examples/basic_tracer/README.rst new file mode 100644 index 00000000000..57d431a3877 --- /dev/null +++ b/docs/examples/basic_tracer/README.rst @@ -0,0 +1,37 @@ +Basic Trace +=========== + +These examples show how to use OpenTelemetry to create and export Spans. + +There are two different examples: + +* basic_trace: Shows how to configure a SpanProcessor and Exporter, and how to create a tracer and span. + +* resources: Shows how to add resource information to a Provider. Note that this must be run on a Google Compute Engine instance. + +The source files of these examples are available :scm_web:`here `. + +Installation +------------ + +.. code-block:: sh + + pip install opentelemetry-api + pip install opentelemetry-sdk + +Run the Example +--------------- + +.. code-block:: sh + + python .py + +The output will be shown in the console. + +Useful links +------------ + +- OpenTelemetry_ +- :doc:`../../api/trace` + +.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ diff --git a/docs/examples/basic_tracer/basic_trace.py b/docs/examples/basic_tracer/basic_trace.py new file mode 100644 index 00000000000..f7df424da3b --- /dev/null +++ b/docs/examples/basic_tracer/basic_trace.py @@ -0,0 +1,14 @@ +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, +) + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) +) +tracer = trace.get_tracer(__name__) +with tracer.start_as_current_span("foo"): + print("Hello world!") diff --git a/docs/examples/basic_tracer/resources.py b/docs/examples/basic_tracer/resources.py new file mode 100644 index 00000000000..f37e73531d6 --- /dev/null +++ b/docs/examples/basic_tracer/resources.py @@ -0,0 +1,19 @@ +from opentelemetry import trace +from opentelemetry.sdk.resources import get_aggregated_resources +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, +) +from opentelemetry.tools.resource_detector import GoogleCloudResourceDetector + +resources = get_aggregated_resources([GoogleCloudResourceDetector()]) + +trace.set_tracer_provider(TracerProvider(resource=resources)) + +trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) +) +tracer = trace.get_tracer(__name__) +with tracer.start_as_current_span("foo"): + print("Hello world!") diff --git a/ext/opentelemetry-exporter-cloud-monitoring/CHANGELOG.md b/ext/opentelemetry-exporter-cloud-monitoring/CHANGELOG.md index d51e8aa7490..e5f01ee5b02 100644 --- a/ext/opentelemetry-exporter-cloud-monitoring/CHANGELOG.md +++ b/ext/opentelemetry-exporter-cloud-monitoring/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Add support for resources + ([#853](https://github.com/open-telemetry/opentelemetry-python/pull/853)) + ## Version 0.10b0 Released 2020-06-23 diff --git a/ext/opentelemetry-exporter-cloud-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py b/ext/opentelemetry-exporter-cloud-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py index 8e71e5b2501..2c6e3abdb09 100644 --- a/ext/opentelemetry-exporter-cloud-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py +++ b/ext/opentelemetry-exporter-cloud-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py @@ -5,6 +5,7 @@ import google.auth from google.api.label_pb2 import LabelDescriptor from google.api.metric_pb2 import MetricDescriptor +from google.api.monitored_resource_pb2 import MonitoredResource from google.cloud.monitoring_v3 import MetricServiceClient from google.cloud.monitoring_v3.proto.metric_pb2 import TimeSeries @@ -14,12 +15,21 @@ MetricsExportResult, ) from opentelemetry.sdk.metrics.export.aggregate import SumAggregator +from opentelemetry.sdk.resources import Resource logger = logging.getLogger(__name__) MAX_BATCH_WRITE = 200 WRITE_INTERVAL = 10 UNIQUE_IDENTIFIER_KEY = "opentelemetry_id" +OT_RESOURCE_LABEL_TO_GCP = { + "gce_instance": { + "cloud.account.id": "project_id", + "host.id": "instance_id", + "cloud.zone": "zone", + } +} + # pylint is unable to resolve members of protobuf objects # pylint: disable=no-member @@ -56,13 +66,33 @@ def __init__( random.randint(0, 16 ** 8) ) - def _add_resource_info(self, series: TimeSeries) -> None: + @staticmethod + def _get_monitored_resource( + resource: Resource, + ) -> Optional[MonitoredResource]: """Add Google resource specific information (e.g. instance id, region). + See + https://cloud.google.com/monitoring/custom-metrics/creating-metrics#custom-metric-resources + for supported types Args: series: ProtoBuf TimeSeries """ - # TODO: Leverage this better + + if resource.labels.get("cloud.provider") != "gcp": + return None + resource_type = resource.labels["gcp.resource_type"] + if resource_type not in OT_RESOURCE_LABEL_TO_GCP: + return None + return MonitoredResource( + type=resource_type, + labels={ + gcp_label: str(resource.labels[ot_label]) + for ot_label, gcp_label in OT_RESOURCE_LABEL_TO_GCP[ + resource_type + ].items() + }, + ) def _batch_write(self, series: TimeSeries) -> None: """ Cloud Monitoring allows writing up to 200 time series at once @@ -162,9 +192,11 @@ def export( metric_descriptor = self._get_metric_descriptor(record) if not metric_descriptor: continue - - series = TimeSeries() - self._add_resource_info(series) + series = TimeSeries( + resource=self._get_monitored_resource( + record.instrument.meter.resource + ) + ) series.metric.type = metric_descriptor.type for key, value in record.labels: series.metric.labels[key] = str(value) diff --git a/ext/opentelemetry-exporter-cloud-monitoring/tests/test_cloud_monitoring.py b/ext/opentelemetry-exporter-cloud-monitoring/tests/test_cloud_monitoring.py index 1b36661699c..bc695f6d1f6 100644 --- a/ext/opentelemetry-exporter-cloud-monitoring/tests/test_cloud_monitoring.py +++ b/ext/opentelemetry-exporter-cloud-monitoring/tests/test_cloud_monitoring.py @@ -17,6 +17,7 @@ from google.api.label_pb2 import LabelDescriptor from google.api.metric_pb2 import MetricDescriptor +from google.api.monitored_resource_pb2 import MonitoredResource from google.cloud.monitoring_v3.proto.metric_pb2 import TimeSeries from opentelemetry.exporter.cloud_monitoring import ( @@ -27,17 +28,30 @@ ) from opentelemetry.sdk.metrics.export import MetricRecord from opentelemetry.sdk.metrics.export.aggregate import SumAggregator +from opentelemetry.sdk.resources import Resource class UnsupportedAggregator: pass +class MockMeter: + def __init__(self, resource=Resource.create_empty()): + self.resource = resource + + class MockMetric: - def __init__(self, name="name", description="description", value_type=int): + def __init__( + self, + name="name", + description="description", + value_type=int, + meter=None, + ): self.name = name self.description = description self.value_type = value_type + self.meter = meter or MockMeter() # pylint: disable=protected-access @@ -205,24 +219,41 @@ def test_export(self): } ) + resource = Resource( + labels={ + "cloud.account.id": 123, + "host.id": "host", + "cloud.zone": "US", + "cloud.provider": "gcp", + "extra_info": "extra", + "gcp.resource_type": "gce_instance", + "not_gcp_resource": "value", + } + ) + sum_agg_one = SumAggregator() sum_agg_one.checkpoint = 1 sum_agg_one.last_update_timestamp = (WRITE_INTERVAL + 1) * 1e9 exporter.export( [ MetricRecord( - MockMetric(), + MockMetric(meter=MockMeter(resource=resource)), (("label1", "value1"), ("label2", 1),), sum_agg_one, ), MetricRecord( - MockMetric(), + MockMetric(meter=MockMeter(resource=resource)), (("label1", "value2"), ("label2", 2),), sum_agg_one, ), ] ) - series1 = TimeSeries() + expected_resource = MonitoredResource( + type="gce_instance", + labels={"project_id": "123", "instance_id": "host", "zone": "US"}, + ) + + series1 = TimeSeries(resource=expected_resource) series1.metric.type = "custom.googleapis.com/OpenTelemetry/name" series1.metric.labels["label1"] = "value1" series1.metric.labels["label2"] = "1" @@ -231,7 +262,7 @@ def test_export(self): point.interval.end_time.seconds = WRITE_INTERVAL + 1 point.interval.end_time.nanos = 0 - series2 = TimeSeries() + series2 = TimeSeries(resource=expected_resource) series2.metric.type = "custom.googleapis.com/OpenTelemetry/name" series2.metric.labels["label1"] = "value2" series2.metric.labels["label2"] = "2" @@ -342,3 +373,64 @@ def test_unique_identifier(self): first_call[0][1][0].metric.labels[UNIQUE_IDENTIFIER_KEY], second_call[0][1][0].metric.labels[UNIQUE_IDENTIFIER_KEY], ) + + def test_extract_resources(self): + exporter = CloudMonitoringMetricsExporter(project_id=self.project_id) + + self.assertIsNone( + exporter._get_monitored_resource(Resource.create_empty()) + ) + resource = Resource( + labels={ + "cloud.account.id": 123, + "host.id": "host", + "cloud.zone": "US", + "cloud.provider": "gcp", + "extra_info": "extra", + "gcp.resource_type": "gce_instance", + "not_gcp_resource": "value", + } + ) + expected_extract = MonitoredResource( + type="gce_instance", + labels={"project_id": "123", "instance_id": "host", "zone": "US"}, + ) + self.assertEqual( + exporter._get_monitored_resource(resource), expected_extract + ) + + resource = Resource( + labels={ + "cloud.account.id": "123", + "host.id": "host", + "extra_info": "extra", + "not_gcp_resource": "value", + "gcp.resource_type": "gce_instance", + "cloud.provider": "gcp", + } + ) + # Should throw when passed a malformed GCP resource dict + self.assertRaises(KeyError, exporter._get_monitored_resource, resource) + + resource = Resource( + labels={ + "cloud.account.id": "123", + "host.id": "host", + "extra_info": "extra", + "not_gcp_resource": "value", + "gcp.resource_type": "unsupported_gcp_resource", + "cloud.provider": "gcp", + } + ) + self.assertIsNone(exporter._get_monitored_resource(resource)) + + resource = Resource( + labels={ + "cloud.account.id": "123", + "host.id": "host", + "extra_info": "extra", + "not_gcp_resource": "value", + "cloud.provider": "aws", + } + ) + self.assertIsNone(exporter._get_monitored_resource(resource)) diff --git a/ext/opentelemetry-exporter-cloud-trace/CHANGELOG.md b/ext/opentelemetry-exporter-cloud-trace/CHANGELOG.md index 5ff312743fc..22eac3e4bf9 100644 --- a/ext/opentelemetry-exporter-cloud-trace/CHANGELOG.md +++ b/ext/opentelemetry-exporter-cloud-trace/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Add support for resources and resource detector + ([#853](https://github.com/open-telemetry/opentelemetry-python/pull/853)) + ## Version 0.10b0 Released 2020-06-23 diff --git a/ext/opentelemetry-exporter-cloud-trace/setup.cfg b/ext/opentelemetry-exporter-cloud-trace/setup.cfg index 41ffc4116ad..4437f40fdd3 100644 --- a/ext/opentelemetry-exporter-cloud-trace/setup.cfg +++ b/ext/opentelemetry-exporter-cloud-trace/setup.cfg @@ -39,6 +39,7 @@ package_dir= =src packages=find_namespace: install_requires = + requests opentelemetry-api opentelemetry-sdk google-cloud-trace diff --git a/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/exporter/cloud_trace/__init__.py b/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/exporter/cloud_trace/__init__.py index 647ad210402..ea36d227275 100644 --- a/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/exporter/cloud_trace/__init__.py +++ b/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/exporter/cloud_trace/__init__.py @@ -55,6 +55,7 @@ from opentelemetry.exporter.cloud_trace.version import ( __version__ as cloud_trace_version, ) +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import Event from opentelemetry.sdk.trace.export import Span, SpanExporter, SpanExportResult from opentelemetry.sdk.util import BoundedDict @@ -106,7 +107,6 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult: # pylint: disable=broad-except except Exception as ex: logger.error("Error when creating span %s", span, exc_info=ex) - try: self.client.batch_write_spans( "projects/{}".format(self.project_id), cloud_trace_spans, @@ -150,6 +150,11 @@ def _translate_to_cloud_trace( MAX_SPAN_ATTRS, ) + # Span does not support a MonitoredResource object. We put the + # information into labels instead. + resources_and_attrs = _extract_resources(span.resource) + resources_and_attrs.update(span.attributes) + cloud_trace_spans.append( { "name": span_name, @@ -161,7 +166,9 @@ def _translate_to_cloud_trace( "end_time": end_time, "parent_span_id": parent_id, "attributes": _extract_attributes( - span.attributes, MAX_SPAN_ATTRS, add_agent_attr=True + resources_and_attrs, + MAX_SPAN_ATTRS, + add_agent_attr=True, ), "links": _extract_links(span.links), "status": _extract_status(span.status), @@ -295,6 +302,31 @@ def _strip_characters(ot_version): return "".join(filter(lambda x: x.isdigit() or x == ".", ot_version)) +OT_RESOURCE_LABEL_TO_GCP = { + "gce_instance": { + "cloud.account.id": "project_id", + "host.id": "instance_id", + "cloud.zone": "zone", + } +} + + +def _extract_resources(resource: Resource) -> Dict[str, str]: + if resource.labels.get("cloud.provider") != "gcp": + return {} + resource_type = resource.labels["gcp.resource_type"] + if resource_type not in OT_RESOURCE_LABEL_TO_GCP: + return {} + return { + "g.co/r/{}/{}".format(resource_type, gcp_resource_key,): str( + resource.labels[ot_resource_key] + ) + for ot_resource_key, gcp_resource_key in OT_RESOURCE_LABEL_TO_GCP[ + resource_type + ].items() + } + + def _extract_attributes( attrs: types.Attributes, num_attrs_limit: int, diff --git a/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/tools/resource_detector.py b/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/tools/resource_detector.py new file mode 100644 index 00000000000..5dbf7627f81 --- /dev/null +++ b/ext/opentelemetry-exporter-cloud-trace/src/opentelemetry/tools/resource_detector.py @@ -0,0 +1,47 @@ +import requests + +from opentelemetry.context import attach, detach, set_value +from opentelemetry.sdk.resources import Resource, ResourceDetector + +_GCP_METADATA_URL = ( + "http://metadata.google.internal/computeMetadata/v1/?recursive=true" +) +_GCP_METADATA_URL_HEADER = {"Metadata-Flavor": "Google"} + + +def get_gce_resources(): + """ Resource finder for common GCE attributes + + See: https://cloud.google.com/compute/docs/storing-retrieving-metadata + """ + token = attach(set_value("suppress_instrumentation", True)) + all_metadata = requests.get( + _GCP_METADATA_URL, headers=_GCP_METADATA_URL_HEADER + ).json() + detach(token) + gce_resources = { + "host.id": all_metadata["instance"]["id"], + "cloud.account.id": all_metadata["project"]["projectId"], + "cloud.zone": all_metadata["instance"]["zone"].split("/")[-1], + "cloud.provider": "gcp", + "gcp.resource_type": "gce_instance", + } + return gce_resources + + +_RESOURCE_FINDERS = [get_gce_resources] + + +class GoogleCloudResourceDetector(ResourceDetector): + def __init__(self, raise_on_error=False): + super().__init__(raise_on_error) + self.cached = False + self.gcp_resources = {} + + def detect(self) -> "Resource": + if not self.cached: + self.cached = True + for resource_finder in _RESOURCE_FINDERS: + found_resources = resource_finder() + self.gcp_resources.update(found_resources) + return Resource(self.gcp_resources) diff --git a/ext/opentelemetry-exporter-cloud-trace/tests/test_cloud_trace_exporter.py b/ext/opentelemetry-exporter-cloud-trace/tests/test_cloud_trace_exporter.py index 3b89b55046e..c5444241b69 100644 --- a/ext/opentelemetry-exporter-cloud-trace/tests/test_cloud_trace_exporter.py +++ b/ext/opentelemetry-exporter-cloud-trace/tests/test_cloud_trace_exporter.py @@ -1,4 +1,4 @@ -# Copyright OpenTelemetry Authors +# Copyright The OpenTelemetry Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ _extract_attributes, _extract_events, _extract_links, + _extract_resources, _extract_status, _format_attribute_value, _strip_characters, @@ -38,6 +39,7 @@ from opentelemetry.exporter.cloud_trace.version import ( __version__ as cloud_trace_version, ) +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import Event from opentelemetry.sdk.trace.export import Span from opentelemetry.trace import Link, SpanContext, SpanKind @@ -92,6 +94,15 @@ def test_constructor_explicit(self): def test_export(self): trace_id = "6e0c63257de34c92bf9efcd03927272e" span_id = "95bb5edabd45950f" + resource_info = Resource( + { + "cloud.account.id": 123, + "host.id": "host", + "cloud.zone": "US", + "cloud.provider": "gcp", + "gcp.resource_type": "gce_instance", + } + ) span_datas = [ Span( name="span_name", @@ -102,6 +113,8 @@ def test_export(self): ), parent=None, kind=SpanKind.INTERNAL, + resource=resource_info, + attributes={"attr_key": "attr_value"}, ) ] @@ -116,6 +129,13 @@ def test_export(self): ), "attributes": ProtoSpan.Attributes( attribute_map={ + "g.co/r/gce_instance/zone": _format_attribute_value("US"), + "g.co/r/gce_instance/instance_id": _format_attribute_value( + "host" + ), + "g.co/r/gce_instance/project_id": _format_attribute_value( + "123" + ), "g.co/agent": _format_attribute_value( "opentelemetry-python {}; google-cloud-trace-exporter {}".format( _strip_characters( @@ -125,7 +145,8 @@ def test_export(self): ), _strip_characters(cloud_trace_version), ) - ) + ), + "attr_key": _format_attribute_value("attr_value"), } ), "links": None, @@ -284,6 +305,62 @@ def test_extract_links(self): ), ) + def test_extract_resources(self): + self.assertEqual(_extract_resources(Resource.create_empty()), {}) + resource = Resource( + labels={ + "cloud.account.id": 123, + "host.id": "host", + "cloud.zone": "US", + "cloud.provider": "gcp", + "extra_info": "extra", + "gcp.resource_type": "gce_instance", + "not_gcp_resource": "value", + } + ) + expected_extract = { + "g.co/r/gce_instance/project_id": "123", + "g.co/r/gce_instance/instance_id": "host", + "g.co/r/gce_instance/zone": "US", + } + self.assertEqual(_extract_resources(resource), expected_extract) + + resource = Resource( + labels={ + "cloud.account.id": "123", + "host.id": "host", + "extra_info": "extra", + "not_gcp_resource": "value", + "gcp.resource_type": "gce_instance", + "cloud.provider": "gcp", + } + ) + # Should throw when passed a malformed GCP resource dict + self.assertRaises(KeyError, _extract_resources, resource) + + resource = Resource( + labels={ + "cloud.account.id": "123", + "host.id": "host", + "extra_info": "extra", + "not_gcp_resource": "value", + "gcp.resource_type": "unsupported_gcp_resource", + "cloud.provider": "gcp", + } + ) + self.assertEqual(_extract_resources(resource), {}) + + resource = Resource( + labels={ + "cloud.account.id": "123", + "host.id": "host", + "extra_info": "extra", + "not_gcp_resource": "value", + "cloud.provider": "aws", + } + ) + self.assertEqual(_extract_resources(resource), {}) + # pylint:disable=too-many-locals def test_truncate(self): """Cloud Trace API imposes limits on the length of many things, diff --git a/ext/opentelemetry-exporter-cloud-trace/tests/test_gcp_resource_detector.py b/ext/opentelemetry-exporter-cloud-trace/tests/test_gcp_resource_detector.py new file mode 100644 index 00000000000..df821213308 --- /dev/null +++ b/ext/opentelemetry-exporter-cloud-trace/tests/test_gcp_resource_detector.py @@ -0,0 +1,83 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest import mock + +from opentelemetry.sdk.resources import Resource +from opentelemetry.tools.resource_detector import ( + _GCP_METADATA_URL, + GoogleCloudResourceDetector, + get_gce_resources, +) + +RESOURCES_JSON_STRING = { + "instance": {"id": "instance_id", "zone": "projects/123/zones/zone"}, + "project": {"projectId": "project_id"}, +} + + +class TestGCEResourceFinder(unittest.TestCase): + @mock.patch("opentelemetry.tools.resource_detector.requests.get") + def test_finding_gce_resources(self, getter): + getter.return_value.json.return_value = RESOURCES_JSON_STRING + found_resources = get_gce_resources() + self.assertEqual(getter.call_args_list[0][0][0], _GCP_METADATA_URL) + self.assertEqual( + found_resources, + { + "host.id": "instance_id", + "cloud.provider": "gcp", + "cloud.account.id": "project_id", + "cloud.zone": "zone", + "gcp.resource_type": "gce_instance", + }, + ) + + +class TestGoogleCloudResourceDetector(unittest.TestCase): + @mock.patch("opentelemetry.tools.resource_detector.requests.get") + def test_finding_resources(self, getter): + resource_finder = GoogleCloudResourceDetector() + getter.return_value.json.return_value = RESOURCES_JSON_STRING + found_resources = resource_finder.detect() + self.assertEqual(getter.call_args_list[0][0][0], _GCP_METADATA_URL) + self.assertEqual( + found_resources, + Resource( + labels={ + "host.id": "instance_id", + "cloud.provider": "gcp", + "cloud.account.id": "project_id", + "cloud.zone": "zone", + "gcp.resource_type": "gce_instance", + } + ), + ) + self.assertEqual(getter.call_count, 1) + + found_resources = resource_finder.detect() + self.assertEqual(getter.call_count, 1) + self.assertEqual( + found_resources, + Resource( + labels={ + "host.id": "instance_id", + "cloud.provider": "gcp", + "cloud.account.id": "project_id", + "cloud.zone": "zone", + "gcp.resource_type": "gce_instance", + } + ), + ) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index c7d791ccc4c..46ad8b8e29c 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Add support for resources and resource detector + ([#853](https://github.com/open-telemetry/opentelemetry-python/pull/853)) + ## Version 0.10b0 Released 2020-06-23 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index 91c491f09f6..a8e9ac65be1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -12,11 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc +import concurrent.futures +import logging +import os import typing from json import dumps LabelValue = typing.Union[str, bool, int, float] Labels = typing.Dict[str, LabelValue] +logger = logging.getLogger(__name__) class Resource: @@ -55,3 +60,61 @@ def __hash__(self): _EMPTY_RESOURCE = Resource({}) + + +class ResourceDetector(abc.ABC): + def __init__(self, raise_on_error=False): + self.raise_on_error = raise_on_error + + @abc.abstractmethod + def detect(self) -> "Resource": + raise NotImplementedError() + + +class OTELResourceDetector(ResourceDetector): + # pylint: disable=no-self-use + def detect(self) -> "Resource": + env_resources_items = os.environ.get("OTEL_RESOURCE") + env_resource_map = {} + if env_resources_items: + env_resource_map = { + key.strip(): value.strip() + for key, value in ( + item.split("=") for item in env_resources_items.split(",") + ) + } + return Resource(env_resource_map) + + +def get_aggregated_resources( + detectors: typing.List["ResourceDetector"], + initial_resource: typing.Optional[Resource] = None, + timeout=5, +) -> "Resource": + """ Retrieves resources from detectors in the order that they were passed + + :param detectors: List of resources in order of priority + :param initial_resource: Static resource. This has highest priority + :param timeout: Number of seconds to wait for each detector to return + :return: + """ + final_resource = initial_resource or _EMPTY_RESOURCE + detectors = [OTELResourceDetector()] + detectors + + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures = [executor.submit(detector.detect) for detector in detectors] + for detector_ind, future in enumerate(futures): + detector = detectors[detector_ind] + try: + detected_resources = future.result(timeout=timeout) + # pylint: disable=broad-except + except Exception as ex: + if detector.raise_on_error: + raise ex + logger.warning( + "Exception %s in detector %s, ignoring", ex, detector + ) + detected_resources = _EMPTY_RESOURCE + finally: + final_resource = final_resource.merge(detected_resources) + return final_resource diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index db377c09243..1118b424888 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -520,6 +520,7 @@ def to_json(self, indent=4): f_span["attributes"] = self._format_attributes(self.attributes) f_span["events"] = self._format_events(self.events) f_span["links"] = self._format_links(self.links) + f_span["resource"] = self.resource.labels return json.dumps(f_span, indent=indent) diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index 959e23f0def..698b4fdda3f 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -14,7 +14,9 @@ # pylint: disable=protected-access +import os import unittest +from unittest import mock from opentelemetry.sdk import resources @@ -83,3 +85,94 @@ def test_immutability(self): labels["cost"] = 999.91 self.assertEqual(resource.labels, labels_copy) + + def test_otel_resource_detector(self): + detector = resources.OTELResourceDetector() + self.assertEqual(detector.detect(), resources.Resource.create_empty()) + os.environ["OTEL_RESOURCE"] = "k=v" + self.assertEqual(detector.detect(), resources.Resource({"k": "v"})) + os.environ["OTEL_RESOURCE"] = " k = v " + self.assertEqual(detector.detect(), resources.Resource({"k": "v"})) + os.environ["OTEL_RESOURCE"] = "k=v,k2=v2" + self.assertEqual( + detector.detect(), resources.Resource({"k": "v", "k2": "v2"}) + ) + os.environ["OTEL_RESOURCE"] = " k = v , k2 = v2 " + self.assertEqual( + detector.detect(), resources.Resource({"k": "v", "k2": "v2"}) + ) + + def test_aggregated_resources(self): + aggregated_resources = resources.get_aggregated_resources([]) + self.assertEqual( + aggregated_resources, resources.Resource.create_empty() + ) + + static_resource = resources.Resource({"static_key": "static_value"}) + + self.assertEqual( + resources.get_aggregated_resources( + [], initial_resource=static_resource + ), + static_resource, + ) + + resource_detector = mock.Mock(spec=resources.ResourceDetector) + resource_detector.detect.return_value = resources.Resource( + {"static_key": "overwrite_existing_value", "key": "value"} + ) + self.assertEqual( + resources.get_aggregated_resources( + [resource_detector], initial_resource=static_resource + ), + resources.Resource({"static_key": "static_value", "key": "value"}), + ) + + resource_detector1 = mock.Mock(spec=resources.ResourceDetector) + resource_detector1.detect.return_value = resources.Resource( + {"key1": "value1"} + ) + resource_detector2 = mock.Mock(spec=resources.ResourceDetector) + resource_detector2.detect.return_value = resources.Resource( + {"key2": "value2", "key3": "value3"} + ) + resource_detector3 = mock.Mock(spec=resources.ResourceDetector) + resource_detector3.detect.return_value = resources.Resource( + { + "key2": "overwrite_existing_value", + "key3": "overwrite_existing_value", + "key4": "value4", + } + ) + self.assertEqual( + resources.get_aggregated_resources( + [resource_detector1, resource_detector2, resource_detector3] + ), + resources.Resource( + { + "key1": "value1", + "key2": "value2", + "key3": "value3", + "key4": "value4", + } + ), + ) + + resource_detector = mock.Mock(spec=resources.ResourceDetector) + resource_detector.detect.side_effect = Exception() + resource_detector.raise_on_error = False + self.assertEqual( + resources.get_aggregated_resources( + [resource_detector, resource_detector1] + ), + resources.Resource({"key1": "value1"}), + ) + + resource_detector = mock.Mock(spec=resources.ResourceDetector) + resource_detector.detect.side_effect = Exception() + resource_detector.raise_on_error = True + self.assertRaises( + Exception, + resources.get_aggregated_resources, + [resource_detector, resource_detector1], + ) diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index d68d5ca420a..5203ae0afc1 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -954,10 +954,11 @@ def test_to_json(self): "end_time": null, "attributes": {}, "events": [], - "links": [] + "links": [], + "resource": {} }""", ) self.assertEqual( span.to_json(indent=None), - '{"name": "span-name", "context": {"trace_id": "0x000000000000000000000000deadbeef", "span_id": "0x00000000deadbef0", "trace_state": "{}"}, "kind": "SpanKind.INTERNAL", "parent_id": null, "start_time": null, "end_time": null, "attributes": {}, "events": [], "links": []}', + '{"name": "span-name", "context": {"trace_id": "0x000000000000000000000000deadbeef", "span_id": "0x00000000deadbef0", "trace_state": "{}"}, "kind": "SpanKind.INTERNAL", "parent_id": null, "start_time": null, "end_time": null, "attributes": {}, "events": [], "links": [], "resource": {}}', )