Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 605d161

Browse files
authored
Add cancellation support to ReadWriteLock (#12120)
Also convert `ReadWriteLock` to use async context managers. Signed-off-by: Sean Quah <seanq@element.io>
1 parent 8e5706d commit 605d161

File tree

4 files changed

+382
-93
lines changed

4 files changed

+382
-93
lines changed

changelog.d/12120.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add support for cancellation to `ReadWriteLock`.

synapse/handlers/pagination.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ async def _purge_history(
350350
"""
351351
self._purges_in_progress_by_room.add(room_id)
352352
try:
353-
with await self.pagination_lock.write(room_id):
353+
async with self.pagination_lock.write(room_id):
354354
await self.storage.purge_events.purge_history(
355355
room_id, token, delete_local_events
356356
)
@@ -406,7 +406,7 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
406406
room_id: room to be purged
407407
force: set true to skip checking for joined users.
408408
"""
409-
with await self.pagination_lock.write(room_id):
409+
async with self.pagination_lock.write(room_id):
410410
# first check that we have no users in this room
411411
if not force:
412412
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -448,7 +448,7 @@ async def get_messages(
448448

449449
room_token = from_token.room_key
450450

451-
with await self.pagination_lock.read(room_id):
451+
async with self.pagination_lock.read(room_id):
452452
(
453453
membership,
454454
member_event_id,
@@ -615,7 +615,7 @@ async def _shutdown_and_purge_room(
615615

616616
self._purges_in_progress_by_room.add(room_id)
617617
try:
618-
with await self.pagination_lock.write(room_id):
618+
async with self.pagination_lock.write(room_id):
619619
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
620620
self._delete_by_id[
621621
delete_id

synapse/util/async_helpers.py

+40-31
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
import inspect
1919
import itertools
2020
import logging
21-
from contextlib import contextmanager
21+
from contextlib import asynccontextmanager, contextmanager
2222
from typing import (
2323
Any,
24+
AsyncIterator,
2425
Awaitable,
2526
Callable,
2627
Collection,
@@ -40,7 +41,7 @@
4041
)
4142

4243
import attr
43-
from typing_extensions import ContextManager, Literal
44+
from typing_extensions import AsyncContextManager, Literal
4445

4546
from twisted.internet import defer
4647
from twisted.internet.defer import CancelledError
@@ -491,7 +492,7 @@ class ReadWriteLock:
491492
492493
Example:
493494
494-
with await read_write_lock.read("test_key"):
495+
async with read_write_lock.read("test_key"):
495496
# do some work
496497
"""
497498

@@ -514,22 +515,24 @@ def __init__(self) -> None:
514515
# Latest writer queued
515516
self.key_to_current_writer: Dict[str, defer.Deferred] = {}
516517

517-
async def read(self, key: str) -> ContextManager:
518-
new_defer: "defer.Deferred[None]" = defer.Deferred()
518+
def read(self, key: str) -> AsyncContextManager:
519+
@asynccontextmanager
520+
async def _ctx_manager() -> AsyncIterator[None]:
521+
new_defer: "defer.Deferred[None]" = defer.Deferred()
519522

520-
curr_readers = self.key_to_current_readers.setdefault(key, set())
521-
curr_writer = self.key_to_current_writer.get(key, None)
523+
curr_readers = self.key_to_current_readers.setdefault(key, set())
524+
curr_writer = self.key_to_current_writer.get(key, None)
522525

523-
curr_readers.add(new_defer)
526+
curr_readers.add(new_defer)
524527

525-
# We wait for the latest writer to finish writing. We can safely ignore
526-
# any existing readers... as they're readers.
527-
if curr_writer:
528-
await make_deferred_yieldable(curr_writer)
529-
530-
@contextmanager
531-
def _ctx_manager() -> Iterator[None]:
532528
try:
529+
# We wait for the latest writer to finish writing. We can safely ignore
530+
# any existing readers... as they're readers.
531+
# May raise a `CancelledError` if the `Deferred` wrapping us is
532+
# cancelled. The `Deferred` we are waiting on must not be cancelled,
533+
# since we do not own it.
534+
if curr_writer:
535+
await make_deferred_yieldable(stop_cancellation(curr_writer))
533536
yield
534537
finally:
535538
with PreserveLoggingContext():
@@ -538,29 +541,35 @@ def _ctx_manager() -> Iterator[None]:
538541

539542
return _ctx_manager()
540543

541-
async def write(self, key: str) -> ContextManager:
542-
new_defer: "defer.Deferred[None]" = defer.Deferred()
544+
def write(self, key: str) -> AsyncContextManager:
545+
@asynccontextmanager
546+
async def _ctx_manager() -> AsyncIterator[None]:
547+
new_defer: "defer.Deferred[None]" = defer.Deferred()
543548

544-
curr_readers = self.key_to_current_readers.get(key, set())
545-
curr_writer = self.key_to_current_writer.get(key, None)
549+
curr_readers = self.key_to_current_readers.get(key, set())
550+
curr_writer = self.key_to_current_writer.get(key, None)
546551

547-
# We wait on all latest readers and writer.
548-
to_wait_on = list(curr_readers)
549-
if curr_writer:
550-
to_wait_on.append(curr_writer)
552+
# We wait on all latest readers and writer.
553+
to_wait_on = list(curr_readers)
554+
if curr_writer:
555+
to_wait_on.append(curr_writer)
551556

552-
# We can clear the list of current readers since the new writer waits
553-
# for them to finish.
554-
curr_readers.clear()
555-
self.key_to_current_writer[key] = new_defer
557+
# We can clear the list of current readers since `new_defer` waits
558+
# for them to finish.
559+
curr_readers.clear()
560+
self.key_to_current_writer[key] = new_defer
556561

557-
await make_deferred_yieldable(defer.gatherResults(to_wait_on))
558-
559-
@contextmanager
560-
def _ctx_manager() -> Iterator[None]:
562+
to_wait_on_defer = defer.gatherResults(to_wait_on)
561563
try:
564+
# Wait for all current readers and the latest writer to finish.
565+
# May raise a `CancelledError` immediately after the wait if the
566+
# `Deferred` wrapping us is cancelled. We must only release the lock
567+
# once we have acquired it, hence the use of `delay_cancellation`
568+
# rather than `stop_cancellation`.
569+
await make_deferred_yieldable(delay_cancellation(to_wait_on_defer))
562570
yield
563571
finally:
572+
# Release the lock.
564573
with PreserveLoggingContext():
565574
new_defer.callback(None)
566575
# `self.key_to_current_writer[key]` may be missing if there was another

0 commit comments

Comments
 (0)