From 170f927f4102b879a810f1238d3fee8e00d5b4e7 Mon Sep 17 00:00:00 2001 From: Vasilii Novikov Date: Wed, 18 Dec 2019 01:01:35 +0300 Subject: [PATCH] Add AsyncioScopeManager based on contextvars and supporting Tornado 6 (#118) * Asyncio context manager with contextvars. Add different versions of tornado to travis.yml. * Make new context manager based on python 3.7 contextvars. * Inherit ContextVarsScopeManagerFix directly from ScopeManager, fix docstrings and README * Update testbed/test_multiple_callbacks/README.md Co-Authored-By: Yuri Shkuro * Update testbed/test_nested_callbacks/README.md Co-Authored-By: Yuri Shkuro * Update testbed/test_subtask_span_propagation/README.md Co-Authored-By: Yuri Shkuro * Fix typo in testbed docs * Remove obsolete description from testbed docs * Update testbed/test_common_request_handler/README.md Co-Authored-By: Yuri Shkuro --- .travis.yml | 9 + opentracing/scope_managers/contextvars.py | 131 +++++++ setup.py | 6 +- testbed/README.md | 10 +- testbed/__init__.py | 1 - testbed/__main__.py | 14 +- .../test_contextvars.py | 54 +++ .../test_client_server/test_contextvars.py | 79 ++++ testbed/test_common_request_handler/README.md | 4 +- .../test_asyncio.py | 2 - .../test_contextvars.py | 123 +++++++ testbed/test_late_span_finish/test_asyncio.py | 2 +- .../test_late_span_finish/test_contextvars.py | 47 +++ .../test_contextvars.py | 46 +++ testbed/test_multiple_callbacks/README.md | 3 +- .../test_multiple_callbacks/test_asyncio.py | 2 +- .../test_contextvars.py | 56 +++ testbed/test_nested_callbacks/README.md | 4 +- .../test_nested_callbacks/test_contextvars.py | 344 ++++++++++++++++++ .../test_subtask_span_propagation/README.md | 2 +- .../test_asyncio.py | 2 - .../test_contextvars.py | 32 ++ testbed/testcase.py | 8 + tests/conftest.py | 13 +- tests/scope_managers/test_contextvars.py | 52 +++ tests/scope_managers/test_tornado.py | 16 +- 26 files changed, 1033 insertions(+), 29 deletions(-) create mode 100644 opentracing/scope_managers/contextvars.py create mode 100644 testbed/test_active_span_replacement/test_contextvars.py create mode 100644 testbed/test_client_server/test_contextvars.py create mode 100644 testbed/test_common_request_handler/test_contextvars.py create mode 100644 testbed/test_late_span_finish/test_contextvars.py create mode 100644 testbed/test_listener_per_request/test_contextvars.py create mode 100644 testbed/test_multiple_callbacks/test_contextvars.py create mode 100644 testbed/test_nested_callbacks/test_contextvars.py create mode 100644 testbed/test_subtask_span_propagation/test_contextvars.py create mode 100644 tests/scope_managers/test_contextvars.py diff --git a/.travis.yml b/.travis.yml index 8e7c12a..7a2bc89 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,12 +8,21 @@ python: - "3.7" - "3.8-dev" +env: + - TORNADO=">=4,<5" + - TORNADO=">=5,<6" + - TORNADO=">=6" + matrix: allow_failures: - python: "3.8-dev" + exclude: + - python: "2.7" + env: TORNADO=">=6" install: - make bootstrap + - pip install -q "tornado$TORNADO" script: - make test testbed lint diff --git a/opentracing/scope_managers/contextvars.py b/opentracing/scope_managers/contextvars.py new file mode 100644 index 0000000..3161c39 --- /dev/null +++ b/opentracing/scope_managers/contextvars.py @@ -0,0 +1,131 @@ +# Copyright (c) The OpenTracing Authors. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from __future__ import absolute_import + +from contextlib import contextmanager +from contextvars import ContextVar + +from opentracing import Scope, ScopeManager + + +_SCOPE = ContextVar('scope') + + +class ContextVarsScopeManager(ScopeManager): + """ + :class:`~opentracing.ScopeManager` implementation for **asyncio** + that stores the :class:`~opentracing.Scope` using ContextVar. + + The scope manager provides automatic :class:`~opentracing.Span` propagation + from parent coroutines, tasks and scheduled in event loop callbacks to + their children. + + .. code-block:: python + + async def child_coroutine(): + # No need manual activation of parent span in child coroutine. + with tracer.start_active_span('child') as scope: + ... + + async def parent_coroutine(): + with tracer.start_active_span('parent') as scope: + ... + await child_coroutine() + ... + + """ + + def activate(self, span, finish_on_close): + """ + Make a :class:`~opentracing.Span` instance active. + + :param span: the :class:`~opentracing.Span` that should become active. + :param finish_on_close: whether *span* should automatically be + finished when :meth:`Scope.close()` is called. + + :return: a :class:`~opentracing.Scope` instance to control the end + of the active period for the :class:`~opentracing.Span`. + It is a programming error to neglect to call :meth:`Scope.close()` + on the returned instance. + """ + + return self._set_scope(span, finish_on_close) + + @property + def active(self): + """ + Return the currently active :class:`~opentracing.Scope` which + can be used to access the currently active :attr:`Scope.span`. + + :return: the :class:`~opentracing.Scope` that is active, + or ``None`` if not available. + """ + + return self._get_scope() + + def _set_scope(self, span, finish_on_close): + return _ContextVarsScope(self, span, finish_on_close) + + def _get_scope(self): + return _SCOPE.get(None) + + +class _ContextVarsScope(Scope): + def __init__(self, manager, span, finish_on_close): + super(_ContextVarsScope, self).__init__(manager, span) + self._finish_on_close = finish_on_close + self._token = _SCOPE.set(self) + + def close(self): + if self.manager.active is not self: + return + + _SCOPE.reset(self._token) + + if self._finish_on_close: + self.span.finish() + + +@contextmanager +def no_parent_scope(): + """ + Context manager that resets current Scope. Intended to break span + propagation to children coroutines, tasks or scheduled callbacks. + + .. code-block:: python + + from opentracing.scope_managers.contextvars import no_parent_scope + + def periodic() + # `periodic` span will be children of root only at the first time. + with self.tracer.start_active_span('periodic'): + # Now we break span propagation. + with no_parent_scope(): + self.loop.call_soon(periodic) + + with self.tracer.start_active_span('root'): + self.loop.call_soon(periodic) + """ + token = _SCOPE.set(None) + try: + yield + finally: + _SCOPE.reset(token) diff --git a/setup.py b/setup.py index 47041c7..6f2ea46 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,9 @@ include_package_data=True, zip_safe=False, platforms='any', + install_requires=[ + 'futures;python_version=="2.7"', + ], extras_require={ 'tests': [ 'doubles', @@ -40,8 +43,7 @@ 'six>=1.10.0,<2.0', 'gevent', - 'tornado<6', + 'tornado', ], - ':python_version == "2.7"': ['futures'], }, ) diff --git a/testbed/README.md b/testbed/README.md index b4043c9..a3e711f 100644 --- a/testbed/README.md +++ b/testbed/README.md @@ -18,7 +18,7 @@ Alternatively, due to the organization of the suite, it's possible to run direct ## Tested frameworks -Currently the examples cover `threading`, `tornado`, `gevent` and `asyncio` (which requires Python 3). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations. +Currently the examples cover `threading`, `tornado`, `gevent`, `asyncio` (which requires Python 3) and `contextvars` (which requires Python 3.7 and higher). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations. ### threading, asyncio and gevent @@ -30,6 +30,10 @@ No automatic `Span` propagation between parent and children tasks is provided, a Currently, yielding over multiple children is not supported, as the context is effectively shared, and switching from coroutine to coroutine messes up the current active `Span`. +### contextvars + +`ContextVarsScopeManager` uses [contextvars](https://docs.python.org/3/library/contextvars.html) module to both store **and** automatically propagate the context from parent coroutines / tasks / scheduled in event loop callbacks to their children. + ## List of patterns - [Active Span replacement](test_active_span_replacement) - Start an isolated task and query for its results in another task/thread. @@ -54,7 +58,3 @@ testbed/ ``` Supporting all the platforms is optional, and a warning will be displayed when doing `make testbed` in such case. - -## Flake8 support - -Currently `flake8` does not support the Python 3 `await`/`async` syntax, and does not offer a way to ignore such syntax. diff --git a/testbed/__init__.py b/testbed/__init__.py index 8b13789..e69de29 100644 --- a/testbed/__init__.py +++ b/testbed/__init__.py @@ -1 +0,0 @@ - diff --git a/testbed/__main__.py b/testbed/__main__.py index 1dd2ceb..ded81b3 100644 --- a/testbed/__main__.py +++ b/testbed/__main__.py @@ -1,17 +1,25 @@ from importlib import import_module import logging import os +import sys import six import unittest +from tornado import version_info as tornado_version enabled_platforms = [ 'threads', - 'tornado', 'gevent', ] +if tornado_version < (6, 0, 0, 0): + # Including testbed for Tornado coroutines and stack context. + # We don't need run testbed in case Tornado>=6, because it became + # asyncio-based framework and `stack_context` was deprecated. + enabled_platforms.append('tornado') if six.PY3: enabled_platforms.append('asyncio') +if sys.version_info >= (3, 7): + enabled_platforms.append('contextvars') logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__package__) @@ -47,4 +55,6 @@ def get_test_directories(): suite = loader.loadTestsFromModule(test_module) main_suite.addTests(suite) -unittest.TextTestRunner(verbosity=3).run(main_suite) +result = unittest.TextTestRunner(verbosity=3).run(main_suite) +if result.failures or result.errors: + sys.exit(1) diff --git a/testbed/test_active_span_replacement/test_contextvars.py b/testbed/test_active_span_replacement/test_contextvars.py new file mode 100644 index 0000000..2741f38 --- /dev/null +++ b/testbed/test_active_span_replacement/test_contextvars.py @@ -0,0 +1,54 @@ +from __future__ import print_function + +import asyncio + +from opentracing.mocktracer import MockTracer +from ..testcase import OpenTracingTestCase +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from ..utils import stop_loop_when + + +class TestAsyncioContextVars(OpenTracingTestCase): + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_main(self): + # Start an isolated task and query for its result -and finish it- + # in another task/thread + span = self.tracer.start_span('initial') + self.submit_another_task(span) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) >= 3) + self.loop.run_forever() + + initial, subtask, task = self.tracer.finished_spans() + + self.assertEmptySpan(initial, 'initial') + self.assertEmptySpan(subtask, 'subtask') + self.assertEmptySpan(task, 'task') + + # task/subtask are part of the same trace, + # and subtask is a child of task + self.assertSameTrace(subtask, task) + self.assertIsChildOf(subtask, task) + + # initial task is not related in any way to those two tasks + self.assertNotSameTrace(initial, subtask) + self.assertHasNoParent(initial) + + async def task(self, span): + # Create a new Span for this task + with self.tracer.start_active_span('task'): + + with self.tracer.scope_manager.activate(span, True): + # Simulate work strictly related to the initial Span + pass + + # Use the task span as parent of a new subtask + with self.tracer.start_active_span('subtask'): + pass + + def submit_another_task(self, span): + self.loop.create_task(self.task(span)) diff --git a/testbed/test_client_server/test_contextvars.py b/testbed/test_client_server/test_contextvars.py new file mode 100644 index 0000000..e69e81f --- /dev/null +++ b/testbed/test_client_server/test_contextvars.py @@ -0,0 +1,79 @@ +from __future__ import print_function + + +import asyncio + +import opentracing +from opentracing.ext import tags +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from ..testcase import OpenTracingTestCase +from ..utils import get_logger, get_one_by_tag, stop_loop_when + + +logger = get_logger(__name__) + + +class Server(object): + def __init__(self, *args, **kwargs): + tracer = kwargs.pop('tracer') + queue = kwargs.pop('queue') + super(Server, self).__init__(*args, **kwargs) + + self.tracer = tracer + self.queue = queue + + async def run(self): + value = await self.queue.get() + self.process(value) + + def process(self, message): + logger.info('Processing message in server') + + ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message) + with self.tracer.start_active_span('receive', + child_of=ctx) as scope: + scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER) + + +class Client(object): + def __init__(self, tracer, queue): + self.tracer = tracer + self.queue = queue + + async def send(self): + with self.tracer.start_active_span('send') as scope: + scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT) + + message = {} + self.tracer.inject(scope.span.context, + opentracing.Format.TEXT_MAP, + message) + await self.queue.put(message) + + logger.info('Sent message from client') + + +class TestAsyncioContextVars(OpenTracingTestCase): + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.queue = asyncio.Queue() + self.loop = asyncio.get_event_loop() + self.server = Server(tracer=self.tracer, queue=self.queue) + + def test(self): + client = Client(self.tracer, self.queue) + self.loop.create_task(self.server.run()) + self.loop.create_task(client.send()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) >= 2) + self.loop.run_forever() + + spans = self.tracer.finished_spans() + self.assertIsNotNone(get_one_by_tag(spans, + tags.SPAN_KIND, + tags.SPAN_KIND_RPC_SERVER)) + self.assertIsNotNone(get_one_by_tag(spans, + tags.SPAN_KIND, + tags.SPAN_KIND_RPC_CLIENT)) diff --git a/testbed/test_common_request_handler/README.md b/testbed/test_common_request_handler/README.md index 7fc1c7d..891f35a 100644 --- a/testbed/test_common_request_handler/README.md +++ b/testbed/test_common_request_handler/README.md @@ -5,7 +5,7 @@ This example shows a `Span` used with `RequestHandler`, which is used as a middl Implementation details: - For `threading`, no active `Span` is consumed as the tasks may be run concurrently on different threads, and an explicit `SpanContext` has to be saved to be used as parent. - For `gevent` and `asyncio`, as no automatic `Span` propagation is done, an explicit `Span` has to be saved to be used as parent (observe an instrumentation library could help to do that implicitly - we stick to the simplest case, though). -- For `tornado`, as the `StackContext` automatically propapates the context (even is the tasks are called through different coroutines), we **do** leverage the active `Span`. +- For `tornado` and `contextvars`, as parent `Span` propagates automatically (even if the tasks are called through different coroutines), we **do** leverage the active `Span`. RequestHandler implementation: @@ -20,6 +20,6 @@ RequestHandler implementation: child_of=self.context, ignore_active_span=True) else: - # Used by tornado. + # Used by tornado and contextvars. span = self.tracer.start_span('send') ``` diff --git a/testbed/test_common_request_handler/test_asyncio.py b/testbed/test_common_request_handler/test_asyncio.py index 126c7d9..0042a55 100644 --- a/testbed/test_common_request_handler/test_asyncio.py +++ b/testbed/test_common_request_handler/test_asyncio.py @@ -1,7 +1,5 @@ from __future__ import print_function -import functools - import asyncio from opentracing.ext import tags diff --git a/testbed/test_common_request_handler/test_contextvars.py b/testbed/test_common_request_handler/test_contextvars.py new file mode 100644 index 0000000..9198ca1 --- /dev/null +++ b/testbed/test_common_request_handler/test_contextvars.py @@ -0,0 +1,123 @@ +from __future__ import print_function + +import asyncio + +from opentracing.ext import tags +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from ..testcase import OpenTracingTestCase +from ..utils import get_logger, get_one_by_operation_name, stop_loop_when +from .request_handler import RequestHandler + + +logger = get_logger(__name__) + + +class Client(object): + def __init__(self, request_handler, loop): + self.request_handler = request_handler + self.loop = loop + + async def send_task(self, message): + request_context = {} + + async def before_handler(): + self.request_handler.before_request(message, request_context) + + async def after_handler(): + self.request_handler.after_request(message, request_context) + + await before_handler() + await after_handler() + + return '%s::response' % message + + def send(self, message): + return self.send_task(message) + + def send_sync(self, message): + return self.loop.run_until_complete(self.send_task(message)) + + +class TestAsyncioContextVars(OpenTracingTestCase): + """ + There is only one instance of 'RequestHandler' per 'Client'. Methods of + 'RequestHandler' are executed in different Tasks, but the context + is the same, so we can leverage it for accessing the active span. + """ + + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + self.client = Client(RequestHandler(self.tracer), self.loop) + + def test_two_callbacks(self): + res_future1 = self.loop.create_task(self.client.send('message1')) + res_future2 = self.loop.create_task(self.client.send('message2')) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) >= 2) + self.loop.run_forever() + + self.assertEqual('message1::response', res_future1.result()) + self.assertEqual('message2::response', res_future2.result()) + + spans = self.tracer.finished_spans() + self.assertEqual(len(spans), 2) + + for span in spans: + self.assertEqual(span.tags.get(tags.SPAN_KIND, None), + tags.SPAN_KIND_RPC_CLIENT) + + self.assertNotSameTrace(spans[0], spans[1]) + self.assertIsNone(spans[0].parent_id) + self.assertIsNone(spans[1].parent_id) + + def test_parent_not_picked(self): + """Active parent should not be picked up by child + as we pass ignore_active_span=True to the RequestHandler""" + + async def do(): + with self.tracer.start_active_span('parent'): + response = await self.client.send_task('no_parent') + self.assertEqual('no_parent::response', response) + + self.loop.run_until_complete(do()) + + spans = self.tracer.finished_spans() + self.assertEqual(len(spans), 2) + + child_span = get_one_by_operation_name(spans, 'send') + self.assertIsNotNone(child_span) + + parent_span = get_one_by_operation_name(spans, 'parent') + self.assertIsNotNone(parent_span) + + # Here check that there is no parent-child relation. + self.assertIsNotChildOf(child_span, parent_span) + + def test_good_solution_to_set_parent(self): + """Solution is good because, though the RequestHandler being shared, + the context will be properly detected.""" + + with self.tracer.start_active_span('parent'): + req_handler = RequestHandler(self.tracer, + ignore_active_span=False) + client = Client(req_handler, self.loop) + response = client.send_sync('correct_parent') + + self.assertEqual('correct_parent::response', response) + + # Should NOT be a child of the previously activated Span + response = client.send_sync('wrong_parent') + self.assertEqual('wrong_parent::response', response) + + spans = self.tracer.finished_spans() + self.assertEqual(len(spans), 3) + + spans = sorted(spans, key=lambda x: x.start_time) + parent_span = get_one_by_operation_name(spans, 'parent') + self.assertIsNotNone(parent_span) + + self.assertIsChildOf(spans[1], parent_span) + self.assertIsNotChildOf(spans[2], parent_span) # Proper parent (none). diff --git a/testbed/test_late_span_finish/test_asyncio.py b/testbed/test_late_span_finish/test_asyncio.py index 782ade2..7c81115 100644 --- a/testbed/test_late_span_finish/test_asyncio.py +++ b/testbed/test_late_span_finish/test_asyncio.py @@ -44,7 +44,7 @@ async def task(name): logger.info('Running %s' % name) with self.tracer.scope_manager.activate(parent_span, False): with self.tracer.start_active_span(name): - asyncio.sleep(0.1) + await asyncio.sleep(0.1) self.loop.create_task(task('task1')) self.loop.create_task(task('task2')) diff --git a/testbed/test_late_span_finish/test_contextvars.py b/testbed/test_late_span_finish/test_contextvars.py new file mode 100644 index 0000000..d830c9e --- /dev/null +++ b/testbed/test_late_span_finish/test_contextvars.py @@ -0,0 +1,47 @@ +from __future__ import print_function + +import asyncio + +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from ..testcase import OpenTracingTestCase +from ..utils import get_logger, stop_loop_when + + +logger = get_logger(__name__) + + +class TestAsyncioContextVars(OpenTracingTestCase): + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_main(self): + + parent_scope = self.tracer.start_active_span('parent') + self.submit_subtasks() + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) >= 2) + self.loop.run_forever() + + # Late-finish the parent Span now. + parent_scope.close() + + spans = self.tracer.finished_spans() + self.assertEqual(len(spans), 3) + self.assertNamesEqual(spans, ['task1', 'task2', 'parent']) + + for i in range(2): + self.assertSameTrace(spans[i], spans[-1]) + self.assertIsChildOf(spans[i], spans[-1]) + self.assertTrue(spans[i].finish_time <= spans[-1].finish_time) + + def submit_subtasks(self): + async def task(name): + logger.info('Running %s' % name) + with self.tracer.start_active_span(name): + await asyncio.sleep(0.1) + + self.loop.create_task(task('task1')) + self.loop.create_task(task('task2')) diff --git a/testbed/test_listener_per_request/test_contextvars.py b/testbed/test_listener_per_request/test_contextvars.py new file mode 100644 index 0000000..1fe4247 --- /dev/null +++ b/testbed/test_listener_per_request/test_contextvars.py @@ -0,0 +1,46 @@ +from __future__ import print_function + +import asyncio + +from opentracing.ext import tags +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from ..testcase import OpenTracingTestCase +from ..utils import get_one_by_tag + +from .response_listener import ResponseListener + + +class Client(object): + def __init__(self, tracer, loop): + self.tracer = tracer + self.loop = loop + + async def task(self, message, listener): + res = '%s::response' % message + listener.on_response(res) + return res + + def send_sync(self, message): + span = self.tracer.start_span('send') + span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT) + + listener = ResponseListener(span) + return self.loop.run_until_complete(self.task(message, listener)) + + +class TestAsyncioContextVars(OpenTracingTestCase): + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_main(self): + client = Client(self.tracer, self.loop) + res = client.send_sync('message') + self.assertEqual(res, 'message::response') + + spans = self.tracer.finished_spans() + self.assertEqual(len(spans), 1) + + span = get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT) + self.assertIsNotNone(span) diff --git a/testbed/test_multiple_callbacks/README.md b/testbed/test_multiple_callbacks/README.md index b6870fc..a2fb5b9 100644 --- a/testbed/test_multiple_callbacks/README.md +++ b/testbed/test_multiple_callbacks/README.md @@ -6,8 +6,9 @@ This example shows a `Span` created for a top-level operation, covering a set of Implementation details: - For `threading`, a thread-safe counter is put in each `Span` to keep track of the pending callbacks, and call `Span.finish()` when the count becomes 0. -- For `gevent`, `tornado` and `asyncio` the children corotuines representing the subtasks are simply yielded over, so no counter is needed. +- For `gevent`, `tornado`, `asyncio` and `contextvars` the children coroutines representing the subtasks are simply yielded over, so no counter is needed. - For `tornado`, the invoked coroutines do not set any active `Span` as doing so messes the used `StackContext`. So yielding over **multiple** coroutines is not supported. +- For `contextvars`, parent context is propagated to the children coroutines implicitly, manual context activation has been avoided. `threading` implementation: ```python diff --git a/testbed/test_multiple_callbacks/test_asyncio.py b/testbed/test_multiple_callbacks/test_asyncio.py index 1da3146..2e29dd2 100644 --- a/testbed/test_multiple_callbacks/test_asyncio.py +++ b/testbed/test_multiple_callbacks/test_asyncio.py @@ -7,7 +7,7 @@ from opentracing.mocktracer import MockTracer from opentracing.scope_managers.asyncio import AsyncioScopeManager from ..testcase import OpenTracingTestCase -from ..utils import RefCount, get_logger, stop_loop_when +from ..utils import get_logger, stop_loop_when random.seed() diff --git a/testbed/test_multiple_callbacks/test_contextvars.py b/testbed/test_multiple_callbacks/test_contextvars.py new file mode 100644 index 0000000..01ca92f --- /dev/null +++ b/testbed/test_multiple_callbacks/test_contextvars.py @@ -0,0 +1,56 @@ +from __future__ import print_function + +import random + +import asyncio + +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from ..testcase import OpenTracingTestCase +from ..utils import get_logger, stop_loop_when + + +random.seed() +logger = get_logger(__name__) + + +class TestAsyncioContextVars(OpenTracingTestCase): + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_main(self): + # Need to run within a Task, as the scope manager depends + # on Task.current_task() + async def main_task(): + with self.tracer.start_active_span('parent'): + tasks = self.submit_callbacks() + await asyncio.gather(*tasks) + + self.loop.create_task(main_task()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) >= 4) + self.loop.run_forever() + + spans = self.tracer.finished_spans() + self.assertEqual(len(spans), 4) + self.assertNamesEqual(spans, ['task', 'task', 'task', 'parent']) + + for i in range(3): + self.assertSameTrace(spans[i], spans[-1]) + self.assertIsChildOf(spans[i], spans[-1]) + + async def task(self, interval): + logger.info('Starting task') + with self.tracer.start_active_span('task'): + await asyncio.sleep(interval) + + def submit_callbacks(self): + tasks = [] + for i in range(3): + interval = 0.1 + random.randint(200, 500) * 0.001 + t = self.loop.create_task(self.task(interval)) + tasks.append(t) + + return tasks diff --git a/testbed/test_nested_callbacks/README.md b/testbed/test_nested_callbacks/README.md index b6da45e..c75beca 100644 --- a/testbed/test_nested_callbacks/README.md +++ b/testbed/test_nested_callbacks/README.md @@ -3,8 +3,8 @@ This example shows a `Span` for a top-level operation, and how it can be passed down on a list of nested callbacks (always one at a time), have it as the active one for each of them, and finished **only** when the last one executes. For Python, we have decided to do it in a **fire-and-forget** fashion. Implementation details: -- For `threading`, `gevent` and `tornado` the `Span` is manually passed down the call chain, activating it in each corotuine/task. -- For `tornado`, the active `Span` is not passed nor activated down the chain as the custom `StackContext` automatically propagates it. +- For `threading`, `gevent` and `asyncio` the `Span` is manually passed down the call chain, activating it in each corotuine/task. +- For `tornado` and `contextvars`, the active `Span` is not passed down nor activated because the context is implicitly propagated. `threading` implementation: ```python diff --git a/testbed/test_nested_callbacks/test_contextvars.py b/testbed/test_nested_callbacks/test_contextvars.py new file mode 100644 index 0000000..deca5ac --- /dev/null +++ b/testbed/test_nested_callbacks/test_contextvars.py @@ -0,0 +1,344 @@ +from __future__ import print_function + + +import asyncio + +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.contextvars import ContextVarsScopeManager, \ + no_parent_scope +from ..testcase import OpenTracingTestCase +from ..utils import stop_loop_when + + +class TestAsyncioContextVars(OpenTracingTestCase): + + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_main(self): + + def submit(): + span = self.tracer.scope_manager.active.span + + async def task1(): + self.assertEqual(span, self.tracer.active_span) + self.tracer.active_span.set_tag('key1', '1') + + async def task2(): + self.assertEqual(span, self.tracer.active_span) + self.tracer.active_span.set_tag('key2', '2') + + async def task3(): + self.assertEqual(span, self.tracer.active_span) + self.tracer.active_span.set_tag('key3', '3') + self.tracer.active_span.finish() + + self.loop.create_task(task3()) + + self.loop.create_task(task2()) + + self.loop.create_task(task1()) + + # Start a Span and let the callback-chain + # finish it when the task is done + async def task(): + with self.tracer.start_active_span('one', finish_on_close=False): + submit() + + self.loop.create_task(task()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 1) + self.loop.run_forever() + + span, = self.tracer.finished_spans() + self.assertEqual(span.operation_name, 'one') + + for i in range(1, 4): + self.assertEqual(span.tags.get('key%s' % i, None), str(i)) + + +class TestAsyncioContextVarsScheduleInLoop(OpenTracingTestCase): + + # TODO: move the test-case to another file + + def setUp(self): + self.tracer = MockTracer(ContextVarsScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_schedule_callbacks(self): + + def callback(op_name): + with self.tracer.start_active_span( + operation_name=op_name, + child_of=self.tracer.active_span, + ): + pass + + def callback_with_nested_callback(op_name): + with self.tracer.start_active_span( + operation_name=op_name, + child_of=self.tracer.active_span, + ): + self.loop.call_soon(callback, 'childof:{}'.format(op_name)) + + with self.tracer.start_active_span('root'): + self.loop.call_soon(callback_with_nested_callback, 'first') + self.loop.call_soon(callback, 'second') + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 4) + self.loop.run_forever() + + root, first, second, childof_first = self.tracer.finished_spans() + self.assertEmptySpan(root, 'root') + self.assertEmptySpan(first, 'first') + self.assertEmptySpan(second, 'second') + self.assertEmptySpan(childof_first, 'childof:first') + + self.assertIsChildOf(first, root) + self.assertIsChildOf(childof_first, first) + self.assertIsChildOf(second, root) + + def test_coroutines_schedule_callbacks(self): + + def callback(op_name): + with self.tracer.start_active_span( + operation_name=op_name, + child_of=self.tracer.active_span + ): + pass + + async def task(op_name): + with self.tracer.start_active_span( + operation_name=op_name, + child_of=self.tracer.active_span + ): + self.loop.call_later( + 0.1, callback, 'childof:{}'.format(op_name) + ) + with self.tracer.start_active_span('root'): + self.loop.create_task(task('task1')) + self.loop.create_task(task('task2')) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 5) + self.loop.run_forever() + + root, task1, task2, child1, child2 = self.tracer.finished_spans() + + self.assertEmptySpan(root, 'root') + self.assertEmptySpan(task1, 'task1') + self.assertEmptySpan(task2, 'task2') + self.assertEmptySpan(child1, 'childof:task1') + self.assertEmptySpan(child2, 'childof:task2') + + self.assertIsChildOf(task1, root) + self.assertIsChildOf(task2, root) + self.assertIsChildOf(child1, task1) + self.assertIsChildOf(child2, task2) + + def test_coroutines_scheduling_task(self): + + async def _task(op_name): + await asyncio.sleep(0.1) + with self.tracer.start_active_span( + operation_name=op_name, + child_of=self.tracer.active_span + ): + pass + + async def task(op_name): + with self.tracer.start_active_span( + operation_name=op_name, + child_of=self.tracer.active_span + ): + self.loop.create_task(_task('childof:{}'.format(op_name))) + + with self.tracer.start_active_span('root'): + self.loop.create_task(task('task1')) + self.loop.create_task(task('task2')) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 5) + self.loop.run_forever() + + root, task1, task2, child1, child2 = self.tracer.finished_spans() + + self.assertEmptySpan(root, 'root') + self.assertEmptySpan(task1, 'task1') + self.assertEmptySpan(task2, 'task2') + self.assertEmptySpan(child1, 'childof:task1') + self.assertEmptySpan(child2, 'childof:task2') + + self.assertIsChildOf(task1, root) + self.assertIsChildOf(task2, root) + self.assertIsChildOf(child1, task1) + self.assertIsChildOf(child2, task2) + + def test_recursive_scheduling_task(self): + + tasks = 4 + + async def task(n=0): + await asyncio.sleep(0.1) + with self.tracer.start_active_span( + operation_name=str(n), + child_of=self.tracer.active_span + ): + if n < tasks: + self.loop.create_task(task(n+1)) + + self.loop.create_task(task()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == tasks) + self.loop.run_forever() + + spans = self.tracer.finished_spans() + + for i in range(tasks): + self.assertEmptySpan(spans[i], str(i)) + if i == 0: + self.assertIsNone(spans[i].parent_id) + else: + self.assertIsChildOf(spans[i], spans[i-1]) + + def test_recursive_scheduling_with_ignoring_active_span(self): + + tasks = 4 + + async def task(n=0): + await asyncio.sleep(0.1) + if n < tasks / 2: + with self.tracer.start_active_span(str(n)): + self.loop.create_task(task(n+1)) + elif n < tasks: + with self.tracer.start_active_span( + operation_name=str(n), + ignore_active_span=True + ): + self.loop.create_task(task(n+1)) + + self.loop.create_task(task()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == tasks) + self.loop.run_forever() + + s0, s1, s2, s3 = self.tracer.finished_spans() + + self.assertEmptySpan(s0, '0') + self.assertHasNoParent(s0) + + self.assertEmptySpan(s1, '1') + self.assertIsChildOf(s1, s0) + + self.assertEmptySpan(s2, '2') + self.assertHasNoParent(s2) + + self.assertEmptySpan(s3, '3') + self.assertHasNoParent(s3) + + def test_tasks_with_no_parent_scope(self): + + async def task(name): + await asyncio.sleep(0.1) + with self.tracer.start_active_span(name): + await asyncio.sleep(0.1) + + async def tasks(): + self.loop.create_task(task('task_1')) + with no_parent_scope(): + self.loop.create_task(task('task_2')) + self.loop.create_task(task('task_3')) + + with self.tracer.start_active_span('root'): + self.loop.create_task(tasks()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 4) + self.loop.run_forever() + + root, task1, task2, task3 = self.tracer.finished_spans() + + self.assertEmptySpan(root, 'root') + + self.assertEmptySpan(task1, 'task_1') + self.assertIsChildOf(task1, root) + + # Third task was scheduled out `no_parent_scope`. + self.assertEmptySpan(task3, 'task_3') + self.assertIsChildOf(task3, root) + + # Second task "wrapped" by `no_parent_scope`. + self.assertEmptySpan(task2, 'task_2') + self.assertHasNoParent(task2) + + def test_callbacks_with_no_parent_scope(self): + + def callback(name): + with self.tracer.start_active_span(name): + pass + + def callbacks(): + self.loop.call_soon(callback, 'task_1') + with no_parent_scope(): + self.loop.call_soon(callback, 'task_2') + self.loop.call_soon(callback, 'task_3') + + with self.tracer.start_active_span('root'): + self.loop.call_soon(callbacks) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 4) + self.loop.run_forever() + + root, task1, task2, task3 = self.tracer.finished_spans() + + self.assertEmptySpan(root, 'root') + + self.assertEmptySpan(task1, 'task_1') + self.assertIsChildOf(task1, root) + + # Third task was scheduled out `no_parent_scope`. + self.assertEmptySpan(task3, 'task_3') + self.assertIsChildOf(task3, root) + + # Second task "wrapped" by `no_parent_scope`. + self.assertEmptySpan(task2, 'task_2') + self.assertHasNoParent(task2) + + def test_await_with_no_parent_scope(self): + + async def coro(name): + with self.tracer.start_active_span(name): + pass + + async def main_coro(): + await coro('coro_1') + with no_parent_scope(): + await coro('coro_2') + await coro('coro_3') + + with self.tracer.start_active_span('root'): + self.loop.create_task(main_coro()) + + stop_loop_when(self.loop, + lambda: len(self.tracer.finished_spans()) == 4) + self.loop.run_forever() + + root, coro1, coro2, coro3 = self.tracer.finished_spans() + + self.assertEmptySpan(root, 'root') + + self.assertEmptySpan(coro1, 'coro_1') + self.assertIsChildOf(coro1, root) + + # second coroutine "wrapped" by `no_parent_scope`. + self.assertEmptySpan(coro2, 'coro_2') + self.assertHasNoParent(coro2) + + self.assertEmptySpan(coro3, 'coro_3') + self.assertIsChildOf(coro3, root) diff --git a/testbed/test_subtask_span_propagation/README.md b/testbed/test_subtask_span_propagation/README.md index 0f5dab5..0fa2424 100644 --- a/testbed/test_subtask_span_propagation/README.md +++ b/testbed/test_subtask_span_propagation/README.md @@ -4,7 +4,7 @@ This example shows an active `Span` being simply propagated to the subtasks -eit Implementation details: - For `threading`, `gevent` and `asyncio` the `Span` is manually passed down the call chain, being manually reactivated it in each corotuine/task. -- For `tornado`, the active `Span` is not passed nor activated down the chain as the custom `StackContext` automatically propagates it. +- For `tornado` and `contextvars`, the active `Span` is not passed down the chain nor activated because the context is implicitly propagated. `threading` implementation: ```python diff --git a/testbed/test_subtask_span_propagation/test_asyncio.py b/testbed/test_subtask_span_propagation/test_asyncio.py index 73b4ccd..4f0e705 100644 --- a/testbed/test_subtask_span_propagation/test_asyncio.py +++ b/testbed/test_subtask_span_propagation/test_asyncio.py @@ -1,7 +1,5 @@ from __future__ import absolute_import, print_function -import functools - import asyncio from opentracing.mocktracer import MockTracer diff --git a/testbed/test_subtask_span_propagation/test_contextvars.py b/testbed/test_subtask_span_propagation/test_contextvars.py new file mode 100644 index 0000000..b71333c --- /dev/null +++ b/testbed/test_subtask_span_propagation/test_contextvars.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import, print_function + +import asyncio + +from opentracing.mocktracer import MockTracer +from opentracing.scope_managers.asyncio import AsyncioScopeManager +from ..testcase import OpenTracingTestCase + + +class TestAsyncioContextVars(OpenTracingTestCase): + def setUp(self): + self.tracer = MockTracer(AsyncioScopeManager()) + self.loop = asyncio.get_event_loop() + + def test_main(self): + res = self.loop.run_until_complete(self.parent_task('message')) + self.assertEqual(res, 'message::response') + + child, parent = self.tracer.finished_spans() + self.assertEmptySpan(child, 'child') + self.assertEmptySpan(parent, 'parent') + self.assertIsChildOf(child, parent) + + async def parent_task(self, message): + with self.tracer.start_active_span('parent'): + res = await self.child_task(message) + + return res + + async def child_task(self, message): + with self.tracer.start_active_span('child'): + return '%s::response' % message diff --git a/testbed/testcase.py b/testbed/testcase.py index 2785c74..6ea7e8e 100644 --- a/testbed/testcase.py +++ b/testbed/testcase.py @@ -16,5 +16,13 @@ def assertIsChildOf(self, spanA, spanB): def assertIsNotChildOf(self, spanA, spanB): return self.assertNotEqual(spanA.parent_id, spanB.context.span_id) + def assertHasNoParent(self, span): + return self.assertIsNone(span.parent_id) + def assertNamesEqual(self, spans, names): self.assertEqual(list(map(lambda x: x.operation_name, spans)), names) + + def assertEmptySpan(self, span, name): + self.assertEqual(span.operation_name, name) + self.assertEqual(span.tags, {}) + self.assertEqual(len(span.logs), 0) diff --git a/tests/conftest.py b/tests/conftest.py index 6063d5a..9d33c19 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,12 +18,21 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. from __future__ import absolute_import - +import sys import six PYTHON3_FILES = [ 'scope_managers/test_asyncio.py', ] +PYTHON37_FILES = [ + 'scope_managers/test_contextvars.py', +] + +collect_ignore = [] + if six.PY2: - collect_ignore = PYTHON3_FILES + collect_ignore += PYTHON3_FILES + +if sys.version_info < (3, 7): + collect_ignore += PYTHON37_FILES diff --git a/tests/scope_managers/test_contextvars.py b/tests/scope_managers/test_contextvars.py new file mode 100644 index 0000000..dd1820f --- /dev/null +++ b/tests/scope_managers/test_contextvars.py @@ -0,0 +1,52 @@ +# Copyright (c) The OpenTracing Authors. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +from __future__ import absolute_import + +from concurrent.futures import ThreadPoolExecutor +from unittest import TestCase + +import asyncio + +from opentracing.scope_managers.contextvars import ContextVarsScopeManager +from opentracing.harness.scope_check import ScopeCompatibilityCheckMixin + + +class AsyncioContextVarsCompabilityCheck( + TestCase, ScopeCompatibilityCheckMixin +): + + def scope_manager(self): + return ContextVarsScopeManager() + + def run_test(self, test_fn): + @asyncio.coroutine + def async_test_fn(): + test_fn() + asyncio.get_event_loop().run_until_complete(async_test_fn()) + + def test_no_event_loop(self): + # no event loop exists by default in + # new threads, so make sure we don't fail there. + def test_fn(): + manager = self.scope_manager() + assert manager.active is None + + executor = ThreadPoolExecutor(max_workers=1) + executor.submit(test_fn).result() diff --git a/tests/scope_managers/test_tornado.py b/tests/scope_managers/test_tornado.py index 99177f3..6e85390 100644 --- a/tests/scope_managers/test_tornado.py +++ b/tests/scope_managers/test_tornado.py @@ -19,16 +19,22 @@ # THE SOFTWARE. from __future__ import absolute_import - +import pytest from unittest import TestCase -from tornado import ioloop - -from opentracing.scope_managers.tornado import TornadoScopeManager -from opentracing.scope_managers.tornado import tracer_stack_context +from tornado import ioloop, version_info +try: + from opentracing.scope_managers.tornado import TornadoScopeManager + from opentracing.scope_managers.tornado import tracer_stack_context +except ImportError: + pass from opentracing.harness.scope_check import ScopeCompatibilityCheckMixin +# We don't need run tests in case Tornado>=6, because it became +# asyncio-based framework and `stack_context` was deprecated. +@pytest.mark.skipif(version_info >= (6, 0, 0, 0), + reason='skip Tornado >= 6') class TornadoCompabilityCheck(TestCase, ScopeCompatibilityCheckMixin): def scope_manager(self): return TornadoScopeManager()