Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Commit

Permalink
Make new context manager based on python 3.7 contextvars.
Browse files Browse the repository at this point in the history
  • Loading branch information
condorcet committed Jul 26, 2019
1 parent d6f7a6b commit 376c91d
Show file tree
Hide file tree
Showing 27 changed files with 1,006 additions and 99 deletions.
5 changes: 0 additions & 5 deletions opentracing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
# limitations under the License.

from __future__ import absolute_import
try:
# Contextvars backport with coroutine supporting (python 3.6).
import aiocontextvars # noqa
except ImportError:
pass
from .span import Span # noqa
from .span import SpanContext # noqa
from .scope import Scope # noqa
Expand Down
71 changes: 53 additions & 18 deletions opentracing/scope_managers/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,32 @@

from __future__ import absolute_import

from contextvars import ContextVar
import asyncio

from opentracing import Scope
from opentracing.scope_managers import ThreadLocalScopeManager


_SCOPE = ContextVar('scope')
from .constants import ACTIVE_ATTR


class AsyncioScopeManager(ThreadLocalScopeManager):
"""
:class:`~opentracing.ScopeManager` implementation for **asyncio**
that stores the :class:`~opentracing.Scope` using ContextVar.
that stores the :class:`~opentracing.Scope` in the current
:class:`Task` (:meth:`Task.current_task()`), falling back to
thread-local storage if none was being executed.
The scope manager provides automatic :class:`~opentracing.Span` propagation
from parent coroutines to their children.
Automatic :class:`~opentracing.Span` propagation from
parent coroutines to their children is not provided, which needs to be
done manually:
.. code-block:: python
async def child_coroutine(span):
# No need manual activation of parent span in child coroutine.
with tracer.start_active_span('child') as scope:
...
# activate the parent Span, but do not finish it upon
# deactivation. That will be done by the parent coroutine.
with tracer.scope_manager.activate(span, finish_on_close=False):
with tracer.start_active_span('child') as scope:
...
async def parent_coroutine():
with tracer.start_active_span('parent') as scope:
Expand All @@ -60,13 +63,24 @@ def activate(self, span, finish_on_close):
:param finish_on_close: whether *span* should automatically be
finished when :meth:`Scope.close()` is called.
If no :class:`Task` is being executed, thread-local
storage will be used to store the :class:`~opentracing.Scope`.
:return: a :class:`~opentracing.Scope` instance to control the end
of the active period for the :class:`~opentracing.Span`.
It is a programming error to neglect to call :meth:`Scope.close()`
on the returned instance.
"""

return self._set_scope(span, finish_on_close)
task = self._get_task()
if not task:
return super(AsyncioScopeManager, self).activate(span,
finish_on_close)

scope = _AsyncioScope(self, span, finish_on_close)
self._set_task_scope(scope, task)

return scope

