Skip to content

Commit

Permalink
feat: Add OpenTelemetry integration
Browse files Browse the repository at this point in the history
  • Loading branch information
sethmaxwl committed Jul 10, 2020
1 parent 04e261c commit d30d4f2
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 41 deletions.
27 changes: 26 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,39 @@ For example, to use JSON Web Tokens, provide a `google.auth.jwt.Credentials`_ in
# The same for the publisher, except that the "audience" claim needs to be adjusted
publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_pub = credentials.with_claims(audience=publisher_audience)
credentials_pub = credentials.with_claims(audience=publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials_pub)
.. _Credentials: https://google-auth.readthedocs.io/en/latest/reference/google.auth.credentials.html#google.auth.credentials.Credentials
.. _google-auth: https://google-auth.readthedocs.io/en/latest/index.html
.. _google.auth.jwt.Credentials: https://google-auth.readthedocs.io/en/latest/reference/google.auth.jwt.html#google.auth.jwt.Credentials


OpenTelemetry Tracing
^^^^^^^^^^^^^^^^^^^^^

To enable OpenTelemetry tracing in Pub/Sub clients, the ``opentelemetry-api``, ``opentelemetry-sdk``,
and ``opentelemetry-instrumentation`` libraries must be installed. After installation, OpenTelemetry
can be used with any publisher or subscriber client by specifying an exporter for traces.

For example, for traces to be exported to Google Cloud Tracing, the Cloud Trace exporter must be specified.

.. code-block:: python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(CloudTraceSpanExporter())
)
For more information on OpenTelemetry, please consult the `OpenTelemetry documentation`_.

.. _OpenTelemetry documentation: https://opentelemetry-python.readthedocs.io

Versioning
----------

Expand Down
91 changes: 91 additions & 0 deletions google/cloud/pubsub_v1/opentelemetry_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2020, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from contextlib import contextmanager

from google.api_core.exceptions import GoogleAPICallError

_LOGGER = logging.getLogger(__name__)

try:
from opentelemetry import trace
from opentelemetry import propagators
from opentelemetry.trace import SpanContext
from opentelemetry.trace import get_current_span
from opentelemetry.trace import set_span_in_context
from opentelemetry.trace.status import Status
from opentelemetry.instrumentation.utils import http_status_to_canonical_code

USE_OPENTELEMETRY = True
except ImportError:
_LOGGER.info(
"This service supports OpenTelemetry, but OpenTelemetry could"
"not be imported. To use OpenTelemetry, please install the"
"opentelemetry-api, opentelemetry-sdk, and opentelemetry-instrumentation"
"pip modules. See also"
"https://opentelemetry-python.readthedocs.io/en/stable/getting-started.html"
)
USE_OPENTELEMETRY = False
pass


@contextmanager
def create_span(span_name, attributes=None, parent=None):
""" Creates a new span
Args:
span_name (str): the name of the new span
attributes ([dict], optional): A dictionary
containing all attributes to add to a span. Defaults to None.
parent ([dict], optional): A dictionary
containing the attributes of a . Defaults to None.
Yields:
[opentelemetry.trace.Span]: The newly created span, or None if
OpenTelemetry could not be imported
"""

# OpenTelemetry could not be imported.
if not USE_OPENTELEMETRY:
yield None
return

tracer = trace.get_tracer(__name__)

if parent is not None:
# Form the parent's context from the parent dict provided
try:
parent_span_context = SpanContext(
trace_id=parent["trace_id"],
span_id=parent["span_id"],
is_remote=parent["is_remote"],
trace_flags=parent["trace_flags"],
trace_state=parent["trace_state"],
)
except:
parent_span_context = None
else:
parent_span_context = None

# Create a new span and yield it
with tracer.start_as_current_span(
span_name, attributes=attributes, parent=parent_span_context
) as span:
try:
yield span
except GoogleAPICallError as error:
if error.code is not None:
span.set_status(Status(http_status_to_canonical_code(error.code)))
raise
62 changes: 37 additions & 25 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import pkg_resources
import threading
import time
import sys
import json

import grpc
import six
Expand All @@ -29,6 +31,7 @@

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.opentelemetry_tracing import create_span
from google.cloud.pubsub_v1.gapic import publisher_client
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
from google.cloud.pubsub_v1.publisher import exceptions
Expand Down Expand Up @@ -369,38 +372,47 @@ def publish(self, topic, data, ordering_key="", **attrs):
"be sent as text strings."
)

# Create the Pub/Sub message object.
message = types.PubsubMessage(
data=data, ordering_key=ordering_key, attributes=attrs
)
span_name = "{} publisher".format(topic)
span_attributes = {"data": data.decode()}
with create_span(span_name, attributes=span_attributes) as span:
if span is not None:
# Add the context of the span as an attribute
attrs["googclient_OpenTelemetrySpanContext"] = json.dumps(
span.get_context().__dict__
)

# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
try:
self._flow_controller.add(message)
except exceptions.FlowControlLimitError as exc:
future = futures.Future()
future.set_exception(exc)
return future
# Create the Pub/Sub message object.
message = types.PubsubMessage(
data=data, ordering_key=ordering_key, attributes=attrs
)

def on_publish_done(future):
self._flow_controller.release(message)
# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
try:
self._flow_controller.add(message)
except exceptions.FlowControlLimitError as exc:
future = futures.Future()
future.set_exception(exc)
return future

with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")
def on_publish_done(future):
self._flow_controller.release(message)

sequencer = self._get_or_create_sequencer(topic, ordering_key)
with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")

# Delegate the publishing to the sequencer.
future = sequencer.publish(message)
future.add_done_callback(on_publish_done)
sequencer = self._get_or_create_sequencer(topic, ordering_key)

# Create a timer thread if necessary to enforce the batching
# timeout.
self._ensure_commit_timer_runs_no_lock()
# Delegate the publishing to the sequencer.
future = sequencer.publish(message)
future.add_done_callback(on_publish_done)

# Create a timer thread if necessary to enforce the batching
# timeout.
self._ensure_commit_timer_runs_no_lock()

return future
return future

def ensure_cleanup_and_commit_timer_runs(self):
""" Ensure a cleanup/commit timer thread is running.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import logging
import threading
import uuid
import json

import grpc
import six

from google.api_core import bidi
from google.api_core import exceptions
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.opentelemetry_tracing import create_span
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import histogram
Expand Down Expand Up @@ -619,20 +621,40 @@ def _on_response(self, response):

with self._pause_resume_lock:
for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
ack_id=message.ack_id,
byte_size=message.size,
ordering_key=message.ordering_key,
)
self.leaser.add([req])
if (
"googclient_OpenTelemetrySpanContext"
in received_message.message.attributes.keys()
):
publisher_span_context = json.loads(
received_message.message.attributes[
"googclient_OpenTelemetrySpanContext"
]
)
else:
publisher_span_context = None
span_attributes = {
"ack_id": received_message.ack_id,
"delivery_attempt": received_message.delivery_attempt,
}
with create_span(
"subscriber",
attributes=span_attributes,
parent=publisher_span_context,
):
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
ack_id=message.ack_id,
byte_size=message.size,
ordering_key=message.ordering_key,
)
self.leaser.add([req])

self._maybe_release_messages()

Expand Down
9 changes: 8 additions & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ def lint_setup_py(session):

def default(session):
# Install all test dependencies, then install this package in-place.
session.install("mock", "pytest", "pytest-cov")
session.install(
"mock",
"pytest",
"pytest-cov",
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-instrumentation",
)
session.install("-e", ".")

# Run py.test against the unit tests.
Expand Down
Loading

0 comments on commit d30d4f2

Please sign in to comment.