
Commit 8de5e05

Take out lock while persisting events / deleting room

1 parent b965651

8 files changed: +112 -39 lines changed
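In outline: every code path that writes events to a room now takes a shared ("read") lock named `delete_room_lock`, keyed on the room ID, while purging or deleting a room takes the exclusive ("write") form of the same lock. A minimal sketch of the two sides, assuming the `WorkerLocksHandler` API exactly as it appears in the diffs below (the two helper function names here are hypothetical):

from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME

async def persist_event_guarded(hs, room_id: str) -> None:
    # Shared ("read") lock: many event writers may hold it at once, but
    # none may hold it while the exclusive delete-room lock is held.
    async with hs.get_worker_locks_handler().acquire_read_write_lock(
        DELETE_ROOM_LOCK_NAME, room_id, write=False
    ):
        ...  # create and persist the event

async def delete_room_guarded(hs, room_id: str) -> None:
    # Exclusive ("write") lock: waits for in-flight writers to finish and
    # blocks new ones, so the room cannot gain events mid-deletion.
    async with hs.get_worker_locks_handler().acquire_read_write_lock(
        DELETE_ROOM_LOCK_NAME, room_id, write=True
    ):
        ...  # purge the room from the database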

synapse/federation/federation_server.py
Lines changed: 14 additions & 3 deletions

@@ -63,6 +63,7 @@
 )
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import (
     make_deferred_yieldable,
@@ -137,6 +138,7 @@ def __init__(self, hs: "HomeServer"):
         self._event_auth_handler = hs.get_event_auth_handler()
         self._room_member_handler = hs.get_room_member_handler()
         self._e2e_keys_handler = hs.get_e2e_keys_handler()
+        self._worker_lock_handler = hs.get_worker_locks_handler()

         self._state_storage_controller = hs.get_storage_controllers().state

@@ -1236,9 +1238,18 @@ async def _process_incoming_pdus_in_room_inner(
             logger.info("handling received PDU in room %s: %s", room_id, event)
             try:
                 with nested_logging_context(event.event_id):
-                    await self._federation_event_handler.on_receive_pdu(
-                        origin, event
-                    )
+                    # We're taking out a lock within a lock, which could
+                    # lead to deadlocks if we're not careful. However, it is
+                    # safe on this occasion as we only ever take a write
+                    # lock when deleting a room, which we would never do
+                    # while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
+                    # lock.
+                    async with self._worker_lock_handler.acquire_read_write_lock(
+                        DELETE_ROOM_LOCK_NAME, room_id, write=False
+                    ):
+                        await self._federation_event_handler.on_receive_pdu(
+                            origin, event
+                        )
             except FederationError as e:
                 # XXX: Ideally we'd inform the remote we failed to process
                 # the event, but we can't return an error in the transaction
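The comment above is about lock ordering: this code already holds the per-room `_INBOUND_EVENT_HANDLING_LOCK_NAME` lock when it acquires the delete-room read lock. A toy illustration of why a fixed acquisition order cannot deadlock, using plain asyncio locks rather than Synapse's (all names here are illustrative):

import asyncio

inbound_lock = asyncio.Lock()  # stands in for _INBOUND_EVENT_HANDLING_LOCK_NAME
delete_lock = asyncio.Lock()   # stands in for the delete-room lock

async def handle_pdu() -> None:
    # Always acquires inbound -> delete, never the reverse order.
    async with inbound_lock:
        async with delete_lock:
            await asyncio.sleep(0)  # process the event

async def delete_room() -> None:
    # Never waits on inbound_lock while holding delete_lock, so there is
    # no cycle in the waits-for graph and hence no possible deadlock.
    async with delete_lock:
        await asyncio.sleep(0)  # delete the room

async def main() -> None:
    await asyncio.gather(handle_pdu(), delete_room())

asyncio.run(main())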

synapse/handlers/message.py
Lines changed: 37 additions & 1 deletion

@@ -53,6 +53,7 @@
 from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
 from synapse.events.validator import EventValidator
 from synapse.handlers.directory import DirectoryHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -485,6 +486,7 @@ def __init__(self, hs: "HomeServer"):
         self._events_shard_config = self.config.worker.events_shard_config
         self._instance_name = hs.get_instance_name()
         self._notifier = hs.get_notifier()
+        self._worker_lock_handler = hs.get_worker_locks_handler()

         self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state

@@ -1010,6 +1012,37 @@ async def create_and_send_nonmember_event(
                 event.internal_metadata.stream_ordering,
             )

+        async with self._worker_lock_handler.acquire_read_write_lock(
+            DELETE_ROOM_LOCK_NAME, room_id, write=False
+        ):
+            return await self._create_and_send_nonmember_event(
+                requester=requester,
+                event_dict=event_dict,
+                allow_no_prev_events=allow_no_prev_events,
+                prev_event_ids=prev_event_ids,
+                state_event_ids=state_event_ids,
+                ratelimit=ratelimit,
+                txn_id=txn_id,
+                ignore_shadow_ban=ignore_shadow_ban,
+                outlier=outlier,
+                depth=depth,
+            )
+
+    async def _create_and_send_nonmember_event(
+        self,
+        requester: Requester,
+        event_dict: dict,
+        allow_no_prev_events: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
+        ratelimit: bool = True,
+        txn_id: Optional[str] = None,
+        ignore_shadow_ban: bool = False,
+        outlier: bool = False,
+        depth: Optional[int] = None,
+    ) -> Tuple[EventBase, int]:
+        room_id = event_dict["room_id"]
+
         # If we don't have any prev event IDs specified then we need to
         # check that the host is in the room (as otherwise populating the
         # prev events will fail), at which point we may as well check the
@@ -1924,7 +1957,10 @@ async def _send_dummy_events_to_fill_extremities(self) -> None:
         )

         for room_id in room_ids:
-            dummy_event_sent = await self._send_dummy_event_for_room(room_id)
+            async with self._worker_lock_handler.acquire_read_write_lock(
+                DELETE_ROOM_LOCK_NAME, room_id, write=False
+            ):
+                dummy_event_sent = await self._send_dummy_event_for_room(room_id)

            if not dummy_event_sent:
                # Did not find a valid user in the room, so remove from future attempts
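One detail worth noting in `_send_dummy_events_to_fill_extremities`: the read lock is taken per room inside the loop, not once around the whole sweep, so a pending room deletion (which needs the write lock) only has to wait for the current room's dummy event rather than for the entire loop. A sketch of that shape, assuming the handler API from this diff (`fill_extremities` is a hypothetical name):

from typing import List

from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME

async def fill_extremities(hs, room_ids: List[str]) -> None:
    locks = hs.get_worker_locks_handler()
    for room_id in room_ids:
        # Acquire and release per iteration: a queued exclusive lock for
        # this room can be granted between iterations.
        async with locks.acquire_read_write_lock(
            DELETE_ROOM_LOCK_NAME, room_id, write=False
        ):
            ...  # send one dummy event into this room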

synapse/handlers/pagination.py
Lines changed: 5 additions & 2 deletions

@@ -48,6 +48,8 @@

 PURGE_HISTORY_LOCK_NAME = "purge_history_lock"

+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+

 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
@@ -418,8 +420,9 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
             room_id: room to be purged
             force: set true to skip checking for joined users.
         """
-        async with self._worker_locks.acquire_read_write_lock(
-            PURGE_HISTORY_LOCK_NAME, room_id, write=True
+        async with self._worker_locks.acquire_multi_read_write_lock(
+            [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
+            write=True,
         ):
             # first check that we have no users in this room
             if not force:
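`purge_room` now needs both per-room locks exclusively, and takes them through a single `acquire_multi_read_write_lock` call rather than two nested `async with` blocks, keeping multi-lock acquisition in one place in the handler. A sketch of the call shape, assuming the API as it appears in this diff (`purge_room_guarded` is a hypothetical name):

async def purge_room_guarded(worker_locks, room_id: str) -> None:
    # Exclusive ("write") acquisition of both per-room locks at once:
    # nothing can persist events to the room, and no concurrent history
    # purge can run, until the block exits.
    async with worker_locks.acquire_multi_read_write_lock(
        [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
        write=True,
    ):
        ...  # verify the room is empty (unless force), then purge it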

synapse/handlers/room_member.py
Lines changed: 25 additions & 20 deletions

@@ -39,6 +39,7 @@
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging import opentracing
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -94,6 +95,7 @@ def __init__(self, hs: "HomeServer"):
         self.event_creation_handler = hs.get_event_creation_handler()
         self.account_data_handler = hs.get_account_data_handler()
         self.event_auth_handler = hs.get_event_auth_handler()
+        self._worker_lock_handler = hs.get_worker_locks_handler()

         self.member_linearizer: Linearizer = Linearizer(name="member")
         self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -638,26 +640,29 @@ async def update_membership(
         # by application services), and then by room ID.
         async with self.member_as_limiter.queue(as_id):
             async with self.member_linearizer.queue(key):
-                with opentracing.start_active_span("update_membership_locked"):
-                    result = await self.update_membership_locked(
-                        requester,
-                        target,
-                        room_id,
-                        action,
-                        txn_id=txn_id,
-                        remote_room_hosts=remote_room_hosts,
-                        third_party_signed=third_party_signed,
-                        ratelimit=ratelimit,
-                        content=content,
-                        new_room=new_room,
-                        require_consent=require_consent,
-                        outlier=outlier,
-                        allow_no_prev_events=allow_no_prev_events,
-                        prev_event_ids=prev_event_ids,
-                        state_event_ids=state_event_ids,
-                        depth=depth,
-                        origin_server_ts=origin_server_ts,
-                    )
+                async with self._worker_lock_handler.acquire_read_write_lock(
+                    DELETE_ROOM_LOCK_NAME, room_id, write=False
+                ):
+                    with opentracing.start_active_span("update_membership_locked"):
+                        result = await self.update_membership_locked(
+                            requester,
+                            target,
+                            room_id,
+                            action,
+                            txn_id=txn_id,
+                            remote_room_hosts=remote_room_hosts,
+                            third_party_signed=third_party_signed,
+                            ratelimit=ratelimit,
+                            content=content,
+                            new_room=new_room,
+                            require_consent=require_consent,
+                            outlier=outlier,
+                            allow_no_prev_events=allow_no_prev_events,
+                            prev_event_ids=prev_event_ids,
+                            state_event_ids=state_event_ids,
+                            depth=depth,
+                            origin_server_ts=origin_server_ts,
+                        )

         return result

synapse/handlers/worker_lock.py
Lines changed: 3 additions & 0 deletions

@@ -42,6 +42,9 @@
     from synapse.server import HomeServer


+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
 class WorkerLocksHandler:
     """A class for waiting on taking out locks, rather than using the storage
     functions directly (which don't support awaiting).

synapse/replication/tcp/handler.py
Lines changed: 2 additions & 1 deletion

@@ -249,7 +249,8 @@ def __init__(self, hs: "HomeServer"):
         if self._is_master or self._should_insert_client_ips:
             self.subscribe_to_channel("USER_IP")

-        self._notifier.add_lock_released_callback(self.on_lock_released)
+        if hs.config.redis.redis_enabled:
+            self._notifier.add_lock_released_callback(self.on_lock_released)

     def subscribe_to_channel(self, channel_name: str) -> None:
         """

synapse/rest/client/room_upgrade_rest_servlet.py
Lines changed: 8 additions & 3 deletions

@@ -17,6 +17,7 @@

 from synapse.api.errors import Codes, ShadowBanError, SynapseError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
@@ -60,6 +61,7 @@ def __init__(self, hs: "HomeServer"):
         self._hs = hs
         self._room_creation_handler = hs.get_room_creation_handler()
         self._auth = hs.get_auth()
+        self._worker_lock_handler = hs.get_worker_locks_handler()

     async def on_POST(
         self, request: SynapseRequest, room_id: str
@@ -78,9 +80,12 @@ async def on_POST(
         )

         try:
-            new_room_id = await self._room_creation_handler.upgrade_room(
-                requester, room_id, new_version
-            )
+            async with self._worker_lock_handler.acquire_read_write_lock(
+                DELETE_ROOM_LOCK_NAME, room_id, write=False
+            ):
+                new_room_id = await self._room_creation_handler.upgrade_room(
+                    requester, room_id, new_version
+                )
         except ShadowBanError:
             # Generate a random room ID.
             new_room_id = stringutils.random_string(18)

synapse/storage/controllers/persist_events.py
Lines changed: 18 additions & 9 deletions

@@ -45,6 +45,7 @@
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.opentracing import (
     SynapseTags,
@@ -338,6 +339,7 @@ def __init__(
         )
         self._state_resolution_handler = hs.get_state_resolution_handler()
         self._state_controller = state_controller
+        self.hs = hs

     async def _process_event_persist_queue_task(
         self,
@@ -350,15 +352,22 @@ async def _process_event_persist_queue_task(
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
         """
-        if isinstance(task, _PersistEventsTask):
-            return await self._persist_event_batch(room_id, task)
-        elif isinstance(task, _UpdateCurrentStateTask):
-            await self._update_current_state(room_id, task)
-            return {}
-        else:
-            raise AssertionError(
-                f"Found an unexpected task type in event persistence queue: {task}"
-            )
+
+        # Ensure that the room can't be deleted while we're persisting events to
+        # it. We might already have taken out the lock, but since this is just a
+        # "read" lock it's inherently reentrant.
+        async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
+            DELETE_ROOM_LOCK_NAME, room_id, write=False
+        ):
+            if isinstance(task, _PersistEventsTask):
+                return await self._persist_event_batch(room_id, task)
+            elif isinstance(task, _UpdateCurrentStateTask):
+                await self._update_current_state(room_id, task)
+                return {}
+            else:
+                raise AssertionError(
+                    f"Found an unexpected task type in event persistence queue: {task}"
                )

     @trace
     async def persist_events(
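The new comment in `_process_event_persist_queue_task` leans on shared locks being reentrant: an event-sending path may already hold the read lock (taken in handlers/message.py above) when the persistence queue takes it again. A toy counter-based reader lock showing why nested shared acquisitions never self-block (illustrative only; this is not Synapse's lock implementation):

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

class ToyReadLock:
    """Shared lock: readers only bump a counter, so they never block each
    other; a writer would wait until the counter drops back to zero."""

    def __init__(self) -> None:
        self._readers = 0
        self._idle = asyncio.Event()
        self._idle.set()

    @asynccontextmanager
    async def read(self) -> AsyncIterator[None]:
        self._readers += 1
        self._idle.clear()
        try:
            yield
        finally:
            self._readers -= 1
            if self._readers == 0:
                self._idle.set()  # a waiting writer could now proceed

async def main() -> None:
    lock = ToyReadLock()
    # Taking the read lock while already holding it just increments the
    # counter: no self-deadlock, which is what makes the nesting safe.
    async with lock.read():
        async with lock.read():
            print("nested read locks held")

asyncio.run(main())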