@property
def active(self):
Expand All @@ -79,25 +93,46 @@ def active(self):
or ``None`` if not available.
"""

return self._get_scope()
task = self._get_task()
if not task:
return super(AsyncioScopeManager, self).active

def _set_scope(self, span, finish_on_close):
return _AsyncioScope(self, span, finish_on_close)
return self._get_task_scope(task)

def _get_scope(self):
return _SCOPE.get(None)
def _get_task(self):
try:
# Prevent failure when run from a thread
# without an event loop.
loop = asyncio.get_event_loop()
except RuntimeError:
return None

return asyncio.Task.current_task(loop=loop)

def _set_task_scope(self, scope, task=None):
if task is None:
task = self._get_task()

setattr(task, ACTIVE_ATTR, scope)

def _get_task_scope(self, task=None):
if task is None:
task = self._get_task()

return getattr(task, ACTIVE_ATTR, None)


class _AsyncioScope(Scope):
def __init__(self, manager, span, finish_on_close):
super(_AsyncioScope, self).__init__(manager, span)
self._finish_on_close = finish_on_close
self._token = _SCOPE.set(self)
self._to_restore = manager.active

def close(self):
if self.manager.active is not self:
return
_SCOPE.reset(self._token)

self.manager._set_task_scope(self._to_restore)

if self._finish_on_close:
self.span.finish()
115 changes: 115 additions & 0 deletions opentracing/scope_managers/contextvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright (c) The OpenTracing Authors.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

from __future__ import absolute_import

from contextlib import contextmanager
from contextvars import ContextVar

from opentracing import Scope
from opentracing.scope_managers import ThreadLocalScopeManager


_SCOPE = ContextVar('scope')


class ContextVarsScopeManager(ThreadLocalScopeManager):
"""
:class:`~opentracing.ScopeManager` implementation for **asyncio**
that stores the :class:`~opentracing.Scope` using ContextVar.
The scope manager provides automatic :class:`~opentracing.Span` propagation
from parent coroutines to their children.
.. code-block:: python
async def child_coroutine(span):
# No need manual activation of parent span in child coroutine.
with tracer.start_active_span('child') as scope:
...
async def parent_coroutine():
with tracer.start_active_span('parent') as scope:
...
await child_coroutine(span)
...
"""
# TODO: update description

def activate(self, span, finish_on_close):
"""
Make a :class:`~opentracing.Span` instance active.
:param span: the :class:`~opentracing.Span` that should become active.
:param finish_on_close: whether *span* should automatically be
finished when :meth:`Scope.close()` is called.
:return: a :class:`~opentracing.Scope` instance to control the end
of the active period for the :class:`~opentracing.Span`.
It is a programming error to neglect to call :meth:`Scope.close()`
on the returned instance.
"""

return self._set_scope(span, finish_on_close)

@property
def active(self):
"""
Return the currently active :class:`~opentracing.Scope` which
can be used to access the currently active
:attr:`Scope.span`.
:return: the :class:`~opentracing.Scope` that is active,
or ``None`` if not available.
"""
# TODO: update description

return self._get_scope()

def _set_scope(self, span, finish_on_close):
return _ContextVarsScope(self, span, finish_on_close)

def _get_scope(self):
return _SCOPE.get(None)


class _ContextVarsScope(Scope):
def __init__(self, manager, span, finish_on_close):
super(_ContextVarsScope, self).__init__(manager, span)
self._finish_on_close = finish_on_close
self._token = _SCOPE.set(self)

def close(self):
if self.manager.active is not self:
return
_SCOPE.reset(self._token)

if self._finish_on_close:
self.span.finish()


@contextmanager
def no_parent_scope():
token = _SCOPE.set(None)
try:
yield
finally:
_SCOPE.reset(token)
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
platforms='any',
install_requires=[
'futures;python_version=="2.7"',
'aiocontextvars;python_version>="3.5"',
],
extras_require={
'tests': [
Expand Down
17 changes: 6 additions & 11 deletions testbed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@ Alternatively, due to the organization of the suite, it's possible to run direct

## Tested frameworks

Currently the examples cover from ..utils import get_one_by_operation_name, stop_loop_when
`threading`, `tornado`, `gevent` and `asyncio` (which requires Python 3). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.
Currently the examples cover `threading`, `tornado`, `gevent`, `asyncio` (which requires Python 3) and `contextvars` (which requires Python 3.7 and higher). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.

### threading and gevent
### threading, asyncio and gevent

No automatic `Span` propagation between parent and children tasks is provided, and thus the `Span` need to be manually passed down the chain.

### asyncio

`AsyncioScopeManager` supports automatically propagate the context from parent coroutines to their children. For compatibility reasons with previous version of `AsyncioScopeManager`, asyncio testbed contains test cases showing that manual activation of parent span in child span also works as expected.

### tornado

`TornadoScopeManager` uses a variation of `tornado.stack_context.StackContext` to both store **and** automatically propagate the context from parent coroutines to their children.

### contextvars

`ContextVarsScopeManager` uses [contextvars](https://docs.python.org/3/library/contextvars.html) module to both store **and** automatically propagate the context from parent coroutines / tasks / scheduled in event loop callbacks to their children.

Currently, yielding over multiple children is not supported, as the context is effectively shared, and switching from coroutine to coroutine messes up the current active `Span`.

## List of patterns
Expand All @@ -59,7 +58,3 @@ testbed/
```

Supporting all the platforms is optional, and a warning will be displayed when doing `make testbed` in such case.

## Flake8 support

Currently `flake8` does not support the Python 3 `await`/`async` syntax, and does not offer a way to ignore such syntax.
1 change: 0 additions & 1 deletion testbed/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

2 changes: 2 additions & 0 deletions testbed/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
enabled_platforms.append('tornado')
if six.PY3:
enabled_platforms.append('asyncio')
if sys.version_info >= (3, 7):
enabled_platforms.append('contextvars')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__package__)
Expand Down
54 changes: 54 additions & 0 deletions testbed/test_active_span_replacement/test_contextvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import print_function

import asyncio

from opentracing.mocktracer import MockTracer
from ..testcase import OpenTracingTestCase
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
from ..utils import stop_loop_when


class TestAsyncioContextVars(OpenTracingTestCase):
def setUp(self):
self.tracer = MockTracer(ContextVarsScopeManager())
self.loop = asyncio.get_event_loop()

def test_main(self):
# Start an isolated task and query for its result -and finish it-
# in another task/thread
span = self.tracer.start_span('initial')
self.submit_another_task(span)

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 3)
self.loop.run_forever()

initial, subtask, task = self.tracer.finished_spans()

self.assertEmptySpan(initial, 'initial')
self.assertEmptySpan(subtask, 'subtask')
self.assertEmptySpan(task, 'task')

# task/subtask are part of the same trace,
# and subtask is a child of task
self.assertSameTrace(subtask, task)
self.assertIsChildOf(subtask, task)

# initial task is not related in any way to those two tasks
self.assertNotSameTrace(initial, subtask)
self.assertHasNoParent(initial)

async def task(self, span):
# Create a new Span for this task
with self.tracer.start_active_span('task'):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span('subtask'):
pass

def submit_another_task(self, span):
self.loop.create_task(self.task(span))
Loading

0 comments on commit 376c91d

Please sign in to comment.