Skip to content

Commit

Permalink
Add entities prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Sep 24, 2024
1 parent f15821f commit dba0e24
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 15 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ otel = "opentelemetry.sdk.resources:OTELResourceDetector"
process = "opentelemetry.sdk.resources:ProcessResourceDetector"
os = "opentelemetry.sdk.resources:OsResourceDetector"

[project.entry-points.opentelemetry_entity_detector]
type0= "opentelemetry.sdk.resources:Type0EntityDetector"
type1= "opentelemetry.sdk.resources:Type1EntityDetector"

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python/tree/main/opentelemetry-sdk"

Expand Down
271 changes: 259 additions & 12 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@
above example.
"""

import abc
import concurrent.futures
import logging
import os
import platform
import sys
import typing
from abc import ABC, abstractmethod
from json import dumps
from os import environ
from types import ModuleType
from typing import List, MutableMapping, Optional, cast
from typing import List, MutableMapping, Optional, cast, Mapping
from urllib import parse
from warnings import warn

from opentelemetry.attributes import BoundedAttributes
from opentelemetry.sdk.environment_variables import (
Expand All @@ -88,7 +88,7 @@
pass

LabelValue = AttributeValue
Attributes = typing.Mapping[str, LabelValue]
Attributes = Mapping[str, LabelValue]
logger = logging.getLogger(__name__)

CLOUD_PROVIDER = ResourceAttributes.CLOUD_PROVIDER
Expand Down Expand Up @@ -152,24 +152,90 @@
_OPENTELEMETRY_SDK_VERSION: str = version("opentelemetry-sdk")


class Entity:

def __init__(
self,
type_: str,
id_: Mapping[str, str],
attributes: Optional[Attributes] = None,
schema_url: Optional[str] = None,
):

if not type_:
raise Exception("Entity type must not be empty")

if attributes is None:
attributes = {}

self._type = type_

# These are attributes that identify the entity and must not change
# during the lifetime of the entity. id_ must contain at least one
# attribute.
if not id_:
raise Exception("Entity id must not be empty")

self._id = id_

# These are attributes that do not identify the entity and may change
# during the lifetime of the entity.
self._attributes = attributes

if schema_url is None:
schema_url = ""

self._schema_url = schema_url

@property
def type(self):
return self._type

@property
def id(self):
# FIXME we need a checker here that makes sure that the id attributes
# are compliant with the spec. Not implementing it here since this
# seems like a thing that should be available to other components as
# well.
return self._id

@property
def attributes(self):
return self._attributes

@property
def schema_url(self):
return self._schema_url


class Resource:
"""A Resource is an immutable representation of the entity producing telemetry as Attributes."""

_attributes: BoundedAttributes
_schema_url: str

def __init__(
self, attributes: Attributes, schema_url: typing.Optional[str] = None
self,
attributes: Attributes,
schema_url: Optional[str] = None,
*entities,
):
self._attributes = BoundedAttributes(attributes=attributes)
if schema_url is None:
schema_url = ""
else:
warn("Resource.schema_url is deprecated", DeprecationWarning)

self._schema_url = schema_url

# FIXME the spec draft says the Resource will have an EntityRef proto
# figure out what that is supposed to be.
self._entities = entities

@staticmethod
def create(
attributes: typing.Optional[Attributes] = None,
schema_url: typing.Optional[str] = None,
attributes: Optional[Attributes] = None,
schema_url: Optional[str] = None,
) -> "Resource":
"""Creates a new `Resource` from attributes.
Expand All @@ -181,7 +247,7 @@ def create(
The newly-created Resource.
"""

if not attributes:
if attributes is None:
attributes = {}

otel_experimental_resource_detectors = {"otel"}.union(
Expand Down Expand Up @@ -225,6 +291,51 @@ def create(
)
return resource

@staticmethod
def create_using_entities(
attributes: Optional[Attributes] = None,
schema_url: Optional[str] = None,
) -> "Resource":
# This method is added here with the intention of not disturbing the
# previous API create method for backwards compatibility reasons.

if attributes is None:
attributes = {}

if schema_url is None:
schema_url = ""

selected_entities = _select_entities(
[
entity_detector.detect()
for entity_detector in _get_entity_detectors()
]
)

resource_attributes = {}

for selected_entity in selected_entities:
for key, value in selected_entity._id.items():
resource_attributes[key] = value

for key, value in selected_entity._attributes.items():
resource_attributes[key] = value

entity_schema_url = selected_entities[0].schema_url

for selected_entity in selected_entities:
if selected_entity.schema_url != entity_schema_url:
entity_schema_url = None
break

resource_attributes.update(attributes)

resource = Resource.create(
attributes=resource_attributes, schema_url=entity_schema_url
)

return resource

@staticmethod
def get_empty() -> "Resource":
return _EMPTY_RESOURCE
Expand Down Expand Up @@ -298,6 +409,72 @@ def to_json(self, indent: int = 4) -> str:
)


def _get_entity_detectors():

entity_detectors: List[ResourceDetector] = []

for entity_detector in entry_points(
group="opentelemetry_entity_detector",
):

entity_detectors.append(entity_detector.load()())

# This checker is added here but it could live in the configuration
# mechanism, so that it detects a possible error when 2 entity
# detectors have the same priority even earlier.
if len(entity_detectors) > 1:

sorted_entity_detectors = sorted(
entity_detectors, key=lambda x: x.priority
)

priorities = set()

for entity_detector in sorted_entity_detectors:

if entity_detector.priority in priorities:
raise ValueError(
f"Duplicate priority {entity_detector.priority}"
f"for entity detector of type {type(entity_detector)}"
)

priorities.add(entity_detector.priority)

return entity_detectors


def _select_entities(unselected_entities):

selected_entities = [unselected_entities.pop(0)]

for unselected_entity in unselected_entities:

for selected_entity in selected_entities:

if selected_entity.type == unselected_entity.type:
if (
selected_entity.id == unselected_entity.id
and selected_entity.schema_url
== (unselected_entity.schema_url)
):
for key, value in unselected_entity.attributes.items():
if key not in selected_entity.attributes.keys():
selected_entity._attributes[key] = value
break
elif (
selected_entity.id == unselected_entity.id
and selected_entity.schema_url
!= (unselected_entity.schema_url)
):
break
elif selected_entity.id != unselected_entity.id:
break
else:
selected_entities.append(unselected_entity)

return selected_entities


_EMPTY_RESOURCE = Resource({})
_DEFAULT_RESOURCE = Resource(
{
Expand All @@ -308,19 +485,89 @@ def to_json(self, indent: int = 4) -> str:
)


class ResourceDetector(abc.ABC):
class Detector(ABC):
def __init__(self, raise_on_error: bool = False) -> None:
self.raise_on_error = raise_on_error

@abc.abstractmethod
@abstractmethod
def detect(self):
raise NotImplementedError()


class ResourceDetector(Detector):
@abstractmethod
def detect(self) -> "Resource":
raise NotImplementedError()


class EntityDetector(Detector):
def __init__(self, raise_on_error: bool = False) -> None:
self.raise_on_error = raise_on_error

@abstractmethod
def detect(self) -> "Entity":
raise NotImplementedError()

@property
@abstractmethod
def priority(self):
raise NotImplementedError()


class Type0EntityDetector(EntityDetector):

_entity = None

def detect(self) -> Entity:

if self._entity is None:
# The OTEP says an entity detector must not provide two entities of
# the same type. It seems to me that this means it will only
# provide one entity at all since the entity detector is associated
# to a particular "type" (process, OS, etc)
self._entity = Entity(
"type0", id_={"a": "b"}, attributes={"c": "d"}
)

return self._entity

@property
def priority(self):
# This probably needs a configuration mechanism so that it can get its
# priority from some configuration file or something else.
return 0


class Type1EntityDetector(EntityDetector):

_entity = None

def detect(self) -> Entity:

if self._entity is None:
# The OTEP says an entity detector must not provide two entities of
# the same type. It seems to me that this means it will only
# provide one entity at all since the entity detector is associated
# to a particular "type" (process, OS, etc)

self._entity = Entity(
"type1", id_={"a": "b"}, attributes={"c": "d"}
)

return self._entity

@property
def priority(self):
# This probably needs a configuration mechanism so that it can get its
# priority from some configuration file or something else.
return 1


class OTELResourceDetector(ResourceDetector):
# pylint: disable=no-self-use
def detect(self) -> "Resource":

# An example of OTEL_RESOURCE_ATTRIBUTES is "key0=value0,key1=value1"
env_resources_items = environ.get(OTEL_RESOURCE_ATTRIBUTES)
env_resource_map = {}

Expand Down Expand Up @@ -472,8 +719,8 @@ def detect(self) -> "Resource":


def get_aggregated_resources(
detectors: typing.List["ResourceDetector"],
initial_resource: typing.Optional[Resource] = None,
detectors: List["ResourceDetector"],
initial_resource: Optional[Resource] = None,
timeout: int = 5,
) -> "Resource":
"""Retrieves resources from detectors in the order that they were passed
Expand Down
16 changes: 13 additions & 3 deletions opentelemetry-sdk/tests/logs/test_log_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,21 @@ def test_log_record_dropped_attributes_set_limits_warning_once(self):
attributes=attr,
limits=limits,
)
self.assertEqual(len(cw), 1)
self.assertIsInstance(cw[-1].message, LogDroppedAttributesWarning)

non_deprecation_warnings = []

for warning in cw:
if warning.category is DeprecationWarning:
continue
non_deprecation_warnings.append(warning)

self.assertEqual(len(non_deprecation_warnings), 1)
self.assertIsInstance(
non_deprecation_warnings[0].message, LogDroppedAttributesWarning
)
self.assertIn(
"Log record attributes were dropped due to limits",
str(cw[-1].message),
str(non_deprecation_warnings[0].message),
)

def test_log_record_dropped_attributes_unset_limits(self):
Expand Down
Loading

0 comments on commit dba0e24

Please sign in to comment.