Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Commit

Permalink
Add AsyncioScopeManager based on contextvars and supporting Tornado 6 (
Browse files Browse the repository at this point in the history
…#118)

* Asyncio context manager with contextvars. Add different versions of tornado to travis.yml.

* Make new context manager based on python 3.7 contextvars.

* Inherit ContextVarsScopeManagerFix directly from ScopeManager, fix docstrings and README

* Update testbed/test_multiple_callbacks/README.md

Co-Authored-By: Yuri Shkuro <yurishkuro@users.noreply.github.com>

* Update testbed/test_nested_callbacks/README.md

Co-Authored-By: Yuri Shkuro <yurishkuro@users.noreply.github.com>

* Update testbed/test_subtask_span_propagation/README.md

Co-Authored-By: Yuri Shkuro <yurishkuro@users.noreply.github.com>

* Fix typo in testbed docs

* Remove obsolete description from testbed docs

* Update testbed/test_common_request_handler/README.md

Co-Authored-By: Yuri Shkuro <yurishkuro@users.noreply.github.com>
  • Loading branch information
condorcet and yurishkuro committed Dec 17, 2019
1 parent 7d2e62b commit 170f927
Show file tree
Hide file tree
Showing 26 changed files with 1,033 additions and 29 deletions.
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ python:
- "3.7"
- "3.8-dev"

env:
- TORNADO=">=4,<5"
- TORNADO=">=5,<6"
- TORNADO=">=6"

matrix:
allow_failures:
- python: "3.8-dev"
exclude:
- python: "2.7"
env: TORNADO=">=6"

install:
- make bootstrap
- pip install -q "tornado$TORNADO"

script:
- make test testbed lint
131 changes: 131 additions & 0 deletions opentracing/scope_managers/contextvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright (c) The OpenTracing Authors.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

from __future__ import absolute_import

from contextlib import contextmanager
from contextvars import ContextVar

from opentracing import Scope, ScopeManager


_SCOPE = ContextVar('scope')


class ContextVarsScopeManager(ScopeManager):
"""
:class:`~opentracing.ScopeManager` implementation for **asyncio**
that stores the :class:`~opentracing.Scope` using ContextVar.
The scope manager provides automatic :class:`~opentracing.Span` propagation
from parent coroutines, tasks and scheduled in event loop callbacks to
their children.
.. code-block:: python
async def child_coroutine():
# No need manual activation of parent span in child coroutine.
with tracer.start_active_span('child') as scope:
...
async def parent_coroutine():
with tracer.start_active_span('parent') as scope:
...
await child_coroutine()
...
"""

def activate(self, span, finish_on_close):
"""
Make a :class:`~opentracing.Span` instance active.
:param span: the :class:`~opentracing.Span` that should become active.
:param finish_on_close: whether *span* should automatically be
finished when :meth:`Scope.close()` is called.
:return: a :class:`~opentracing.Scope` instance to control the end
of the active period for the :class:`~opentracing.Span`.
It is a programming error to neglect to call :meth:`Scope.close()`
on the returned instance.
"""

return self._set_scope(span, finish_on_close)

@property
def active(self):
"""
Return the currently active :class:`~opentracing.Scope` which
can be used to access the currently active :attr:`Scope.span`.
:return: the :class:`~opentracing.Scope` that is active,
or ``None`` if not available.
"""

return self._get_scope()

def _set_scope(self, span, finish_on_close):
return _ContextVarsScope(self, span, finish_on_close)

def _get_scope(self):
return _SCOPE.get(None)


class _ContextVarsScope(Scope):
def __init__(self, manager, span, finish_on_close):
super(_ContextVarsScope, self).__init__(manager, span)
self._finish_on_close = finish_on_close
self._token = _SCOPE.set(self)

def close(self):
if self.manager.active is not self:
return

_SCOPE.reset(self._token)

if self._finish_on_close:
self.span.finish()


@contextmanager
def no_parent_scope():
"""
Context manager that resets current Scope. Intended to break span
propagation to children coroutines, tasks or scheduled callbacks.
.. code-block:: python
from opentracing.scope_managers.contextvars import no_parent_scope
def periodic()
# `periodic` span will be children of root only at the first time.
with self.tracer.start_active_span('periodic'):
# Now we break span propagation.
with no_parent_scope():
self.loop.call_soon(periodic)
with self.tracer.start_active_span('root'):
self.loop.call_soon(periodic)
"""
token = _SCOPE.set(None)
try:
yield
finally:
_SCOPE.reset(token)
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
include_package_data=True,
zip_safe=False,
platforms='any',
install_requires=[
'futures;python_version=="2.7"',
],
extras_require={
'tests': [
'doubles',
Expand All @@ -40,8 +43,7 @@

'six>=1.10.0,<2.0',
'gevent',
'tornado<6',
'tornado',
],
':python_version == "2.7"': ['futures'],
},
)
10 changes: 5 additions & 5 deletions testbed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Alternatively, due to the organization of the suite, it's possible to run direct

