4 changes: 4 additions & 0 deletions nooz/324.bugfix.rst
@@ -0,0 +1,4 @@
+Only set ``._debug.Lock.local_pdb_complete`` if it has been created.
+
+This can be triggered by a very rare race condition (and thus we have no
+working test yet) but it is known to exist in (a) consumer project(s).
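
The guard this note describes is easy to motivate with a standalone repro: calling ``.set()`` on a completion-event attribute that was never allocated raises ``AttributeError: 'NoneType' object has no attribute 'set'``. A minimal sketch of the pattern, using hypothetical names rather than ``tractor``'s actual ``Lock`` internals:

```python
from typing import Optional

import trio


class DebugLock:
    # lazily allocated: stays ``None`` until a debug REPL is entered,
    # so a rare teardown/cancellation race can hit ``release()`` first
    local_pdb_complete: Optional[trio.Event] = None

    @classmethod
    def release(cls) -> None:
        # the fix sketched below: only signal completion if the
        # event was actually created
        if cls.local_pdb_complete is not None:
            cls.local_pdb_complete.set()


# without the ``None`` guard this call would raise ``AttributeError``
DebugLock.release()
```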
3 changes: 2 additions & 1 deletion tractor/_debug.py
@@ -126,7 +126,8 @@ def release(cls):
         try:
             # sometimes the ``trio`` might already be terminated in
             # which case this call will raise.
-            cls.local_pdb_complete.set()
+            if cls.local_pdb_complete is not None:
+                cls.local_pdb_complete.set()
         finally:
             # restore original sigint handler
             cls.unshield_sigint()
29 changes: 18 additions & 11 deletions tractor/trionics/_mngrs.py
@@ -81,7 +81,7 @@ async def gather_contexts(
     This function is somewhat similar to common usage of
     ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
     combo with ``asyncio.gather()`` except the managers are concurrently
-    entered and exited cancellation just works.
+    entered and exited, and cancellation just works.

     '''
     unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
@@ -119,7 +119,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.

     '''
-    lock = trio.Lock()
+    locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
     resources: dict[
@@ -165,13 +165,18 @@ async def maybe_open_context(
     _Cached instance on a _Cache hit.

     '''
-    # lock resource acquisition around task racing / ``trio``'s
-    # scheduler protocol
-    await _Cache.lock.acquire()
-
-    ctx_key = (id(acm_func), key or tuple(kwargs.items()))
+    fid = id(acm_func)
+    ctx_key = (fid, key or tuple(kwargs.items()))
     value = None

+    # Lock resource acquisition around task racing / ``trio``'s
+    # scheduler protocol.
+    # NOTE: the lock is target context manager func specific in order
+    # to allow re-entrant use cases where one `maybe_open_context()`
+    # wrapped factory may want to call into another.
+    lock = _Cache.locks.setdefault(fid, trio.Lock())
+    await lock.acquire()
+
     try:
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
@@ -189,21 +194,21 @@ async def maybe_open_context(
             # TODO: does this need to be a tractor "root nursery"?
             resources = _Cache.resources
             assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            ln, _ = resources[ctx_key] = (service_n, trio.Event())
+            resources[ctx_key] = (service_n, trio.Event())

-            value = await ln.start(
+            value = await service_n.start(
                 _Cache.run_ctx,
                 mngr,
                 ctx_key,
             )
             _Cache.users += 1
-            _Cache.lock.release()
+            lock.release()
             yield False, value

         else:
             log.info(f'Reusing _Cached resource for {ctx_key}')
             _Cache.users += 1
-            _Cache.lock.release()
+            lock.release()
             yield True, value
@@ -221,3 +226,5 @@ async def maybe_open_context(
                 if entry:
                     _, no_more_users = entry
                     no_more_users.set()
+
+                _Cache.locks.pop(fid)
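
The switch from one global ``_Cache.lock`` to per-factory ``_Cache.locks`` entries is what makes nested (re-entrant) use work: with a single lock, a cached factory that itself calls ``maybe_open_context()`` deadlocks, since the outer acquisition is only released after the inner factory has been fully entered. A rough usage sketch under stated assumptions: the ``open_db`` / ``open_feed`` factories are made up for illustration, and a ``tractor`` runtime must already be up since the cache runs entered managers on the actor's service nursery:

```python
from contextlib import asynccontextmanager as acm

import tractor
import trio

from tractor.trionics import maybe_open_context


@acm
async def open_db(name: str):
    # stand-in for an expensive, shareable resource
    yield f'db-conn-{name}'


@acm
async def open_feed(symbol: str):
    # re-entrant case: one cached factory calling into another; each
    # factory now locks on its own ``_Cache.locks`` entry keyed by
    # ``id(acm_func)`` instead of contending on a single global lock.
    async with maybe_open_context(
        acm_func=open_db,
        kwargs={'name': 'quotes'},
    ) as (cached_hit, db):
        yield symbol, db


async def main():
    async with tractor.open_root_actor():
        async with maybe_open_context(
            acm_func=open_feed,
            kwargs={'symbol': 'btcusdt'},
        ) as (cached_hit, feed):
            print(cached_hit, feed)


trio.run(main)
```

Note also the new ``_Cache.locks.pop(fid)`` in the teardown path: once the last user of a factory's resource exits, that factory's lock entry is dropped so the ``locks`` table doesn't grow without bound.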