feat: Add OpenTelemetry integration #149

27 changes: 26 additions & 1 deletion README.rst
@@ -186,14 +186,39 @@ For example, to use JSON Web Tokens, provide a `google.auth.jwt.Credentials`_ in

# The same for the publisher, except that the "audience" claim needs to be adjusted
publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_pub = credentials.with_claims(audience=publisher_audience)
credentials_pub = credentials.with_claims(audience=publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials_pub)

.. _Credentials: https://google-auth.readthedocs.io/en/latest/reference/google.auth.credentials.html#google.auth.credentials.Credentials
.. _google-auth: https://google-auth.readthedocs.io/en/latest/index.html
.. _google.auth.jwt.Credentials: https://google-auth.readthedocs.io/en/latest/reference/google.auth.jwt.html#google.auth.jwt.Credentials


OpenTelemetry Tracing
^^^^^^^^^^^^^^^^^^^^^

To enable OpenTelemetry tracing in Pub/Sub clients, the ``opentelemetry-api``, ``opentelemetry-sdk``,
and ``opentelemetry-instrumentation`` libraries must be installed. After installation, OpenTelemetry
can be used with any publisher or subscriber client by specifying an exporter for traces.

For example, for traces to be exported to Google Cloud Tracing, the Cloud Trace exporter must be specified.

.. code-block:: python

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor
    from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter

    trace.set_tracer_provider(TracerProvider())
    trace.get_tracer_provider().add_span_processor(
        SimpleExportSpanProcessor(CloudTraceSpanExporter())

You should be using a BatchSpanProcessor here. SimpleExportSpanProcessor exports all spans sequentially and will be very slow.

    )
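
A minimal sketch of the batched setup suggested in the review comment above. It assumes a current ``opentelemetry-sdk`` release, where the classes are named ``BatchSpanProcessor`` and ``ConsoleSpanExporter``; as far as I can tell the 0.x releases this PR targets called the batching class ``BatchExportSpanProcessor``. ``ConsoleSpanExporter`` stands in for ``CloudTraceSpanExporter`` so the sketch needs no GCP credentials, and ``build_tracer_provider`` is a hypothetical helper name, not part of this PR.

```python
try:
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import (
        BatchSpanProcessor,
        ConsoleSpanExporter,
    )

    HAVE_OTEL_SDK = True
except ImportError:  # SDK missing, or too old to provide these names
    HAVE_OTEL_SDK = False


def build_tracer_provider(exporter=None):
    """Hypothetical helper: return a provider that exports spans in batches."""
    if not HAVE_OTEL_SDK:
        return None
    provider = TracerProvider()
    # BatchSpanProcessor queues finished spans and exports them from a
    # background thread, so ending a span does not wait on the exporter.
    provider.add_span_processor(
        BatchSpanProcessor(exporter or ConsoleSpanExporter())
    )
    return provider


provider = build_tracer_provider()
if provider is not None:
    tracer = provider.get_tracer(__name__)
    with tracer.start_as_current_span("example publish"):
        pass
    provider.shutdown()  # flush any queued spans before exiting
```

In the README example, the same shape would apply with ``CloudTraceSpanExporter()`` passed as ``exporter`` and the provider registered through ``trace.set_tracer_provider(provider)``.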

For more information on OpenTelemetry, please consult the `OpenTelemetry documentation`_.

.. _OpenTelemetry documentation: https://opentelemetry-python.readthedocs.io

Versioning
----------

91 changes: 91 additions & 0 deletions google/cloud/pubsub_v1/opentelemetry_tracing.py
@@ -0,0 +1,91 @@
# Copyright 2020, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from contextlib import contextmanager

from google.api_core.exceptions import GoogleAPICallError

_LOGGER = logging.getLogger(__name__)

try:
    from opentelemetry import trace
    from opentelemetry import propagators
    from opentelemetry.trace import SpanContext
    from opentelemetry.trace import get_current_span
    from opentelemetry.trace import set_span_in_context
    from opentelemetry.trace.status import Status
    from opentelemetry.instrumentation.utils import http_status_to_canonical_code

    USE_OPENTELEMETRY = True
except ImportError:
    _LOGGER.info(
        "This service supports OpenTelemetry, but OpenTelemetry could"
        "not be imported. To use OpenTelemetry, please install the"
        "opentelemetry-api and opentelemetry-instrumentation"
        "pip modules. See also"
        "https://opentelemetry-python.readthedocs.io/en/stable/getting-started.html"
    )

There are no spaces at the end of these lines, which will cause words to be displayed together (e.g. couldnot, rather than could not).
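
To illustrate the comment above: Python joins adjacent string literals with nothing in between, so each continued line needs its own trailing space. The fragment below is a shortened stand-in for the log message, not the exact text from the diff.

```python
# Adjacent string literals are concatenated with no separator.
broken = (
    "but OpenTelemetry could"
    "not be imported."
)

# A trailing space on each continued line keeps the words apart.
fixed = (
    "but OpenTelemetry could "
    "not be imported."
)

print(broken)  # but OpenTelemetry couldnot be imported.
print(fixed)   # but OpenTelemetry could not be imported.
```

The same fix applies to every multi-line message flagged in this review.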

    USE_OPENTELEMETRY = False

Just a suggestion: In addition to the import try, wouldn't it be possible to use an environment variable for explicitly disabling opentelemetry? That way the tests below would not need to mess with sys.modules.
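
One way the suggestion above could look, as a sketch only. The variable name ``PUBSUB_DISABLE_OPENTELEMETRY`` and the helper ``opentelemetry_enabled`` are hypothetical; neither exists in this PR or in OpenTelemetry.

```python
import os

# Hypothetical variable name, chosen for illustration only.
DISABLE_ENV_VAR = "PUBSUB_DISABLE_OPENTELEMETRY"


def opentelemetry_enabled(environ=None, importable=True):
    """Return True only if tracing is importable and not switched off."""
    environ = os.environ if environ is None else environ
    flag = environ.get(DISABLE_ENV_VAR, "").strip().lower()
    if flag in ("1", "true", "yes"):
        return False
    return importable


print(opentelemetry_enabled({}))                      # True
print(opentelemetry_enabled({DISABLE_ENV_VAR: "1"}))  # False
print(opentelemetry_enabled({}, importable=False))    # False
```

Passing the environment as an argument lets tests exercise the disabled path with a plain dict instead of patching ``sys.modules``.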



@contextmanager
def create_span(span_name, attributes=None, parent=None):
    """ Create a new OpenTelemetry span

    Args:
        span_name (str): the name of the new span
        attributes Optional[dict]: A dictionary
            containing all attributes to add to a span. Defaults to None.
        parent Optional[dict]: A dictionary
            containing the attributes of a parent span's span
            context. Defaults to None.

    Yields:
        [opentelemetry.trace.Span]: The newly created span, or None if
            OpenTelemetry could not be imported
    """

    # OpenTelemetry could not be imported.
    if not USE_OPENTELEMETRY:
        yield None
        return

    tracer = trace.get_tracer(__name__)

    if parent is not None:
        # Form the parent's context from the parent dict provided
        try:
            parent_span_context = SpanContext(**parent)
        except TypeError:
            _LOGGER.warning(
                "A parent span was provided but it could not be"
                "converted into a SpanContext. Ensure that the"
                "parent is a mapping with at least a trace_id, span_id"
                "and is_remote keys."

Same issue as above: the spaces at the ends of these lines are missing.

            )
            parent_span_context = None
    else:
        parent_span_context = None

    # Create a new span and yield it
    with tracer.start_as_current_span(
        span_name, attributes=attributes, parent=parent_span_context
    ) as span:
        try:
            yield span
        except GoogleAPICallError as error:
            if error.code is not None:
                span.set_status(Status(http_status_to_canonical_code(error.code)))
            raise
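
The control flow of ``create_span`` can be sketched without OpenTelemetry installed. The stand-ins below are hypothetical: ``RecordingSpan`` replaces a real span, the ``enabled`` parameter replaces the module-level ``USE_OPENTELEMETRY`` flag, and a plain string status replaces ``Status``. The sketch shows the two behaviours callers rely on: the ``with`` body still runs when tracing is unavailable, and exceptions are recorded and then re-raised.

```python
from contextlib import contextmanager


class RecordingSpan:
    """Hypothetical stand-in for a real span; it only records a status."""

    def __init__(self, name):
        self.name = name
        self.status = "OK"


@contextmanager
def create_span_sketch(span_name, enabled):
    if not enabled:
        # Yield exactly once so the caller's block still runs, untraced.
        yield None
        return
    span = RecordingSpan(span_name)
    try:
        yield span
    except Exception:
        span.status = "ERROR"
        raise  # record the failure, never swallow it


with create_span_sketch("topic publisher", enabled=False) as span:
    untraced = span  # None, so callers must guard attribute access

failed = None
try:
    with create_span_sketch("topic publisher", enabled=True) as span:
        failed = span
        raise ValueError("publish failed")
except ValueError:
    pass

print(untraced, failed.status)  # None ERROR
```

Because the disabled path yields ``None``, every caller has to check ``span is not None`` before touching the span, as the publisher change in this PR does.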
69 changes: 44 additions & 25 deletions google/cloud/pubsub_v1/publisher/client.py
@@ -20,6 +20,8 @@
import pkg_resources
import threading
import time
import sys
import json

import grpc
import six
@@ -29,6 +31,7 @@

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.opentelemetry_tracing import create_span
from google.cloud.pubsub_v1.gapic import publisher_client
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
from google.cloud.pubsub_v1.publisher import exceptions
@@ -369,38 +372,54 @@ def publish(self, topic, data, ordering_key="", **attrs):
"be sent as text strings."
)

# Create the Pub/Sub message object.
message = types.PubsubMessage(
data=data, ordering_key=ordering_key, attributes=attrs
)
span_name = "{} publisher".format(topic)
span_attributes = {"data": data.decode()}

What is the argument for doing this? It seems to me that, in the best case, it duplicates data that is already in the message. In the worst case it leaks sensitive information into the observability stack, just as logging the output of a SQL query would.
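
A sketch of one alternative that addresses the concern above: describe the message without copying its payload. ``safe_span_attributes`` and the attribute keys are hypothetical, chosen for illustration. The sketch also shows a second problem with ``data.decode()``: Pub/Sub payloads are arbitrary bytes, and decoding bytes that are not valid UTF-8 raises ``UnicodeDecodeError``.

```python
def safe_span_attributes(topic, data):
    """Hypothetical helper: span attributes that omit the payload itself."""
    return {
        "topic": topic,
        "size_bytes": len(data),
    }


payload = b"\xff\x00secret"  # arbitrary bytes, not valid UTF-8

attrs = safe_span_attributes("projects/p/topics/t", payload)
print(attrs)

try:
    payload.decode()
    decoded_ok = True
except UnicodeDecodeError:
    decoded_ok = False
    print("payload is not UTF-8; data.decode() would have raised")
```

Recording only the size keeps the span useful for diagnosing oversized messages while leaving the message content out of the tracing backend.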

with create_span(span_name, attributes=span_attributes) as span:
if span is not None:

# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
try:
self._flow_controller.add(message)
except exceptions.FlowControlLimitError as exc:
future = futures.Future()
future.set_exception(exc)
return future
if "googclient_OpenTelemetrySpanContext" in attrs:
_LOGGER.warning(
"googclient_OpenTelemetrySpanContext set on message"
"as an attribute, but will be overridden."


No space between "message" and "as".

)

def on_publish_done(future):
self._flow_controller.release(message)
# Add the context of the span as an attribute
attrs["googclient_OpenTelemetrySpanContext"] = json.dumps(
span.get_context().__dict__
)

with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")
# Create the Pub/Sub message object.
message = types.PubsubMessage(
data=data, ordering_key=ordering_key, attributes=attrs
)

sequencer = self._get_or_create_sequencer(topic, ordering_key)
# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
try:
self._flow_controller.add(message)
except exceptions.FlowControlLimitError as exc:
future = futures.Future()
future.set_exception(exc)
return future

# Delegate the publishing to the sequencer.
future = sequencer.publish(message)
future.add_done_callback(on_publish_done)
def on_publish_done(future):
self._flow_controller.release(message)

# Create a timer thread if necessary to enforce the batching
# timeout.
self._ensure_commit_timer_runs_no_lock()
with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")

sequencer = self._get_or_create_sequencer(topic, ordering_key)

# Delegate the publishing to the sequencer.
future = sequencer.publish(message)
future.add_done_callback(on_publish_done)

# Create a timer thread if necessary to enforce the batching
# timeout.
self._ensure_commit_timer_runs_no_lock()

return future
return future

def ensure_cleanup_and_commit_timer_runs(self):
""" Ensure a cleanup/commit timer thread is running.
@@ -19,13 +19,15 @@
import logging
import threading
import uuid
import json

import grpc
import six

from google.api_core import bidi
from google.api_core import exceptions
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.opentelemetry_tracing import create_span
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import histogram
@@ -619,20 +621,40 @@ def _on_response(self, response):

with self._pause_resume_lock:
for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
ack_id=message.ack_id,
byte_size=message.size,
ordering_key=message.ordering_key,
)
self.leaser.add([req])
if (
"googclient_OpenTelemetrySpanContext"
in received_message.message.attributes
):
publisher_span_context = json.loads(
received_message.message.attributes[
"googclient_OpenTelemetrySpanContext"
]
)
else:
publisher_span_context = None
span_attributes = {
"ack_id": received_message.ack_id,
"delivery_attempt": received_message.delivery_attempt,
}
with create_span(
self._subscription,
attributes=span_attributes,
parent=publisher_span_context,
):
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
ack_id=message.ack_id,
byte_size=message.size,
ordering_key=message.ordering_key,
)
self.leaser.add([req])

self._maybe_release_messages()
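
The span context travels from publisher to subscriber as a JSON string in the ``googclient_OpenTelemetrySpanContext`` message attribute. The round trip can be sketched with the standard library alone. ``SpanContextFields``, ``encode_span_context`` and ``decode_span_context`` are hypothetical names, and a plain dict stands in for the message's attribute map. Unlike the diff, the decoder returns ``None`` for malformed input instead of letting ``json.loads`` raise inside ``_on_response``.

```python
import json
from dataclasses import dataclass

ATTRIBUTE_KEY = "googclient_OpenTelemetrySpanContext"


@dataclass
class SpanContextFields:
    """Hypothetical stand-in holding only the fields sent on the wire."""

    trace_id: int
    span_id: int
    is_remote: bool


def encode_span_context(ctx):
    # Name each field explicitly rather than dumping __dict__, so the wire
    # format does not depend on the object's internal attributes.
    return json.dumps(
        {
            "trace_id": ctx.trace_id,
            "span_id": ctx.span_id,
            "is_remote": ctx.is_remote,
        }
    )


def decode_span_context(attributes):
    """Return the parent mapping, or None if it is absent or malformed."""
    raw = attributes.get(ATTRIBUTE_KEY)
    if raw is None:
        return None
    try:
        parsed = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    return parsed if isinstance(parsed, dict) else None


ctx = SpanContextFields(trace_id=0x1234, span_id=0x56, is_remote=False)
message_attributes = {ATTRIBUTE_KEY: encode_span_context(ctx)}

round_trip = decode_span_context(message_attributes)
malformed = decode_span_context({ATTRIBUTE_KEY: "not json"})
missing = decode_span_context({})
print(round_trip, malformed, missing)
```

The decoded mapping has the shape that ``create_span`` expects for its ``parent`` argument.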

9 changes: 8 additions & 1 deletion noxfile.py
@@ -70,7 +70,14 @@ def lint_setup_py(session):

def default(session):
# Install all test dependencies, then install this package in-place.
session.install("mock", "pytest", "pytest-cov")
session.install(
"mock",
"pytest",
"pytest-cov",
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-instrumentation",
)
session.install("-e", ".")

# Run py.test against the unit tests.
13 changes: 13 additions & 0 deletions synth.py
@@ -279,4 +279,17 @@ def _merge_dict(d1, d2):
# ----------------------------------------------------------------------------
python.py_samples()

# ----------------------------------------------------------------------------
# Additional unit test dependincies

Typo: dependencies.

# ----------------------------------------------------------------------------
s.replace(
"noxfile.py",
r'session\.install\("mock", "pytest", "pytest-cov"\)',
"""\g<0>
session.install(
"mock", "pytest", "pytest-cov",
"opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation",
)""",
)
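
A small sketch of what the replacement template above does, assuming ``s.replace`` follows ``re.sub`` semantics for its replacement string (an assumption, not confirmed by this diff). In ``re.sub``, ``\g<0>`` expands to the whole match, so the original ``session.install`` call is kept and the new call is appended after it, whereas the ``noxfile.py`` hunk above shows a single merged call. The package list is shortened here for brevity.

```python
import re

noxfile_line = 'session.install("mock", "pytest", "pytest-cov")'
pattern = r'session\.install\("mock", "pytest", "pytest-cov"\)'

# A raw string passes "\g<0>" to re.sub without relying on Python leaving
# the unrecognized "\g" escape untouched in a normal string literal.
replacement = r"""\g<0>
session.install("opentelemetry-api", "opentelemetry-sdk")"""

result = re.sub(pattern, replacement, noxfile_line)
print(result)
```

If a single combined call is the goal, dropping ``\g<0>`` from the template would replace the matched line instead of keeping it.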

s.shell.run(["nox", "-s", "blacken"], hide_output=False)