Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate BlockingTrioPortal in favor of from_thread.run and from_thread.run_sync #1122

Merged
merged 12 commits into from
Jul 21, 2019
13 changes: 10 additions & 3 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@
Event, CapacityLimiter, Semaphore, Lock, StrictFIFOLock, Condition
)

from ._threads import (
run_sync_in_thread, current_default_thread_limiter, BlockingTrioPortal
)
from ._threads import (run_sync_in_thread, current_default_thread_limiter)
from ._threads import BlockingTrioPortal as _BlockingTrioPortal

from ._highlevel_generic import aclose_forcefully, StapledStream

Expand Down Expand Up @@ -70,6 +69,7 @@
from . import hazmat
from . import socket
from . import abc
from . import from_thread
# Not imported by default: testing
if False:
from . import testing
Expand Down Expand Up @@ -114,6 +114,13 @@
issue=810,
instead=current_default_thread_limiter,
),
"BlockingTrioPortal":
_deprecate.DeprecatedAttribute(
_BlockingTrioPortal,
"0.12.0",
issue=810,
instead=("trio.from_thread.run(), trio.from_thread.run_sync()")
),
}

# Having the public path in .__module__ attributes is important for:
Expand Down
122 changes: 122 additions & 0 deletions trio/_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ async def run_sync_in_thread(sync_fn, *args, cancellable=False, limiter=None):
tasks to continue working while ``sync_fn`` runs. This is accomplished by
pushing the call to ``sync_fn(*args)`` off into a worker thread.

``run_sync_in_thread`` also injects the current ``TrioToken`` into the
spawned thread's local storage so that these threads can re-enter the Trio
loop by calling either :func: ``trio.from_thread.run`` or
:func: ``trio.from_thread.run_sync`` for async or synchronous functions,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReStrustructured Text format is a little weird: double-backticks, like ``trio.from_thread.run``, mean raw literal code – it's the equivalent of markdown's single-backticks. If you want to use :func: to link to a named function, then you have to use single-backticks: :func:`trio.from_thread.run`.

But, since #1091 was merged recently, we don't even need to write :func: most of the time – if you just plain single-backticks, like `trio.from_thread.run`, then sphinx will now automatically find the object and link to it – you don't have to use :func: to specify the type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hint! Yeah, I have never used RST, only Markdown, and also never Sphinx either so the docs are a bit of a learning curve for me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these double backticks still need to be replaced with single backticks.

respectively.

Args:
sync_fn: An arbitrary synchronous callable.
*args: Positional arguments to pass to sync_fn. If you need keyword
Expand Down Expand Up @@ -385,6 +391,7 @@ def worker_thread_fn():
thread = threading.Thread(
target=worker_thread_fn, name=name, daemon=True
)
setattr(thread, 'current_trio_token', trio.hazmat.current_trio_token())
epellis marked this conversation as resolved.
Show resolved Hide resolved
thread.start()
except:
limiter.release_on_behalf_of(placeholder)
Expand All @@ -398,3 +405,118 @@ def abort(_):
return trio.hazmat.Abort.FAILED

return await trio.hazmat.wait_task_rescheduled(abort)


def _run_fn_as_system_task(cb, fn, *args, trio_token=None):
"""Helper function for from_thread.run and from_thread.run_sync.

Since this internally uses TrioToken.run_sync_soon, all warnings about
raised exceptions canceling all tasks should be noted.

"""
if not trio_token:
current_thread = threading.current_thread()
trio_token = getattr(current_thread, 'current_trio_token')

try:
trio.hazmat.current_task()
except RuntimeError:
pass
else:
raise RuntimeError(
"this is a blocking function; call it from a thread"
)

q = stdlib_queue.Queue()
trio_token.run_sync_soon(cb, q, fn, args)
return q.get().unwrap()


def run(afn, *args, trio_token=None):
"""Run the given async function in the parent Trio thread, blocking until it
is complete.

Returns:
Whatever ``afn(*args)`` returns.

Returns or raises whatever the given function returns or raises. It
can also raise exceptions of its own:

Raises:
RunFinishedError: if the corresponding call to :func:`trio.run` has
already completed.
Cancelled: if the corresponding call to :func:`trio.run` completes
while ``afn(*args)`` is running, then ``afn`` is likely to raise
:class:`Cancelled`, and this will propagate out into
RuntimeError: if you try calling this from inside the Trio thread,
which would otherwise cause a deadlock.
AttributeError: if run()'s thread local storage does not have a token.
This happens when it was not spawned from trio.run_sync_in_thread.

**Locating a Trio Token**: There are two ways to specify which
:func: ``trio.run()`` loop to reenter::

- Spawn this thread from :func: ``run_sync_in_thread``. This will
"inject" the current Trio Token into thread local storage and allow
this function to re-enter the same :func: ``trio.run()`` loop.
- Pass a keyword argument, ``trio_token`` specifiying a specific
:func: ``trio.run()`` loop to re-enter. This is the "legacy" way of
re-entering a trio thread and is similar to the old
`BlockingTrioPortal`.
"""

def callback(q, afn, args):
@disable_ki_protection
async def unprotected_afn():
return await afn(*args)

