Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Block calls to get_current_state_ids and co. when have partial state
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed May 25, 2022
1 parent 98ec3bc commit 453af55
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,7 @@ async def _sync_partial_state_room(
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self.storage.state.notify_room_un_partial_stated(room_id)

# TODO(faster_joins) update room stats and user directory?
return
Expand Down
13 changes: 13 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,19 @@ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None
keyvalues={"room_id": room_id},
)

async def is_room_got_partial_state(self, room_id: str) -> bool:
"Whether the given room only has partial state stored"

entry = await self.db_pool.simple_select_one_onecol(
table="partial_state_rooms",
keyvalues={"room_id": room_id},
retcol="room_id",
allow_none=True,
desc="is_room_got_partial_state",
)

return entry is not None


class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
Expand Down
19 changes: 18 additions & 1 deletion synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@

from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateKey, StateMap

if TYPE_CHECKING:
Expand Down Expand Up @@ -590,10 +593,18 @@ def __init__(self, hs: "HomeServer", stores: "Databases"):
self._is_mine_id = hs.is_mine_id
self.stores = stores
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)

def notify_event_un_partial_stated(self, event_id: str) -> None:
self._partial_state_events_tracker.notify_un_partial_stated(event_id)

def notify_room_un_partial_stated(self, room_id: str) -> None:
"""Notify that the room no longer has any partial state.
Must be called after `clear_partial_state_room`
"""
self._partial_state_room_tracker.notify_un_partial_stated(room_id)

async def get_state_group_delta(
self, state_group: int
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
Expand Down Expand Up @@ -911,6 +922,7 @@ async def get_current_state_ids(
Returns:
The current state of the room.
"""
await self._partial_state_room_tracker.await_full_state(room_id)

return await self.stores.main.get_partial_current_state_ids(
room_id, on_invalidate=on_invalidate
Expand All @@ -931,6 +943,9 @@ async def get_filtered_current_state_ids(
Returns:
Map from type/state_key to event ID.
"""
if not state_filter or state_filter.must_await_full_state(self._is_mine_id):
await self._partial_state_room_tracker.await_full_state(room_id)

return await self.stores.main.get_partial_filtered_current_state_ids(
room_id, state_filter
)
Expand Down Expand Up @@ -985,6 +1000,8 @@ async def get_current_state_deltas(
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
"""
# FIXME(faster room joins): what do we do here?

return await self.stores.main.get_partial_current_state_deltas(
prev_stream_id, max_stream_id
)
60 changes: 60 additions & 0 deletions synapse/storage/util/partial_state_events_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.util import unwrapFirstError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -118,3 +119,62 @@ async def await_full_state(self, event_ids: Collection[str]) -> None:
observer_set.discard(observer)
if not observer_set:
del self._observers[event_id]


class PartialCurrentStateTracker:
"""Keeps track of which rooms have partial state, after partial-state joins"""

def __init__(self, store: RoomWorkerStore):
self._store = store

# a map from room id to a set of Deferreds which are waiting for that room to be
# un-partial-stated.
self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)

def notify_un_partial_stated(self, room_id: str) -> None:
"""Notify that we now have full current state for a given room
Unblocks any callers to await_full_state() for that room.
Args:
room_id: the room that now has full current state.
"""
observers = self._observers.pop(room_id, None)
if not observers:
return
logger.info(
"Notifying %i things waiting for un-partial-stating of room %s",
len(observers),
room_id,
)
with PreserveLoggingContext():
for o in observers:
o.callback(None)

async def await_full_state(self, room_id: str) -> None:
# We add the deferred immediately so that the DB call to check for
# partial state doesn't race when we unpartial the room.
d = Deferred[None]()
self._observers.setdefault(room_id, set()).add(d)

try:
# Check if the room has partial current state or not.
has_partial_state = await self._store.is_room_got_partial_state(room_id)
if not has_partial_state:
return

logger.info(
"Awaiting un-partial-stating of room %s",
room_id,
)

await make_deferred_yieldable(d)

logger.info("Room has un-partial-stated")
finally:
# Remove the added observer, and remove the room entry if its empty.
ds = self._observers.get(room_id)
if ds is not None:
ds.discard(d)
if not ds:
self._observers.pop(room_id, None)

0 comments on commit 453af55

Please sign in to comment.