Skip to content

Commit

Permalink
Add OTLP protocol class & protos (#821)
Browse files Browse the repository at this point in the history
* Add protos under packages for otlp

* Add common otlp proto payload methods

* Add new oltp protocol class

* Remove ML event from log message

* Remove params, add api-key header & expose path

The params are not relevant to OTLP so remove these.
The api-key header is how we provide the license key to OTLP so add this.
The path to upload dimensional metrics and events are different in OTLP so expose
the path so it can be overriden inside the coresponding data_collector methods.

* Add otlp_port and otlp_host settings

* Default to JSON if protobuf not available & warn

* Move otlp_utils to core

* Call encode in protocol class

* Patch issues with data collector

* Move resource to utils & add log proto imports

---------

Co-authored-by: Tim Pansino <timpansino@gmail.com>
  • Loading branch information
hmstepanek and TimPansino authored Jun 9, 2023
1 parent e970884 commit 30f0bf5
Show file tree
Hide file tree
Showing 17 changed files with 926 additions and 9 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ include newrelic/common/cacert.pem
include newrelic/packages/wrapt/LICENSE
include newrelic/packages/wrapt/README
include newrelic/packages/urllib3/LICENSE.txt
include newrelic/packages/opentelemetry_proto/LICENSE.txt
13 changes: 8 additions & 5 deletions newrelic/common/agent_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(
compression_method="gzip",
max_payload_size_in_bytes=1000000,
audit_log_fp=None,
default_content_encoding_header="Identity",
):
self._audit_log_fp = audit_log_fp

Expand Down Expand Up @@ -240,6 +241,7 @@ def __init__(
compression_method="gzip",
max_payload_size_in_bytes=1000000,
audit_log_fp=None,
default_content_encoding_header="Identity",
):
self._host = host
port = self._port = port
Expand All @@ -248,6 +250,7 @@ def __init__(
self._compression_method = compression_method
self._max_payload_size_in_bytes = max_payload_size_in_bytes
self._audit_log_fp = audit_log_fp
self._default_content_encoding_header = default_content_encoding_header

self._prefix = ""

Expand Down Expand Up @@ -419,11 +422,9 @@ def send_request(
method=self._compression_method,
level=self._compression_level,
)
content_encoding = self._compression_method
else:
content_encoding = "Identity"

merged_headers["Content-Encoding"] = content_encoding
merged_headers["Content-Encoding"] = self._compression_method
elif self._default_content_encoding_header:
merged_headers["Content-Encoding"] = self._default_content_encoding_header

request_id = self.log_request(
self._audit_log_fp,
Expand Down Expand Up @@ -489,6 +490,7 @@ def __init__(
compression_method="gzip",
max_payload_size_in_bytes=1000000,
audit_log_fp=None,
default_content_encoding_header="Identity",
):
proxy = self._parse_proxy(proxy_scheme, proxy_host, None, None, None)
if proxy and proxy.scheme == "https":
Expand All @@ -515,6 +517,7 @@ def __init__(
compression_method,
max_payload_size_in_bytes,
audit_log_fp,
default_content_encoding_header,
)


Expand Down
2 changes: 2 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ def _process_configuration(section):
_process_setting(section, "api_key", "get", None)
_process_setting(section, "host", "get", None)
_process_setting(section, "port", "getint", None)
_process_setting(section, "otlp_host", "get", None)
_process_setting(section, "otlp_port", "getint", None)
_process_setting(section, "ssl", "getboolean", None)
_process_setting(section, "proxy_scheme", "get", None)
_process_setting(section, "proxy_host", "get", None)
Expand Down
93 changes: 90 additions & 3 deletions newrelic/core/agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
global_settings_dump,
)
from newrelic.core.internal_metrics import internal_count_metric
from newrelic.core.otlp_utils import OTLP_CONTENT_TYPE, otlp_encode
from newrelic.network.exceptions import (
DiscardDataForRequest,
ForceAgentDisconnect,
Expand Down Expand Up @@ -217,11 +218,16 @@ def __exit__(self, exc, value, tb):
def close_connection(self):
self.client.close_connection()

def send(self, method, payload=()):
def send(
self,
method,
payload=(),
path="/agent_listener/invoke_raw_method",
):
params, headers, payload = self._to_http(method, payload)

try:
response = self.client.send_request(params=params, headers=headers, payload=payload)
response = self.client.send_request(path=path, params=params, headers=headers, payload=payload)
except NetworkInterfaceException:
# All HTTP errors are currently retried
raise RetryDataForRequest
Expand Down Expand Up @@ -253,7 +259,10 @@ def send(self, method, payload=()):
exception = self.STATUS_CODE_RESPONSE.get(status, DiscardDataForRequest)
raise exception
if status == 200:
return json_decode(data.decode("utf-8"))["return_value"]
return self.decode_response(data)

def decode_response(self, response):
return json_decode(response.decode("utf-8"))["return_value"]

def _to_http(self, method, payload=()):
params = dict(self._params)
Expand Down Expand Up @@ -516,3 +525,81 @@ def connect(
# can be modified later
settings.aws_lambda_metadata = aws_lambda_metadata
return cls(settings, client_cls=client_cls)


class OtlpProtocol(AgentProtocol):
def __init__(self, settings, host=None, client_cls=ApplicationModeClient):
if settings.audit_log_file:
audit_log_fp = open(settings.audit_log_file, "a")
else:
audit_log_fp = None

self.client = client_cls(
host=host or settings.otlp_host,
port=settings.otlp_port or 4318,
proxy_scheme=settings.proxy_scheme,
proxy_host=settings.proxy_host,
proxy_port=settings.proxy_port,
proxy_user=settings.proxy_user,
proxy_pass=settings.proxy_pass,
timeout=settings.agent_limits.data_collector_timeout,
ca_bundle_path=settings.ca_bundle_path,
disable_certificate_validation=settings.debug.disable_certificate_validation,
compression_threshold=settings.agent_limits.data_compression_threshold,
compression_level=settings.agent_limits.data_compression_level,
compression_method=settings.compressed_content_encoding,
max_payload_size_in_bytes=1000000,
audit_log_fp=audit_log_fp,
default_content_encoding_header=None,
)

self._params = {}
self._headers = {
"api-key": settings.license_key,
}

# In Python 2, the JSON is loaded with unicode keys and values;
# however, the header name must be a non-unicode value when given to
# the HTTP library. This code converts the header name from unicode to
# non-unicode.
if settings.request_headers_map:
for k, v in settings.request_headers_map.items():
if not isinstance(k, str):
k = k.encode("utf-8")
self._headers[k] = v

# Content-Type should be protobuf, but falls back to JSON if protobuf is not installed.
self._headers["Content-Type"] = OTLP_CONTENT_TYPE
self._run_token = settings.agent_run_id

# Logging
self._proxy_host = settings.proxy_host
self._proxy_port = settings.proxy_port
self._proxy_user = settings.proxy_user

# Do not access configuration anywhere inside the class
self.configuration = settings

@classmethod
def connect(
cls,
app_name,
linked_applications,
environment,
settings,
client_cls=ApplicationModeClient,
):
with cls(settings, client_cls=client_cls) as protocol:
pass

return protocol

def _to_http(self, method, payload=()):
params = dict(self._params)
params["method"] = method
if self._run_token:
params["run_id"] = self._run_token
return params, self._headers, otlp_encode(payload)

def decode_response(self, response):
return response.decode("utf-8")
31 changes: 31 additions & 0 deletions newrelic/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def create_settings(nested):

class TopLevelSettings(Settings):
_host = None
_otlp_host = None

@property
def host(self):
Expand All @@ -115,6 +116,16 @@ def host(self):
def host(self, value):
self._host = value

@property
def otlp_host(self):
if self._otlp_host:
return self._otlp_host
return default_otlp_host(self.host)

@otlp_host.setter
def otlp_host(self, value):
self._otlp_host = value


class AttributesSettings(Settings):
pass
Expand Down Expand Up @@ -560,6 +571,24 @@ def default_host(license_key):
return host


def default_otlp_host(host):
HOST_MAP = {
"collector.newrelic.com": "otlp.nr-data.net",
"collector.eu.newrelic.com": "otlp.eu01.nr-data.net",
"gov-collector.newrelic.com": "gov-otlp.nr-data.net",
"staging-collector.newrelic.com": "staging-otlp.nr-data.net",
"staging-collector.eu.newrelic.com": "staging-otlp.eu01.nr-data.net",
"staging-gov-collector.newrelic.com": "staging-gov-otlp.nr-data.net",
"fake-collector.newrelic.com": "fake-otlp.nr-data.net",
}
otlp_host = HOST_MAP.get(host, None)
if not otlp_host:
default = HOST_MAP["collector.newrelic.com"]
_logger.warn("Unable to find corresponding OTLP host using default %s" % default)
otlp_host = default
return otlp_host


_LOG_LEVEL = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
Expand All @@ -585,7 +614,9 @@ def default_host(license_key):
_settings.ssl = _environ_as_bool("NEW_RELIC_SSL", True)

_settings.host = os.environ.get("NEW_RELIC_HOST")
_settings.otlp_host = os.environ.get("NEW_RELIC_OTLP_HOST")
_settings.port = int(os.environ.get("NEW_RELIC_PORT", "0"))
_settings.otlp_port = int(os.environ.get("NEW_RELIC_OTLP_PORT", "0"))

_settings.agent_run_id = None
_settings.entity_guid = None
Expand Down
10 changes: 9 additions & 1 deletion newrelic/core/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
DeveloperModeClient,
ServerlessModeClient,
)
from newrelic.core.agent_protocol import AgentProtocol, ServerlessModeProtocol
from newrelic.core.agent_protocol import (
AgentProtocol,
OtlpProtocol,
ServerlessModeProtocol,
)
from newrelic.core.agent_streaming import StreamingRpc
from newrelic.core.config import global_settings

Expand All @@ -36,12 +40,16 @@

class Session(object):
PROTOCOL = AgentProtocol
OTLP_PROTOCOL = OtlpProtocol
CLIENT = ApplicationModeClient

def __init__(self, app_name, linked_applications, environment, settings):
self._protocol = self.PROTOCOL.connect(
app_name, linked_applications, environment, settings, client_cls=self.CLIENT
)
self._otlp_protocol = self.OTLP_PROTOCOL.connect(
app_name, linked_applications, environment, settings, client_cls=self.CLIENT
)
self._rpc = None

@property
Expand Down
107 changes: 107 additions & 0 deletions newrelic/core/otlp_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module provides common utilities for interacting with OTLP protocol buffers."""

import logging

_logger = logging.getLogger(__name__)

try:
from newrelic.packages.opentelemetry_proto.common_pb2 import AnyValue, KeyValue
from newrelic.packages.opentelemetry_proto.logs_pb2 import (
LogRecord,
ResourceLogs,
ScopeLogs,
)
from newrelic.packages.opentelemetry_proto.metrics_pb2 import (
AggregationTemporality,
Metric,
MetricsData,
NumberDataPoint,
ResourceMetrics,
ScopeMetrics,
Sum,
Summary,
SummaryDataPoint,
)
from newrelic.packages.opentelemetry_proto.resource_pb2 import Resource

AGGREGATION_TEMPORALITY_DELTA = AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
ValueAtQuantile = SummaryDataPoint.ValueAtQuantile

otlp_encode = lambda payload: payload.SerializeToString()
OTLP_CONTENT_TYPE = "application/x-protobuf"

except ImportError:
from newrelic.common.encoding_utils import json_encode

def otlp_encode(*args, **kwargs):
_logger.warn(
"Using OTLP integration while protobuf is not installed. This may result in larger payload sizes and data loss."
)
return json_encode(*args, **kwargs)

Resource = dict
ValueAtQuantile = dict
AnyValue = dict
KeyValue = dict
NumberDataPoint = dict
SummaryDataPoint = dict
Sum = dict
Summary = dict
Metric = dict
MetricsData = dict
ScopeMetrics = dict
ResourceMetrics = dict
AGGREGATION_TEMPORALITY_DELTA = 1
ResourceLogs = dict
ScopeLogs = dict
LogRecord = dict
OTLP_CONTENT_TYPE = "application/json"


def create_key_value(key, value):
if isinstance(value, bool):
return KeyValue(key=key, value=AnyValue(bool_value=value))
elif isinstance(value, int):
return KeyValue(key=key, value=AnyValue(int_value=value))
elif isinstance(value, float):
return KeyValue(key=key, value=AnyValue(double_value=value))
elif isinstance(value, str):
return KeyValue(key=key, value=AnyValue(string_value=value))
# Technically AnyValue accepts array, kvlist, and bytes however, since
# those are not valid custom attribute types according to our api spec,
# we will not bother to support them here either.
else:
_logger.warn("Unsupported attribute value type %s: %s." % (key, value))


def create_key_values_from_iterable(iterable):
if isinstance(iterable, dict):
iterable = iterable.items()

# The create_key_value list may return None if the value is an unsupported type
# so filter None values out before returning.
return list(
filter(
lambda i: i is not None,
(create_key_value(key, value) for key, value in iterable),
)
)


def create_resource(attributes=None):
attributes = attributes or {"instrumentation.provider": "nr_performance_monitoring"}
return Resource(attributes=create_key_values_from_iterable(attributes))
Loading

0 comments on commit 30f0bf5

Please sign in to comment.