-
Notifications
You must be signed in to change notification settings - Fork 2
Optimise get rooms for user sync part 2 #11
Changes from all commits
ecc0c02
75231d9
adbcc82
47ebb6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1298,36 +1298,89 @@ async def generate_sync_result( | |
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` | ||
instance to signify that the sync calculation is complete. | ||
""" | ||
|
||
user_id = sync_config.user.to_string() | ||
app_service = self.store.get_app_service_by_user_id(user_id) | ||
if app_service: | ||
# We no longer support AS users using /sync directly. | ||
# See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
raise NotImplementedError() | ||
|
||
# Note: we get the user's room list *before* we get the current token; this | ||
# avoids checking back in history if rooms are joined after the token is fetched. | ||
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) | ||
|
||
# NB: The now_token gets changed by some of the generate_sync_* methods, | ||
# this is due to some of the underlying streams not supporting the ability | ||
# to query up to a given point. | ||
# Always use the `now_token` in `SyncResultBuilder` | ||
now_token = self.event_sources.get_current_token() | ||
log_kv({"now_token": now_token}) | ||
|
||
# Since we fetched the user's room list before the token, there's a small window | ||
# during which membership events may have been persisted, so we fetch these now | ||
# and modify the joined room list for any changes between the get_rooms_for_user | ||
# call and the get_current_token call. | ||
membership_change_events = [] | ||
if since_token: | ||
membership_change_events = await self.store.get_membership_changes_for_user( | ||
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude | ||
) | ||
|
||
mem_last_change_by_room_id: Dict[str, EventBase] = {} | ||
for event in membership_change_events: | ||
mem_last_change_by_room_id[event.room_id] = event | ||
|
||
# For the latest membership event in each room found, add/remove the room ID | ||
# from the joined room list accordingly. In this case we only care if the | ||
# latest change is JOIN. | ||
|
||
for room_id, event in mem_last_change_by_room_id.items(): | ||
logger.info( | ||
"User membership change between getting rooms and current token: %s %s %s", | ||
user_id, | ||
event.membership, | ||
room_id, | ||
) | ||
# User joined a room - we have to then check the room state to ensure we | ||
# respect any bans if there's a race between the join and ban events. | ||
if event.membership == Membership.JOIN: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we need to be careful that we handle a [inline code lost in extraction]. I think we could look at the [inline code lost in extraction].
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, no, we don't need to worry about join -> join transitions. I was thinking of a different, inapplicable scenario.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Was just trying to figure this out, but I don't think [inline code lost in extraction]. Pulling in the events would solve this, but at that point it's essentially the same as the current [inline code lost in extraction] |
||
assert event.internal_metadata.stream_ordering | ||
extrems = await self.store.get_forward_extremities_for_room_at_stream_ordering( | ||
room_id, event.internal_metadata.stream_ordering | ||
) | ||
user_ids_in_room = await self.state.get_current_user_ids_in_room( | ||
room_id, extrems | ||
) | ||
if user_id in user_ids_in_room: | ||
mutable_joined_room_ids.add(room_id) | ||
# The user left the room, or left and was re-invited but not joined yet | ||
else: | ||
mutable_joined_room_ids.discard(room_id) | ||
|
||
# Now we have our list of joined room IDs, exclude as configured and freeze | ||
joined_room_ids = frozenset( | ||
( | ||
room_id | ||
for room_id in mutable_joined_room_ids | ||
if room_id not in self.rooms_to_exclude | ||
) | ||
) | ||
|
||
logger.debug( | ||
"Calculating sync response for %r between %s and %s", | ||
sync_config.user, | ||
since_token, | ||
now_token, | ||
) | ||
|
||
user_id = sync_config.user.to_string() | ||
app_service = self.store.get_app_service_by_user_id(user_id) | ||
if app_service: | ||
# We no longer support AS users using /sync directly. | ||
# See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
raise NotImplementedError() | ||
else: | ||
joined_room_ids = await self.get_rooms_for_user_at( | ||
user_id, now_token.room_key | ||
) | ||
sync_result_builder = SyncResultBuilder( | ||
sync_config, | ||
full_state, | ||
since_token=since_token, | ||
now_token=now_token, | ||
joined_room_ids=joined_room_ids, | ||
membership_change_events=membership_change_events, | ||
) | ||
|
||
logger.debug("Fetching account data") | ||
|
@@ -1808,19 +1861,12 @@ async def _have_rooms_changed( | |
|
||
Does not modify the `sync_result_builder`. | ||
""" | ||
user_id = sync_result_builder.sync_config.user.to_string() | ||
since_token = sync_result_builder.since_token | ||
now_token = sync_result_builder.now_token | ||
membership_change_events = sync_result_builder.membership_change_events | ||
|
||
assert since_token | ||
|
||
# Get a list of membership change events that have happened to the user | ||
# requesting the sync. | ||
membership_changes = await self.store.get_membership_changes_for_user( | ||
user_id, since_token.room_key, now_token.room_key | ||
) | ||
|
||
if membership_changes: | ||
if membership_change_events: | ||
return True | ||
|
||
stream_id = since_token.room_key.stream | ||
|
@@ -1859,16 +1905,10 @@ async def _get_rooms_changed( | |
since_token = sync_result_builder.since_token | ||
now_token = sync_result_builder.now_token | ||
sync_config = sync_result_builder.sync_config | ||
membership_change_events = sync_result_builder.membership_change_events | ||
|
||
assert since_token | ||
|
||
# TODO: we've already called this function and ran this query in | ||
# _have_rooms_changed. We could keep the results in memory to avoid a | ||
# second query, at the cost of more complicated source code. | ||
membership_change_events = await self.store.get_membership_changes_for_user( | ||
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude | ||
) | ||
|
||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} | ||
for event in membership_change_events: | ||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) | ||
|
@@ -2369,60 +2409,6 @@ async def _generate_room_entry( | |
else: | ||
raise Exception("Unrecognized rtype: %r", room_builder.rtype) | ||
|
||
async def get_rooms_for_user_at( | ||
self, | ||
user_id: str, | ||
room_key: RoomStreamToken, | ||
) -> FrozenSet[str]: | ||
"""Get set of joined rooms for a user at the given stream ordering. | ||
|
||
The stream ordering *must* be recent, otherwise this may throw an | ||
exception if older than a month. (This function is called with the | ||
current token, which should be perfectly fine). | ||
|
||
Args: | ||
user_id | ||
stream_ordering | ||
|
||
ReturnValue: | ||
Set of room_ids the user is in at given stream_ordering. | ||
""" | ||
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) | ||
|
||
joined_room_ids = set() | ||
|
||
# We need to check that the stream ordering of the join for each room | ||
# is before the stream_ordering asked for. This might not be the case | ||
# if the user joins a room between us getting the current token and | ||
# calling `get_rooms_for_user_with_stream_ordering`. | ||
# If the membership's stream ordering is after the given stream | ||
# ordering, we need to go and work out if the user was in the room | ||
# before. | ||
# We also need to check whether the room should be excluded from sync | ||
# responses as per the homeserver config. | ||
for joined_room in joined_rooms: | ||
if joined_room.room_id in self.rooms_to_exclude: | ||
continue | ||
|
||
if not joined_room.event_pos.persisted_after(room_key): | ||
joined_room_ids.add(joined_room.room_id) | ||
continue | ||
|
||
logger.info("User joined room after current token: %s", joined_room.room_id) | ||
|
||
extrems = ( | ||
await self.store.get_forward_extremities_for_room_at_stream_ordering( | ||
joined_room.room_id, joined_room.event_pos.stream | ||
) | ||
) | ||
user_ids_in_room = await self.state.get_current_user_ids_in_room( | ||
joined_room.room_id, extrems | ||
) | ||
if user_id in user_ids_in_room: | ||
joined_room_ids.add(joined_room.room_id) | ||
|
||
return frozenset(joined_room_ids) | ||
|
||
|
||
def _action_has_highlight(actions: List[JsonDict]) -> bool: | ||
for action in actions: | ||
|
@@ -2519,6 +2505,7 @@ class SyncResultBuilder: | |
since_token: Optional[StreamToken] | ||
now_token: StreamToken | ||
joined_room_ids: FrozenSet[str] | ||
membership_change_events: List[EventBase] | ||
|
||
presence: List[UserPresenceState] = attr.Factory(list) | ||
account_data: List[JsonDict] = attr.Factory(list) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't `since_token` significantly older than we need? Can we take the current token before we fetch `get_rooms_for_user` and then only get the changes between the two?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes much older - if we switch to using deltas as above this would be far more efficient.
In the current state of this PR, though, it makes sense to call `get_membership_changes_for_user` between the `since_token` and now, because we later fetch this anyway (twice), so passing it around as part of the `SyncResultBuilder` removes those duplicate lookups.