## Tested frameworks

Currently the examples cover `threading`, `tornado`, `gevent` and `asyncio` (which requires Python 3). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.
Currently the examples cover `threading`, `tornado`, `gevent`, `asyncio` (which requires Python 3) and `contextvars` (which requires Python 3.7 and higher). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.

### threading, asyncio and gevent

Expand All @@ -30,6 +30,10 @@ No automatic `Span` propagation between parent and children tasks is provided, a

Currently, yielding over multiple children is not supported, as the context is effectively shared, and switching from coroutine to coroutine messes up the current active `Span`.

### contextvars

`ContextVarsScopeManager` uses [contextvars](https://docs.python.org/3/library/contextvars.html) module to both store **and** automatically propagate the context from parent coroutines / tasks / scheduled in event loop callbacks to their children.

## List of patterns

- [Active Span replacement](test_active_span_replacement) - Start an isolated task and query for its results in another task/thread.
Expand All @@ -54,7 +58,3 @@ testbed/
```

Supporting all the platforms is optional, and a warning will be displayed when doing `make testbed` in such case.

## Flake8 support

Currently `flake8` does not support the Python 3 `await`/`async` syntax, and does not offer a way to ignore such syntax.
1 change: 0 additions & 1 deletion testbed/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

14 changes: 12 additions & 2 deletions testbed/__main__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
from importlib import import_module
import logging
import os
import sys
import six
import unittest
from tornado import version_info as tornado_version


enabled_platforms = [
'threads',
'tornado',
'gevent',
]
if tornado_version < (6, 0, 0, 0):
# Including testbed for Tornado coroutines and stack context.
# We don't need run testbed in case Tornado>=6, because it became
# asyncio-based framework and `stack_context` was deprecated.
enabled_platforms.append('tornado')
if six.PY3:
enabled_platforms.append('asyncio')
if sys.version_info >= (3, 7):
enabled_platforms.append('contextvars')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__package__)
Expand Down Expand Up @@ -47,4 +55,6 @@ def get_test_directories():
suite = loader.loadTestsFromModule(test_module)
main_suite.addTests(suite)

unittest.TextTestRunner(verbosity=3).run(main_suite)
result = unittest.TextTestRunner(verbosity=3).run(main_suite)
if result.failures or result.errors:
sys.exit(1)
54 changes: 54 additions & 0 deletions testbed/test_active_span_replacement/test_contextvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import print_function

import asyncio

from opentracing.mocktracer import MockTracer
from ..testcase import OpenTracingTestCase
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
from ..utils import stop_loop_when


class TestAsyncioContextVars(OpenTracingTestCase):
def setUp(self):
self.tracer = MockTracer(ContextVarsScopeManager())
self.loop = asyncio.get_event_loop()

def test_main(self):
# Start an isolated task and query for its result -and finish it-
# in another task/thread
span = self.tracer.start_span('initial')
self.submit_another_task(span)

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 3)
self.loop.run_forever()

initial, subtask, task = self.tracer.finished_spans()

self.assertEmptySpan(initial, 'initial')
self.assertEmptySpan(subtask, 'subtask')
self.assertEmptySpan(task, 'task')

# task/subtask are part of the same trace,
# and subtask is a child of task
self.assertSameTrace(subtask, task)
self.assertIsChildOf(subtask, task)

# initial task is not related in any way to those two tasks
self.assertNotSameTrace(initial, subtask)
self.assertHasNoParent(initial)

async def task(self, span):
# Create a new Span for this task
with self.tracer.start_active_span('task'):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span('subtask'):
pass

def submit_another_task(self, span):
self.loop.create_task(self.task(span))
79 changes: 79 additions & 0 deletions testbed/test_client_server/test_contextvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import print_function


import asyncio

import opentracing
from opentracing.ext import tags
from opentracing.mocktracer import MockTracer
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
from ..testcase import OpenTracingTestCase
from ..utils import get_logger, get_one_by_tag, stop_loop_when


logger = get_logger(__name__)


class Server(object):
def __init__(self, *args, **kwargs):
tracer = kwargs.pop('tracer')
queue = kwargs.pop('queue')
super(Server, self).__init__(*args, **kwargs)

self.tracer = tracer
self.queue = queue

async def run(self):
value = await self.queue.get()
self.process(value)

def process(self, message):
logger.info('Processing message in server')

ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message)
with self.tracer.start_active_span('receive',
child_of=ctx) as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)


class Client(object):
def __init__(self, tracer, queue):
self.tracer = tracer
self.queue = queue

async def send(self):
with self.tracer.start_active_span('send') as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

message = {}
self.tracer.inject(scope.span.context,
opentracing.Format.TEXT_MAP,
message)
await self.queue.put(message)

logger.info('Sent message from client')


class TestAsyncioContextVars(OpenTracingTestCase):
def setUp(self):
self.tracer = MockTracer(ContextVarsScopeManager())
self.queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
self.server = Server(tracer=self.tracer, queue=self.queue)

def test(self):
client = Client(self.tracer, self.queue)
self.loop.create_task(self.server.run())
self.loop.create_task(client.send())

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 2)
self.loop.run_forever()

spans = self.tracer.finished_spans()
self.assertIsNotNone(get_one_by_tag(spans,
tags.SPAN_KIND,
tags.SPAN_KIND_RPC_SERVER))
self.assertIsNotNone(get_one_by_tag(spans,
tags.SPAN_KIND,
tags.SPAN_KIND_RPC_CLIENT))
4 changes: 2 additions & 2 deletions testbed/test_common_request_handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This example shows a `Span` used with `RequestHandler`, which is used as a middl
Implementation details:
- For `threading`, no active `Span` is consumed as the tasks may be run concurrently on different threads, and an explicit `SpanContext` has to be saved to be used as parent.
- For `gevent` and `asyncio`, as no automatic `Span` propagation is done, an explicit `Span` has to be saved to be used as parent (observe an instrumentation library could help to do that implicitly - we stick to the simplest case, though).
- For `tornado`, as the `StackContext` automatically propapates the context (even is the tasks are called through different coroutines), we **do** leverage the active `Span`.
- For `tornado` and `contextvars`, as parent `Span` propagates automatically (even if the tasks are called through different coroutines), we **do** leverage the active `Span`.


RequestHandler implementation:
Expand All @@ -20,6 +20,6 @@ RequestHandler implementation:
child_of=self.context,
ignore_active_span=True)
else:
# Used by tornado.
# Used by tornado and contextvars.
span = self.tracer.start_span('send')
```
2 changes: 0 additions & 2 deletions testbed/test_common_request_handler/test_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import print_function

import functools

import asyncio

from opentracing.ext import tags
Expand Down
Loading

0 comments on commit 170f927

Please sign in to comment.