Skip to content

Pass leave from remote invite rejection down Sliding Sync #18375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18375.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pass leave from remote invite rejection down Sliding Sync.
15 changes: 15 additions & 0 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ async def handle_room(room_id: str) -> None:
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
newly_left=room_id in interested_rooms.newly_left_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)

Expand Down Expand Up @@ -542,6 +543,7 @@ async def get_room_sync_data(
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
newly_left: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
Expand All @@ -559,6 +561,7 @@ async def get_room_sync_data(
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
newly_left: If the user has newly left the room
is_dm: Whether the room is a DM room
"""
user = sync_config.user
Expand Down Expand Up @@ -856,6 +859,18 @@ async def get_room_sync_data(
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?

if newly_left and room_membership_for_user_at_to_token.event_id is not None:
membership_changed = True
leave_event = await self.store.get_event(
room_membership_for_user_at_to_token.event_id
)
state_key = leave_event.get_state_key()
if state_key is not None:
room_state_delta_id_map[(leave_event.type, state_key)] = (
room_membership_for_user_at_to_token.event_id
)

deltas = await self.get_current_state_deltas_for_room(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
Expand Down
116 changes: 13 additions & 103 deletions synapse/handlers/sliding_sync/room_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,115 +1107,25 @@ async def _get_newly_joined_and_left_rooms(
was state reset out of the room. To actually check for a state reset, you
need to check if a membership still exists in the room.
"""

newly_joined_room_ids: Set[str] = set()
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}

# We need to figure out the
#
# - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
# - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)

# 1) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
current_state_delta_membership_changes_in_from_to_range = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_room_ids=self.rooms_to_exclude_globally,
)
)

# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[
str, CurrentStateDeltaMembership
] = {}
# We also want to assemble a list of the first membership events during the token
# range so we can step backward to the previous membership that would apply to
# before the token range to see if we have `newly_joined` the room.
first_membership_change_by_room_id_in_from_to_range: Dict[
str, CurrentStateDeltaMembership
] = {}
# Keep track if the room has a non-join event in the token range so we can later
# tell if it was a `newly_joined` room. If the last membership event in the
# token range is a join and there is also some non-join in the range, we know
# they `newly_joined`.
has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
for (
membership_change
) in current_state_delta_membership_changes_in_from_to_range:
room_id = membership_change.room_id
if not from_token:
return newly_joined_room_ids, newly_left_room_map

last_membership_change_by_room_id_in_from_to_range[room_id] = (
membership_change
)
# Only set if we haven't already set it
first_membership_change_by_room_id_in_from_to_range.setdefault(
room_id, membership_change
)

if membership_change.membership != Membership.JOIN:
has_non_join_event_by_room_id_in_from_to_range[room_id] = True

# 1) Fixup
#
# 2) We also want to assemble a list of possibly newly joined rooms. Someone
# could have left and joined multiple times during the given range but we only
# care about whether they are joined at the end of the token range so we are
# working with the last membership even in the token range.
possibly_newly_joined_room_ids = set()
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id

# 2)
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)

# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# 1) Mark this room as `newly_left`
newly_left_room_map[room_id] = RoomsForUserStateReset(
room_id=room_id,
sender=last_membership_change_in_from_to_range.sender,
membership=Membership.LEAVE,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
room_version_id=await self.store.get_room_version_id(room_id),
)
changes = await self.store.get_sliding_sync_membership_changes(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_room_ids=self.rooms_to_exclude_globally,
)

# 2) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
has_non_join_in_from_to_range = (
has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
)
# If the last membership event in the token range is a join and there is
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
for room_id, entry in changes.items():
if entry.membership == Membership.JOIN:
newly_joined_room_ids.add(room_id)
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
].prev_event_id
prev_membership = first_membership_change_by_room_id_in_from_to_range[
room_id
].prev_membership

if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
newly_joined_room_ids.add(room_id)
# Last resort, we need to step back to the previous membership event
# just before the token range to see if we're joined then or not.
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
newly_joined_room_ids.add(room_id)
elif entry.membership == Membership.LEAVE:
newly_left_room_map[room_id] = entry

