Skip to content

Commit

Permalink
Fixed synchronization primitives not being released on native asyncio…
Browse files Browse the repository at this point in the history
… cancel during __aenter__() (#399)

Fixes #398.
  • Loading branch information
agronholm authored Dec 14, 2021
1 parent cb5b64c commit 4419414
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
6 changes: 6 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Version history

This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

**UNRELEASED**

- Fixed deadlock in synchronization primitives on asyncio which can happen if a task acquiring a
primitive is hit with a native (not AnyIO) cancellation with just the right timing, leaving the
next acquiring task waiting forever (`#398 <https://github.com/agronholm/anyio/issues/398>`_)

**3.4.0**

- Added context propagation to/from worker threads in ``to_thread.run_sync()``,
Expand Down
6 changes: 5 additions & 1 deletion src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,11 @@ async def acquire_on_behalf_of(self, borrower: object) -> None:

self._borrowers.add(borrower)
else:
await cancel_shielded_checkpoint()
try:
await cancel_shielded_checkpoint()
except BaseException:
self.release()
raise

def release(self) -> None:
self.release_on_behalf_of(current_task())
Expand Down
12 changes: 10 additions & 2 deletions src/anyio/_core/_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ async def acquire(self) -> None:

assert self._owner_task == task
else:
await cancel_shielded_checkpoint()
try:
await cancel_shielded_checkpoint()
except BaseException:
self.release()
raise

def acquire_nowait(self) -> None:
"""
Expand Down Expand Up @@ -309,7 +313,11 @@ async def acquire(self) -> None:

raise
else:
await cancel_shielded_checkpoint()
try:
await cancel_shielded_checkpoint()
except BaseException:
self.release()
raise

def acquire_nowait(self) -> None:
"""
Expand Down
49 changes: 49 additions & 0 deletions tests/test_synchronization.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Optional

import pytest
Expand Down Expand Up @@ -114,6 +115,22 @@ async def waiter() -> None:
assert not lock.statistics().locked
assert lock.statistics().tasks_waiting == 0

@pytest.mark.parametrize('anyio_backend', ['asyncio'])
async def test_asyncio_deadlock(self) -> None:
"""Regression test for #398."""
lock = Lock()

async def acquire() -> None:
async with lock:
await asyncio.sleep(0)

loop = asyncio.get_event_loop()
task1 = loop.create_task(acquire())
task2 = loop.create_task(acquire())
await asyncio.sleep(0)
task1.cancel()
await asyncio.wait_for(task2, 1)


class TestEvent:
async def test_event(self) -> None:
Expand Down Expand Up @@ -363,6 +380,22 @@ async def test_acquire_race(self) -> None:
semaphore.release()
pytest.raises(WouldBlock, semaphore.acquire_nowait)

@pytest.mark.parametrize('anyio_backend', ['asyncio'])
async def test_asyncio_deadlock(self) -> None:
"""Regression test for #398."""
semaphore = Semaphore(1)

async def acquire() -> None:
async with semaphore:
await asyncio.sleep(0)

loop = asyncio.get_event_loop()
task1 = loop.create_task(acquire())
task2 = loop.create_task(acquire())
await asyncio.sleep(0)
task1.cancel()
await asyncio.wait_for(task2, 1)


class TestCapacityLimiter:
async def test_bad_init_type(self) -> None:
Expand Down Expand Up @@ -465,3 +498,19 @@ async def waiter() -> None:

assert limiter.statistics().tasks_waiting == 0
assert limiter.statistics().borrowed_tokens == 0

@pytest.mark.parametrize('anyio_backend', ['asyncio'])
async def test_asyncio_deadlock(self) -> None:
"""Regression test for #398."""
limiter = CapacityLimiter(1)

async def acquire() -> None:
async with limiter:
await asyncio.sleep(0)

loop = asyncio.get_event_loop()
task1 = loop.create_task(acquire())
task2 = loop.create_task(acquire())
await asyncio.sleep(0)
task1.cancel()
await asyncio.wait_for(task2, 1)

0 comments on commit 4419414

Please sign in to comment.