diff --git a/README.rst b/README.rst index a92a43087..519da7ea7 100644 --- a/README.rst +++ b/README.rst @@ -186,7 +186,7 @@ For example, to use JSON Web Tokens, provide a `google.auth.jwt.Credentials`_ in # The same for the publisher, except that the "audience" claim needs to be adjusted publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher" - credentials_pub = credentials.with_claims(audience=publisher_audience) + credentials_pub = credentials.with_claims(audience=publisher_audience) publisher = pubsub_v1.PublisherClient(credentials=credentials_pub) .. _Credentials: https://google-auth.readthedocs.io/en/latest/reference/google.auth.credentials.html#google.auth.credentials.Credentials @@ -194,6 +194,31 @@ For example, to use JSON Web Tokens, provide a `google.auth.jwt.Credentials`_ in .. _google.auth.jwt.Credentials: https://google-auth.readthedocs.io/en/latest/reference/google.auth.jwt.html#google.auth.jwt.Credentials +OpenTelemetry Tracing +^^^^^^^^^^^^^^^^^^^^^ + +To enable OpenTelemetry tracing in Pub/Sub clients, the ``opentelemetry-api``, ``opentelemetry-sdk``, +and ``opentelemetry-instrumentation`` libraries must be installed. After installation, OpenTelemetry +can be used with any publisher or subscriber client by specifying an exporter for traces. + +For example, for traces to be exported to Google Cloud Tracing, the Cloud Trace exporter must be specified. + +.. code-block:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(CloudTraceSpanExporter()) + ) + +For more information on OpenTelemetry, please consult the `OpenTelemetry documentation`_. + +.. _OpenTelemetry documentation: https://opentelemetry-python.readthedocs.io + Versioning ---------- diff --git a/google/cloud/pubsub_v1/opentelemetry_tracing.py b/google/cloud/pubsub_v1/opentelemetry_tracing.py new file mode 100644 index 000000000..b9ec98817 --- /dev/null +++ b/google/cloud/pubsub_v1/opentelemetry_tracing.py @@ -0,0 +1,91 @@ +# Copyright 2020, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from contextlib import contextmanager + +from google.api_core.exceptions import GoogleAPICallError + +_LOGGER = logging.getLogger(__name__) + +try: + from opentelemetry import trace + from opentelemetry import propagators + from opentelemetry.trace import SpanContext + from opentelemetry.trace import get_current_span + from opentelemetry.trace import set_span_in_context + from opentelemetry.trace.status import Status + from opentelemetry.instrumentation.utils import http_status_to_canonical_code + + USE_OPENTELEMETRY = True +except ImportError: + _LOGGER.info( + "This service supports OpenTelemetry, but OpenTelemetry could" + "not be imported. To use OpenTelemetry, please install the" + "opentelemetry-api, opentelemetry-sdk, and opentelemetry-instrumentation" + "pip modules. See also" + "https://opentelemetry-python.readthedocs.io/en/stable/getting-started.html" + ) + USE_OPENTELEMETRY = False + pass + + +@contextmanager +def create_span(span_name, attributes=None, parent=None): + """ Creates a new span + + Args: + span_name (str): the name of the new span + attributes ([dict], optional): A dictionary + containing all attributes to add to a span. Defaults to None. + parent ([dict], optional): A dictionary + containing the attributes of a . Defaults to None. + + Yields: + [opentelemetry.trace.Span]: The newly created span, or None if + OpenTelemetry could not be imported + """ + + # OpenTelemetry could not be imported. + if not USE_OPENTELEMETRY: + yield None + return + + tracer = trace.get_tracer(__name__) + + if parent is not None: + # Form the parent's context from the parent dict provided + try: + parent_span_context = SpanContext( + trace_id=parent["trace_id"], + span_id=parent["span_id"], + is_remote=parent["is_remote"], + trace_flags=parent["trace_flags"], + trace_state=parent["trace_state"], + ) + except: + parent_span_context = None + else: + parent_span_context = None + + # Create a new span and yield it + with tracer.start_as_current_span( + span_name, attributes=attributes, parent=parent_span_context + ) as span: + try: + yield span + except GoogleAPICallError as error: + if error.code is not None: + span.set_status(Status(http_status_to_canonical_code(error.code))) + raise diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 8dbbea634..ca7103c10 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -20,6 +20,8 @@ import pkg_resources import threading import time +import sys +import json import grpc import six @@ -29,6 +31,7 @@ from google.cloud.pubsub_v1 import _gapic from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.opentelemetry_tracing import create_span from google.cloud.pubsub_v1.gapic import publisher_client from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport from google.cloud.pubsub_v1.publisher import exceptions @@ -369,38 +372,47 @@ def publish(self, topic, data, ordering_key="", **attrs): "be sent as text strings." ) - # Create the Pub/Sub message object. - message = types.PubsubMessage( - data=data, ordering_key=ordering_key, attributes=attrs - ) + span_name = "{} publisher".format(topic) + span_attributes = {"data": data.decode()} + with create_span(span_name, attributes=span_attributes) as span: + if span is not None: + # Add the context of the span as an attribute + attrs["googclient_OpenTelemetrySpanContext"] = json.dumps( + span.get_context().__dict__ + ) - # Messages should go through flow control to prevent excessive - # queuing on the client side (depending on the settings). - try: - self._flow_controller.add(message) - except exceptions.FlowControlLimitError as exc: - future = futures.Future() - future.set_exception(exc) - return future + # Create the Pub/Sub message object. + message = types.PubsubMessage( + data=data, ordering_key=ordering_key, attributes=attrs + ) - def on_publish_done(future): - self._flow_controller.release(message) + # Messages should go through flow control to prevent excessive + # queuing on the client side (depending on the settings). + try: + self._flow_controller.add(message) + except exceptions.FlowControlLimitError as exc: + future = futures.Future() + future.set_exception(exc) + return future - with self._batch_lock: - if self._is_stopped: - raise RuntimeError("Cannot publish on a stopped publisher.") + def on_publish_done(future): + self._flow_controller.release(message) - sequencer = self._get_or_create_sequencer(topic, ordering_key) + with self._batch_lock: + if self._is_stopped: + raise RuntimeError("Cannot publish on a stopped publisher.") - # Delegate the publishing to the sequencer. - future = sequencer.publish(message) - future.add_done_callback(on_publish_done) + sequencer = self._get_or_create_sequencer(topic, ordering_key) - # Create a timer thread if necessary to enforce the batching - # timeout. - self._ensure_commit_timer_runs_no_lock() + # Delegate the publishing to the sequencer. + future = sequencer.publish(message) + future.add_done_callback(on_publish_done) + + # Create a timer thread if necessary to enforce the batching + # timeout. + self._ensure_commit_timer_runs_no_lock() - return future + return future def ensure_cleanup_and_commit_timer_runs(self): """ Ensure a cleanup/commit timer thread is running. diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 4e3f24933..5c07bf0b8 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -19,6 +19,7 @@ import logging import threading import uuid +import json import grpc import six @@ -26,6 +27,7 @@ from google.api_core import bidi from google.api_core import exceptions from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.opentelemetry_tracing import create_span from google.cloud.pubsub_v1.subscriber._protocol import dispatcher from google.cloud.pubsub_v1.subscriber._protocol import heartbeater from google.cloud.pubsub_v1.subscriber._protocol import histogram @@ -619,20 +621,40 @@ def _on_response(self, response): with self._pause_resume_lock: for received_message in response.received_messages: - message = google.cloud.pubsub_v1.subscriber.message.Message( - received_message.message, - received_message.ack_id, - received_message.delivery_attempt, - self._scheduler.queue, - ) - self._messages_on_hold.put(message) - self._on_hold_bytes += message.size - req = requests.LeaseRequest( - ack_id=message.ack_id, - byte_size=message.size, - ordering_key=message.ordering_key, - ) - self.leaser.add([req]) + if ( + "googclient_OpenTelemetrySpanContext" + in received_message.message.attributes.keys() + ): + publisher_span_context = json.loads( + received_message.message.attributes[ + "googclient_OpenTelemetrySpanContext" + ] + ) + else: + publisher_span_context = None + span_attributes = { + "ack_id": received_message.ack_id, + "delivery_attempt": received_message.delivery_attempt, + } + with create_span( + "subscriber", + attributes=span_attributes, + parent=publisher_span_context, + ): + message = google.cloud.pubsub_v1.subscriber.message.Message( + received_message.message, + received_message.ack_id, + received_message.delivery_attempt, + self._scheduler.queue, + ) + self._messages_on_hold.put(message) + self._on_hold_bytes += message.size + req = requests.LeaseRequest( + ack_id=message.ack_id, + byte_size=message.size, + ordering_key=message.ordering_key, + ) + self.leaser.add([req]) self._maybe_release_messages() diff --git a/noxfile.py b/noxfile.py index 615358c2e..3af3d6772 100644 --- a/noxfile.py +++ b/noxfile.py @@ -70,7 +70,14 @@ def lint_setup_py(session): def default(session): # Install all test dependencies, then install this package in-place. - session.install("mock", "pytest", "pytest-cov") + session.install( + "mock", + "pytest", + "pytest-cov", + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-instrumentation", + ) session.install("-e", ".") # Run py.test against the unit tests. diff --git a/tests/unit/pubsub_v1/opentelemetry/test_opentelemetry.py b/tests/unit/pubsub_v1/opentelemetry/test_opentelemetry.py new file mode 100644 index 000000000..82cb70a8b --- /dev/null +++ b/tests/unit/pubsub_v1/opentelemetry/test_opentelemetry.py @@ -0,0 +1,97 @@ +import mock +import json +import sys +import types as stdlib_types +import unittest +from importlib import reload + +from google.auth import credentials +from google.api_core import bidi +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1 import publisher +from google.cloud.pubsub_v1 import subscriber +from google.cloud.pubsub_v1 import opentelemetry_tracing + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +TEST_SPAN_NAME = "foo" + +TEST_SPAN_ATTRIBUTES = {"foo": "bar"} + +TEST_PARENT_SPAN_CONTEXT = { + "trace_id": 253178191101418583724305005272987492287, + "span_id": 17783848543062801337, + "is_remote": False, + "trace_flags": 1, + "trace_state": {}, +} + + +def _create_publisher(): + creds = mock.Mock(spec=credentials) + return publisher.Client(credentials=creds) + + +def _create_subscriber(): + creds = mock.Mock(spec=credentials) + return subscriber.Client(credentials=creds) + + +def _simulate_pubsub_session(): + test_publisher = _create_publisher() + test_subscriber = _create_subscriber() + test_subscriber.subscribe("test_topic", callback=mock.sentinel.callback) + test_publisher.publish("test_topic", b"foobar") + + +def _convert_context_to_dict(context): + return { + "trace_id": context.trace_id, + "span_id": context.span_id, + "is_remote": context.is_remote, + "trace_flags": context.trace_flags, + "trace_state": context.trace_state, + } + + +class TestOpenTelemetry(unittest.TestCase): + def setUp(self): + self.original_tracer_provider = trace.get_tracer_provider() + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + trace.set_tracer_provider(self.tracer_provider) + + def tearDown(self): + trace.set_tracer_provider(self.original_tracer_provider) + + def test_span_creation(self): + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, + attributes=TEST_SPAN_ATTRIBUTES, + parent=TEST_PARENT_SPAN_CONTEXT, + ) as span: + if span is None: + span_list = self.memory_exporter.get_finished_spans() + assert len(span_list) == 1 + span = span_list[0] + assert span.name == TEST_SPAN_NAME + assert span.attributes == TEST_SPAN_ATTRIBUTES + + def test_parent_span_linking(self): + with opentelemetry_tracing.create_span("Parent") as parent_span: + parent_span_context = _convert_context_to_dict(parent_span.get_context()) + with opentelemetry_tracing.create_span( + "Child", parent=parent_span_context + ) as child_span: + child_span_context = child_span.get_context() + assert parent_span_context["trace_id"] == child_span_context.trace_id + + def test_trace_call(self): + _simulate_pubsub_session() + span_list = self.memory_exporter.get_finished_spans() + assert len(span_list) == 1 diff --git a/tests/unit/pubsub_v1/opentelemetry/test_opentelemetry_uninstalled.py b/tests/unit/pubsub_v1/opentelemetry/test_opentelemetry_uninstalled.py new file mode 100644 index 000000000..55b40ecef --- /dev/null +++ b/tests/unit/pubsub_v1/opentelemetry/test_opentelemetry_uninstalled.py @@ -0,0 +1,20 @@ +import sys +from importlib import reload +import unittest + +from google.cloud.pubsub_v1 import opentelemetry_tracing + + +class TestOpenTelemetryNotInstalled(unittest.TestCase): + def setUp(self): + self._tmp_opentelemetry = sys.modules["opentelemetry"] + sys.modules["opentelemetry"] = None + reload(opentelemetry_tracing) + + def tearDown(self): + sys.modules["opentelemetry"] = self._tmp_opentelemetry + reload(opentelemetry_tracing) + + def test_opentelemetry_not_installed(self): + with opentelemetry_tracing.create_span("Test Not Installed") as span: + assert span is None diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index b58ed133f..df791b0de 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -20,6 +20,7 @@ import mock import pytest import time +import sys from google.cloud.pubsub_v1.gapic import publisher_client from google.cloud.pubsub_v1 import publisher @@ -28,6 +29,11 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer +import opentelemetry + +# Disable OpenTelemetry for publisher tests +sys.modules["opentelemetry"] = None + def test_init(): creds = mock.Mock(spec=credentials.Credentials)