From dba0e243be9aac29d30a63cd6f8c57b8abda5256 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Wed, 28 Aug 2024 17:59:23 -0600 Subject: [PATCH] Add entities prototype Fixes #4161 --- opentelemetry-sdk/pyproject.toml | 4 + .../opentelemetry/sdk/resources/__init__.py | 271 +++++++++++++++++- .../tests/logs/test_log_record.py | 16 +- .../tests/resources/test_entities.py | 147 ++++++++++ 4 files changed, 423 insertions(+), 15 deletions(-) create mode 100644 opentelemetry-sdk/tests/resources/test_entities.py diff --git a/opentelemetry-sdk/pyproject.toml b/opentelemetry-sdk/pyproject.toml index 69895bfd5b6..9358878ba73 100644 --- a/opentelemetry-sdk/pyproject.toml +++ b/opentelemetry-sdk/pyproject.toml @@ -69,6 +69,10 @@ otel = "opentelemetry.sdk.resources:OTELResourceDetector" process = "opentelemetry.sdk.resources:ProcessResourceDetector" os = "opentelemetry.sdk.resources:OsResourceDetector" +[project.entry-points.opentelemetry_entity_detector] +type0= "opentelemetry.sdk.resources:Type0EntityDetector" +type1= "opentelemetry.sdk.resources:Type1EntityDetector" + [project.urls] Homepage = "https://github.com/open-telemetry/opentelemetry-python/tree/main/opentelemetry-sdk" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index 0ebd42349c4..90f18f1230b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -55,18 +55,18 @@ above example. """ -import abc import concurrent.futures import logging import os import platform import sys -import typing +from abc import ABC, abstractmethod from json import dumps from os import environ from types import ModuleType -from typing import List, MutableMapping, Optional, cast +from typing import List, MutableMapping, Optional, cast, Mapping from urllib import parse +from warnings import warn from opentelemetry.attributes import BoundedAttributes from opentelemetry.sdk.environment_variables import ( @@ -88,7 +88,7 @@ pass LabelValue = AttributeValue -Attributes = typing.Mapping[str, LabelValue] +Attributes = Mapping[str, LabelValue] logger = logging.getLogger(__name__) CLOUD_PROVIDER = ResourceAttributes.CLOUD_PROVIDER @@ -152,6 +152,62 @@ _OPENTELEMETRY_SDK_VERSION: str = version("opentelemetry-sdk") +class Entity: + + def __init__( + self, + type_: str, + id_: Mapping[str, str], + attributes: Optional[Attributes] = None, + schema_url: Optional[str] = None, + ): + + if not type_: + raise Exception("Entity type must not be empty") + + if attributes is None: + attributes = {} + + self._type = type_ + + # These are attributes that identify the entity and must not change + # during the lifetime of the entity. id_ must contain at least one + # attribute. + if not id_: + raise Exception("Entity id must not be empty") + + self._id = id_ + + # These are attributes that do not identify the entity and may change + # during the lifetime of the entity. + self._attributes = attributes + + if schema_url is None: + schema_url = "" + + self._schema_url = schema_url + + @property + def type(self): + return self._type + + @property + def id(self): + # FIXME we need a checker here that makes sure that the id attributes + # are compliant with the spec. Not implementing it here since this + # seems like a thing that should be available to other components as + # well. + return self._id + + @property + def attributes(self): + return self._attributes + + @property + def schema_url(self): + return self._schema_url + + class Resource: """A Resource is an immutable representation of the entity producing telemetry as Attributes.""" @@ -159,17 +215,27 @@ class Resource: _schema_url: str def __init__( - self, attributes: Attributes, schema_url: typing.Optional[str] = None + self, + attributes: Attributes, + schema_url: Optional[str] = None, + *entities, ): self._attributes = BoundedAttributes(attributes=attributes) if schema_url is None: schema_url = "" + else: + warn("Resource.schema_url is deprecated", DeprecationWarning) + self._schema_url = schema_url + # FIXME the spec draft says the Resource will have an EntityRef proto + # figure out what that is supposed to be. + self._entities = entities + @staticmethod def create( - attributes: typing.Optional[Attributes] = None, - schema_url: typing.Optional[str] = None, + attributes: Optional[Attributes] = None, + schema_url: Optional[str] = None, ) -> "Resource": """Creates a new `Resource` from attributes. @@ -181,7 +247,7 @@ def create( The newly-created Resource. """ - if not attributes: + if attributes is None: attributes = {} otel_experimental_resource_detectors = {"otel"}.union( @@ -225,6 +291,51 @@ def create( ) return resource + @staticmethod + def create_using_entities( + attributes: Optional[Attributes] = None, + schema_url: Optional[str] = None, + ) -> "Resource": + # This method is added here with the intention of not disturbing the + # previous API create method for backwards compatibility reasons. + + if attributes is None: + attributes = {} + + if schema_url is None: + schema_url = "" + + selected_entities = _select_entities( + [ + entity_detector.detect() + for entity_detector in _get_entity_detectors() + ] + ) + + resource_attributes = {} + + for selected_entity in selected_entities: + for key, value in selected_entity._id.items(): + resource_attributes[key] = value + + for key, value in selected_entity._attributes.items(): + resource_attributes[key] = value + + entity_schema_url = selected_entities[0].schema_url + + for selected_entity in selected_entities: + if selected_entity.schema_url != entity_schema_url: + entity_schema_url = None + break + + resource_attributes.update(attributes) + + resource = Resource.create( + attributes=resource_attributes, schema_url=entity_schema_url + ) + + return resource + @staticmethod def get_empty() -> "Resource": return _EMPTY_RESOURCE @@ -298,6 +409,72 @@ def to_json(self, indent: int = 4) -> str: ) +def _get_entity_detectors(): + + entity_detectors: List[ResourceDetector] = [] + + for entity_detector in entry_points( + group="opentelemetry_entity_detector", + ): + + entity_detectors.append(entity_detector.load()()) + + # This checker is added here but it could live in the configuration + # mechanism, so that it detects a possible error when 2 entity + # detectors have the same priority even earlier. + if len(entity_detectors) > 1: + + sorted_entity_detectors = sorted( + entity_detectors, key=lambda x: x.priority + ) + + priorities = set() + + for entity_detector in sorted_entity_detectors: + + if entity_detector.priority in priorities: + raise ValueError( + f"Duplicate priority {entity_detector.priority}" + f"for entity detector of type {type(entity_detector)}" + ) + + priorities.add(entity_detector.priority) + + return entity_detectors + + +def _select_entities(unselected_entities): + + selected_entities = [unselected_entities.pop(0)] + + for unselected_entity in unselected_entities: + + for selected_entity in selected_entities: + + if selected_entity.type == unselected_entity.type: + if ( + selected_entity.id == unselected_entity.id + and selected_entity.schema_url + == (unselected_entity.schema_url) + ): + for key, value in unselected_entity.attributes.items(): + if key not in selected_entity.attributes.keys(): + selected_entity._attributes[key] = value + break + elif ( + selected_entity.id == unselected_entity.id + and selected_entity.schema_url + != (unselected_entity.schema_url) + ): + break + elif selected_entity.id != unselected_entity.id: + break + else: + selected_entities.append(unselected_entity) + + return selected_entities + + _EMPTY_RESOURCE = Resource({}) _DEFAULT_RESOURCE = Resource( { @@ -308,19 +485,89 @@ def to_json(self, indent: int = 4) -> str: ) -class ResourceDetector(abc.ABC): +class Detector(ABC): def __init__(self, raise_on_error: bool = False) -> None: self.raise_on_error = raise_on_error - @abc.abstractmethod + @abstractmethod + def detect(self): + raise NotImplementedError() + + +class ResourceDetector(Detector): + @abstractmethod def detect(self) -> "Resource": raise NotImplementedError() +class EntityDetector(Detector): + def __init__(self, raise_on_error: bool = False) -> None: + self.raise_on_error = raise_on_error + + @abstractmethod + def detect(self) -> "Entity": + raise NotImplementedError() + + @property + @abstractmethod + def priority(self): + raise NotImplementedError() + + +class Type0EntityDetector(EntityDetector): + + _entity = None + + def detect(self) -> Entity: + + if self._entity is None: + # The OTEP says an entity detector must not provide two entities of + # the same type. It seems to me that this means it will only + # provide one entity at all since the entity detector is associated + # to a particular "type" (process, OS, etc) + self._entity = Entity( + "type0", id_={"a": "b"}, attributes={"c": "d"} + ) + + return self._entity + + @property + def priority(self): + # This probably needs a configuration mechanism so that it can get its + # priority from some configuration file or something else. + return 0 + + +class Type1EntityDetector(EntityDetector): + + _entity = None + + def detect(self) -> Entity: + + if self._entity is None: + # The OTEP says an entity detector must not provide two entities of + # the same type. It seems to me that this means it will only + # provide one entity at all since the entity detector is associated + # to a particular "type" (process, OS, etc) + + self._entity = Entity( + "type1", id_={"a": "b"}, attributes={"c": "d"} + ) + + return self._entity + + @property + def priority(self): + # This probably needs a configuration mechanism so that it can get its + # priority from some configuration file or something else. + return 1 + + class OTELResourceDetector(ResourceDetector): # pylint: disable=no-self-use def detect(self) -> "Resource": + # An example of OTEL_RESOURCE_ATTRIBUTES is "key0=value0,key1=value1" env_resources_items = environ.get(OTEL_RESOURCE_ATTRIBUTES) env_resource_map = {} @@ -472,8 +719,8 @@ def detect(self) -> "Resource": def get_aggregated_resources( - detectors: typing.List["ResourceDetector"], - initial_resource: typing.Optional[Resource] = None, + detectors: List["ResourceDetector"], + initial_resource: Optional[Resource] = None, timeout: int = 5, ) -> "Resource": """Retrieves resources from detectors in the order that they were passed diff --git a/opentelemetry-sdk/tests/logs/test_log_record.py b/opentelemetry-sdk/tests/logs/test_log_record.py index 9c3746989b1..6320d248f41 100644 --- a/opentelemetry-sdk/tests/logs/test_log_record.py +++ b/opentelemetry-sdk/tests/logs/test_log_record.py @@ -128,11 +128,21 @@ def test_log_record_dropped_attributes_set_limits_warning_once(self): attributes=attr, limits=limits, ) - self.assertEqual(len(cw), 1) - self.assertIsInstance(cw[-1].message, LogDroppedAttributesWarning) + + non_deprecation_warnings = [] + + for warning in cw: + if warning.category is DeprecationWarning: + continue + non_deprecation_warnings.append(warning) + + self.assertEqual(len(non_deprecation_warnings), 1) + self.assertIsInstance( + non_deprecation_warnings[0].message, LogDroppedAttributesWarning + ) self.assertIn( "Log record attributes were dropped due to limits", - str(cw[-1].message), + str(non_deprecation_warnings[0].message), ) def test_log_record_dropped_attributes_unset_limits(self): diff --git a/opentelemetry-sdk/tests/resources/test_entities.py b/opentelemetry-sdk/tests/resources/test_entities.py new file mode 100644 index 00000000000..667321ff214 --- /dev/null +++ b/opentelemetry-sdk/tests/resources/test_entities.py @@ -0,0 +1,147 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +from unittest.mock import patch, Mock + +from opentelemetry.sdk.resources import ( + Resource, + Entity, + Type0EntityDetector, + Type1EntityDetector, + _select_entities, + _get_entity_detectors, + EntityDetector +) + + +class TestEntities(TestCase): + + def test_get_entity_detectors(self): + + entity_detectors = _get_entity_detectors() + + self.assertEqual(len(entity_detectors), 2) + + self.assertIsInstance(entity_detectors[0], Type0EntityDetector) + self.assertIsInstance(entity_detectors[1], Type1EntityDetector) + + def test_get_entity_detectors_same_priorities(self): + class Type2EntityDetector(EntityDetector): + + _entity = None + + def detect(self) -> Entity: + + if self._entity is None: + self._entity = Entity( + "type0", id_={"a": "b"}, attributes={"c": "d"} + ) + + return self._entity + + @property + def priority(self): + return 0 + + with patch( + "opentelemetry.sdk.resources.entry_points", + return_value=[ + Mock(load=Mock(return_value=Type0EntityDetector)), + Mock(load=Mock(return_value=Type1EntityDetector)), + Mock(load=Mock(return_value=Type2EntityDetector)), + ] + ): + + with self.assertRaises(ValueError): + _get_entity_detectors() + + def test_select_entities(self): + + unselected_entities = [ + Entity("type_0", {"a": "a"}, {"b": "b"}, "a"), + Entity("type_0", {"a": "a"}, {"c": "c"}, "a"), + ] + + selected_entities = _select_entities(unselected_entities) + + self.assertEqual(len(selected_entities), 1) + self.assertEqual(selected_entities[0].type, "type_0") + self.assertEqual(selected_entities[0].id, {"a": "a"}) + self.assertEqual(selected_entities[0].attributes, {"b": "b", "c": "c"}) + self.assertEqual(selected_entities[0].schema_url, "a") + + unselected_entities = [ + Entity("type_0", {"a": "a"}, {"b": "b"}, "a"), + Entity("type_0", {"a": "a"}, {"c": "c"}, "b"), + ] + + selected_entities = _select_entities(unselected_entities) + + self.assertEqual(len(selected_entities), 1) + self.assertEqual(selected_entities[0].type, "type_0") + self.assertEqual(selected_entities[0].id, {"a": "a"}) + self.assertEqual(selected_entities[0].attributes, {"b": "b"}) + self.assertEqual(selected_entities[0].schema_url, "a") + + unselected_entities = [ + Entity("type_0", {"a": "a"}, {"b": "b"}, "a"), + Entity("type_0", {"a": "b"}, {"c": "c"}, "a"), + ] + + selected_entities = _select_entities(unselected_entities) + + self.assertEqual(len(selected_entities), 1) + self.assertEqual(selected_entities[0].type, "type_0") + self.assertEqual(selected_entities[0].id, {"a": "a"}) + self.assertEqual(selected_entities[0].attributes, {"b": "b"}) + self.assertEqual(selected_entities[0].schema_url, "a") + + unselected_entities = [ + Entity("type_0", {"a": "a"}, {"b": "b"}, "a"), + Entity("type_1", {"a": "b"}, {"c": "c"}, "a"), + ] + + selected_entities = _select_entities(unselected_entities) + + self.assertEqual(len(selected_entities), 2) + self.assertEqual(selected_entities[0].type, "type_0") + self.assertEqual(selected_entities[0].id, {"a": "a"}) + self.assertEqual(selected_entities[0].attributes, {"b": "b"}) + self.assertEqual(selected_entities[0].schema_url, "a") + + self.assertEqual(selected_entities[1].type, "type_1") + self.assertEqual(selected_entities[1].id, {"a": "b"}) + self.assertEqual(selected_entities[1].attributes, {"c": "c"}) + self.assertEqual(selected_entities[1].schema_url, "a") + + def test_create_using_entities(self): + + resource = Resource.create_using_entities() + + self.assertEqual( + resource.attributes, + { + 'telemetry.sdk.language': 'python', + 'telemetry.sdk.name': 'opentelemetry', + 'telemetry.sdk.version': '1.28.0.dev0', + 'a': 'b', + 'c': 'd', + 'service.name': 'unknown_service' + } + ) + self.assertEqual( + resource.schema_url, + "" + )