async def await_in_trio_thread_task():
q.put_nowait(await outcome.acapture(unprotected_afn))

trio.hazmat.spawn_system_task(await_in_trio_thread_task, name=afn)

return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token)


def run_sync(fn, *args, trio_token=None):
"""Run the given sync function in the parent Trio thread, blocking until it
is complete.

Returns:
Whatever ``fn(*args)`` returns.

Returns or raises whatever the given function returns or raises. It
can also raise exceptions of its own:

Raises:
RunFinishedError: if the corresponding call to :func:`trio.run` has
already completed.
Cancelled: if the corresponding call to :func:`trio.run` completes
while ``afn(*args)`` is running, then ``afn`` is likely to raise
:class:`Cancelled`, and this will propagate out into
RuntimeError: if you try calling this from inside the Trio thread,
which would otherwise cause a deadlock.
AttributeError: if run()'s thread local storage does not have a token.
This happens when it was not spawned from trio.run_sync_in_thread.

**Locating a Trio Token**: There are two ways to specify which
:func: ``trio.run()`` loop to reenter::

- Spawn this thread from :func: ``run_sync_in_thread``. This will
"inject" the current Trio Token into thread local storage and allow
this function to re-enter the same :func: ``trio.run()`` loop.
- Pass a keyword argument, ``trio_token`` specifiying a specific
:func: ``trio.run()`` loop to re-enter. This is the "legacy" way of
re-entering a trio thread and is similar to the old
`BlockingTrioPortal`.
"""

def callback(q, fn, args):
@disable_ki_protection
def unprotected_fn():
return fn(*args)

res = outcome.capture(unprotected_fn)
q.put_nowait(res)

return _run_fn_as_system_task(callback, fn, *args, trio_token=trio_token)
6 changes: 6 additions & 0 deletions trio/from_thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
This namespace represents special functions that can call back into Trio from
an external thread by means of a Trio Token present in Thread Local Storage
"""

from ._threads import (run_sync, run)
86 changes: 86 additions & 0 deletions trio/tests/test_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .. import Event, CapacityLimiter, sleep
from ..testing import wait_all_tasks_blocked
from .._threads import *
from .._threads import run, run_sync # Not in __all__, must import explicitly

from .._core.tests.test_ki import ki_self
from .._core.tests.tutil import slow
Expand Down Expand Up @@ -457,3 +458,88 @@ def bad_start(self):
assert "engines" in str(excinfo.value)

assert limiter.borrowed_tokens == 0


async def test_trio_run_sync_in_thread_token():
# Test that run_sync_in_thread automatically injects the current trio token
# into a spawned thread

def thread_fn():
current_thread = threading.current_thread()
callee_token = getattr(current_thread, 'current_trio_token')
return callee_token

caller_token = _core.current_trio_token()
callee_token = await run_sync_in_thread(thread_fn)
assert callee_token == caller_token


async def test_trio_from_thread_run_sync():
# Test that run_sync_in_thread correctly "hands off" the trio token to
# trio.from_thread.run_sync()
def thread_fn():
start = run_sync(_core.current_time)
end = run_sync(_core.current_time)
return end - start

duration = await run_sync_in_thread(thread_fn)
assert duration > 0
epellis marked this conversation as resolved.
Show resolved Hide resolved


async def test_trio_from_thread_run():
# Test that run_sync_in_thread correctly "hands off" the trio token to
# trio.from_thread.run()
def thread_fn():
start = time.perf_counter()
run(sleep, 0.05)
end = time.perf_counter()
return end - start

duration = await run_sync_in_thread(thread_fn)
assert duration > 0
epellis marked this conversation as resolved.
Show resolved Hide resolved


async def test_trio_from_thread_token():
# Test that run_sync_in_thread and spawned trio.from_thread.run_sync()
# share the same Trio token
def thread_fn():
callee_token = run_sync(_core.current_trio_token)
return callee_token

caller_token = _core.current_trio_token()
callee_token = await run_sync_in_thread(thread_fn)
assert callee_token == caller_token


async def test_trio_from_thread_token_kwarg():
# Test that run_sync_in_thread and spawned trio.from_thread.run_sync() can
# use an explicitly defined token
def thread_fn(token):
callee_token = run_sync(_core.current_trio_token, trio_token=token)
return callee_token

caller_token = _core.current_trio_token()
callee_token = await run_sync_in_thread(thread_fn, caller_token)
assert callee_token == caller_token


async def test_trio_from_thread_both_run():
# Test that trio.from_thread.run() and from_thread.run_sync() can run in
# the same thread together

def thread_fn():
start = run_sync(_core.current_time)
run(sleep, 0.05)
end = run_sync(_core.current_time)
return end - start

duration = await run_sync_in_thread(thread_fn)
assert duration > 0
epellis marked this conversation as resolved.
Show resolved Hide resolved


async def test_trio_from_thread_raw_call():
# Test that a "raw call" to trio.from_thread.run() fails because no token
# has been provided

with pytest.raises(AttributeError):
run_sync(_core.current_time)
epellis marked this conversation as resolved.
Show resolved Hide resolved