Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

Optimise get rooms for user sync part 2 #11

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 67 additions & 80 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,36 +1298,89 @@ async def generate_sync_result(
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
instance to signify that the sync calculation is complete.
"""

user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

# Note: we get the users room list *before* we get the current token, this
# avoids checking back in history if rooms are joined after the token is fetched.
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

# Since we fetched the users room list before the token, there's a small window
# during which membership events may have been persisted, so we fetch these now
# and modify the joined room list for any changes between the get_rooms_for_user
# call and the get_current_token call.
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't since_token significantly older than we need? Can we take the current token before we fetch get_rooms_for_user and then only get the changes between the two?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes much older - if we switch to using deltas as above this would be far more efficient.

In the current state of this PR though it makes sense to call get_membership_changes_for_user between the since_token & now because we later fetch this anyway (twice), so passing it around as part of the SyncResultBuilder removes those duplicate lookups.

)

mem_last_change_by_room_id: Dict[str, EventBase] = {}
for event in membership_change_events:
mem_last_change_by_room_id[event.room_id] = event

# For the latest membership event in each room found, add/remove the room ID
# from the joined room list accordingly. In this case we only care if the
# latest change is JOIN.

for room_id, event in mem_last_change_by_room_id.items():
logger.info(
"User membership change between getting rooms and current token: %s %s %s",
user_id,
event.membership,
room_id,
)
# User joined a room - we have to then check the room state to ensure we
# respect any bans if there's a race between the join and ban events.
if event.membership == Membership.JOIN:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to be careful that we handle a join -> join transition? e.g. a display name update?

I think we could look at the current_state_deltas table to get a more accurate sense of how the state has changed?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, no we don't need to worry about join -> join transitions. I was thinking of a different un-applicable scenario.

current_state_deltas probably still the right table?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was just trying to figure this out but I don't think current_state_deltas has all the information we need - specifically the membership field in event content means we can't determine whether an event was a join or invite.

Pulling in the events would solve this, but at that point it's essentially the same as the current get_membership_changes_for_user call I think, is there a reason this isn't suitable?

assert event.internal_metadata.stream_ordering
extrems = await self.store.get_forward_extremities_for_room_at_stream_ordering(
room_id, event.internal_metadata.stream_ordering
)
user_ids_in_room = await self.state.get_current_user_ids_in_room(
room_id, extrems
)
if user_id in user_ids_in_room:
mutable_joined_room_ids.add(room_id)
# The user left the room, or left and was re-invited but not joined yet
else:
mutable_joined_room_ids.discard(room_id)

# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
(
room_id
for room_id in mutable_joined_room_ids
if room_id not in self.rooms_to_exclude
)
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
since_token,
now_token,
)

user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
membership_change_events=membership_change_events,
)

logger.debug("Fetching account data")
Expand Down Expand Up @@ -1808,19 +1861,12 @@ async def _have_rooms_changed(

Does not modify the `sync_result_builder`.
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
membership_change_events = sync_result_builder.membership_change_events

assert since_token

# Get a list of membership change events that have happened to the user
# requesting the sync.
membership_changes = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)

if membership_changes:
if membership_change_events:
return True

stream_id = since_token.room_key.stream
Expand Down Expand Up @@ -1859,16 +1905,10 @@ async def _get_rooms_changed(
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
membership_change_events = sync_result_builder.membership_change_events

assert since_token

# TODO: we've already called this function and ran this query in
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
)

mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
Expand Down Expand Up @@ -2369,60 +2409,6 @@ async def _generate_room_entry(
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)

async def get_rooms_for_user_at(
self,
user_id: str,
room_key: RoomStreamToken,
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.

The stream ordering *must* be recent, otherwise this may throw an
exception if older than a month. (This function is called with the
current token, which should be perfectly fine).

Args:
user_id
stream_ordering

ReturnValue:
Set of room_ids the user is in at given stream_ordering.
"""
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)

joined_room_ids = set()

# We need to check that the stream ordering of the join for each room
# is before the stream_ordering asked for. This might not be the case
# if the user joins a room between us getting the current token and
# calling `get_rooms_for_user_with_stream_ordering`.
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
# We also need to check whether the room should be excluded from sync
# responses as per the homeserver config.
for joined_room in joined_rooms:
if joined_room.room_id in self.rooms_to_exclude:
continue

if not joined_room.event_pos.persisted_after(room_key):
joined_room_ids.add(joined_room.room_id)
continue

logger.info("User joined room after current token: %s", joined_room.room_id)

extrems = (
await self.store.get_forward_extremities_for_room_at_stream_ordering(
joined_room.room_id, joined_room.event_pos.stream
)
)
user_ids_in_room = await self.state.get_current_user_ids_in_room(
joined_room.room_id, extrems
)
if user_id in user_ids_in_room:
joined_room_ids.add(joined_room.room_id)

return frozenset(joined_room_ids)


def _action_has_highlight(actions: List[JsonDict]) -> bool:
for action in actions:
Expand Down Expand Up @@ -2519,6 +2505,7 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
membership_change_events: List[EventBase]

presence: List[UserPresenceState] = attr.Factory(list)
account_data: List[JsonDict] = attr.Factory(list)
Expand Down