return newly_joined_room_ids, newly_left_room_map

Expand Down
167 changes: 167 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.roommember import RoomsForUserStateReset
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
from synapse.util.caches.descriptors import cached, cachedList
Expand Down Expand Up @@ -1136,6 +1137,172 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
if membership_change.room_id not in room_ids_to_exclude
]

@trace
async def get_sliding_sync_membership_changes(
self,
user_id: str,
from_key: RoomStreamToken,
to_key: RoomStreamToken,
excluded_room_ids: Optional[List[str]] = None,
) -> Dict[str, RoomsForUserStateReset]:
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return {}

if from_key:
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_key.stream)
)
if not has_changed:
return {}

room_ids_to_exclude: AbstractSet[str] = set()
if excluded_room_ids is not None:
room_ids_to_exclude = set(excluded_room_ids)

def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos()

# This query looks at membership changes in
# `sliding_sync_membership_snapshots`. These will not include where
# users get state reset out of rooms, so we need to look for that
# case in `current_state_delta_stream`.
#
# TODO: Add an index a better index on sliding_sync_membership_snapshots
# probably want:
# - sliding_sync_membership_snapshots (user_id, event_stream_ordering)
# replacing the existing index on only (user_id)
# - MAYBE current_state_delta_stream(state_key, stream_id) WHERE type = 'm.room.member' AND event_id IS NULL
sql = """
SELECT
room_id,
membership_event_id,
event_instance_name,
event_stream_ordering,
membership,
sender,
prev_membership,
room_version
FROM
(
SELECT
s.room_id,
s.membership_event_id,
s.event_instance_name,
s.event_stream_ordering,
s.membership,
s.sender,
m_prev.membership AS prev_membership
FROM sliding_sync_membership_snapshots as s
LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
WHERE s.user_id = ?

UNION ALL

SELECT
s.room_id,
e.event_id,
s.instance_name,
s.stream_id,
m.membership,
e.sender,
m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
WHERE
s.type = ?
AND s.state_key = ?
) AS c
INNER JOIN rooms USING (room_id)
WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
ORDER BY event_stream_ordering ASC
"""

txn.execute(
sql,
(user_id, EventTypes.Member, user_id, min_from_id, max_to_id),
)

membership_changes: Dict[str, RoomsForUserStateReset] = {}
for (
room_id,
membership_event_id,
event_instance_name,
event_stream_ordering,
membership,
sender,
prev_membership,
room_version_id,
) in txn:
assert room_id is not None
assert event_stream_ordering is not None

if room_id in room_ids_to_exclude:
continue

if _filter_results_by_stream(
from_key,
to_key,
event_instance_name,
event_stream_ordering,
):
# When the server leaves a room, it will insert new rows into the
# `current_state_delta_stream` table with `event_id = null` for all
# current state. This means we might already have a row for the
# leave event and then another for the same leave where the
# `event_id=null` but the `prev_event_id` is pointing back at the
# earlier leave event. We don't want to report the leave, if we
# already have a leave event.
if (
membership_event_id is None
and prev_membership == Membership.LEAVE
):
continue

if membership_event_id is None and room_id in membership_changes:
# SUSPICIOUS: if we join a room and get state reset out of it
# in the same queried window,
# won't this ignore the 'state reset out of it' part?
continue

if membership is None:
membership = Membership.LEAVE

if (
membership == Membership.JOIN
and prev_membership == Membership.JOIN
):
# The user was previously joined so this it not a new join.
# This happens when the user changes their display name.
continue

membership_change = RoomsForUserStateReset(
room_id=room_id,
sender=sender,
membership=membership,
event_id=membership_event_id,
event_pos=PersistedEventPosition(
event_instance_name, event_stream_ordering
),
room_version_id=room_version_id,
)

membership_changes[room_id] = membership_change

return membership_changes

membership_changes = await self.db_pool.runInteraction(
"get_sliding_sync_membership_changes", f
)

return membership_changes

@cancellable
async def get_membership_changes_for_user(
self,
Expand Down
Loading