Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit abf0e75

Browse files
committed
Delete current state when server leaves a room (#6792)
* commit '611215a49': Delete current state when server leaves a room (#6792)
2 parents 718337e + 611215a commit abf0e75

File tree

3 files changed

+198
-75
lines changed

3 files changed

+198
-75
lines changed

changelog.d/6792.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Delete current state from the database when server leaves a room.

synapse/storage/data_stores/main/events.py

+111-72
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import synapse.metrics
3333
from synapse.api.constants import EventContentFields, EventTypes
3434
from synapse.api.errors import SynapseError
35+
from synapse.api.room_versions import RoomVersions
3536
from synapse.events import EventBase # noqa: F401
3637
from synapse.events.snapshot import EventContext # noqa: F401
3738
from synapse.events.utils import prune_event_dict
@@ -468,84 +469,93 @@ def _update_current_state_txn(
468469
to_delete = delta_state.to_delete
469470
to_insert = delta_state.to_insert
470471

471-
# First we add entries to the current_state_delta_stream. We
472-
# do this before updating the current_state_events table so
473-
# that we can use it to calculate the `prev_event_id`. (This
474-
# allows us to not have to pull out the existing state
475-
# unnecessarily).
476-
#
477-
# The stream_id for the update is chosen to be the minimum of the stream_ids
478-
# for the batch of the events that we are persisting; that means we do not
479-
# end up in a situation where workers see events before the
480-
# current_state_delta updates.
481-
#
482-
sql = """
483-
INSERT INTO current_state_delta_stream
484-
(stream_id, room_id, type, state_key, event_id, prev_event_id)
485-
SELECT ?, ?, ?, ?, ?, (
486-
SELECT event_id FROM current_state_events
487-
WHERE room_id = ? AND type = ? AND state_key = ?
472+
if delta_state.no_longer_in_room:
473+
# Server is no longer in the room so we delete the room from
474+
# current_state_events, being careful we've already updated the
475+
# rooms.room_version column (which gets populated in a
476+
# background task).
477+
self._upsert_room_version_txn(txn, room_id)
478+
479+
# Before deleting we populate the current_state_delta_stream
480+
# so that async background tasks get told what happened.
481+
sql = """
482+
INSERT INTO current_state_delta_stream
483+
(stream_id, room_id, type, state_key, event_id, prev_event_id)
484+
SELECT ?, room_id, type, state_key, null, event_id
485+
FROM current_state_events
486+
WHERE room_id = ?
487+
"""
488+
txn.execute(sql, (stream_id, room_id))
489+
490+
self.db.simple_delete_txn(
491+
txn, table="current_state_events", keyvalues={"room_id": room_id},
488492
)
489-
"""
490-
txn.executemany(
491-
sql,
492-
(
493-
(
494-
stream_id,
495-
room_id,
496-
etype,
497-
state_key,
498-
None,
499-
room_id,
500-
etype,
501-
state_key,
493+
else:
494+
# We're still in the room, so we update the current state as normal.
495+
496+
# First we add entries to the current_state_delta_stream. We
497+
# do this before updating the current_state_events table so
498+
# that we can use it to calculate the `prev_event_id`. (This
499+
# allows us to not have to pull out the existing state
500+
# unnecessarily).
501+
#
502+
# The stream_id for the update is chosen to be the minimum of the stream_ids
503+
# for the batch of the events that we are persisting; that means we do not
504+
# end up in a situation where workers see events before the
505+
# current_state_delta updates.
506+
#
507+
sql = """
508+
INSERT INTO current_state_delta_stream
509+
(stream_id, room_id, type, state_key, event_id, prev_event_id)
510+
SELECT ?, ?, ?, ?, ?, (
511+
SELECT event_id FROM current_state_events
512+
WHERE room_id = ? AND type = ? AND state_key = ?
502513
)
503-
for etype, state_key in to_delete
504-
# We sanity check that we're deleting rather than updating
505-
if (etype, state_key) not in to_insert
506-
),
507-
)
508-
txn.executemany(
509-
sql,
510-
(
514+
"""
515+
txn.executemany(
516+
sql,
511517
(
512-
stream_id,
513-
room_id,
514-
etype,
515-
state_key,
516-
ev_id,
517-
room_id,
518-
etype,
519-
state_key,
520-
)
521-
for (etype, state_key), ev_id in iteritems(to_insert)
522-
),
523-
)
518+
(
519+
stream_id,
520+
room_id,
521+
etype,
522+
state_key,
523+
to_insert.get((etype, state_key)),
524+
room_id,
525+
etype,
526+
state_key,
527+
)
528+
for etype, state_key in itertools.chain(to_delete, to_insert)
529+
),
530+
)
531+
# Now we actually update the current_state_events table
524532

525-
# Now we actually update the current_state_events table
533+
txn.executemany(
534+
"DELETE FROM current_state_events"
535+
" WHERE room_id = ? AND type = ? AND state_key = ?",
536+
(
537+
(room_id, etype, state_key)
538+
for etype, state_key in itertools.chain(to_delete, to_insert)
539+
),
540+
)
526541

527-
txn.executemany(
528-
"DELETE FROM current_state_events"
529-
" WHERE room_id = ? AND type = ? AND state_key = ?",
530-
(
531-
(room_id, etype, state_key)
532-
for etype, state_key in itertools.chain(to_delete, to_insert)
533-
),
534-
)
542+
# We include the membership in the current state table, hence we do
543+
# a lookup when we insert. This assumes that all events have already
544+
# been inserted into room_memberships.
545+
txn.executemany(
546+
"""INSERT INTO current_state_events
547+
(room_id, type, state_key, event_id, membership)
548+
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
549+
""",
550+
[
551+
(room_id, key[0], key[1], ev_id, ev_id)
552+
for key, ev_id in iteritems(to_insert)
553+
],
554+
)
535555

536-
# We include the membership in the current state table, hence we do
537-
# a lookup when we insert. This assumes that all events have already
538-
# been inserted into room_memberships.
539-
txn.executemany(
540-
"""INSERT INTO current_state_events
541-
(room_id, type, state_key, event_id, membership)
542-
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
543-
""",
544-
[
545-
(room_id, key[0], key[1], ev_id, ev_id)
546-
for key, ev_id in iteritems(to_insert)
547-
],
548-
)
556+
# We now update `local_current_membership`. We do this regardless
557+
# of whether we're still in the room or not to handle the case where
558+
# e.g. we just got banned (where we need to record that fact here).
549559

550560
# Note: Do we really want to delete rows here (that we do not
551561
# subsequently reinsert below)? While technically correct it means
@@ -601,6 +611,35 @@ def _update_current_state_txn(
601611

602612
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
603613

614+
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
615+
"""Update the room version in the database based off current state
616+
events.
617+
618+
This is used when we're about to delete current state and we want to
619+
ensure that the `rooms.room_version` column is up to date.
620+
"""
621+
622+
sql = """
623+
SELECT json FROM event_json
624+
INNER JOIN current_state_events USING (room_id, event_id)
625+
WHERE room_id = ? AND type = ? AND state_key = ?
626+
"""
627+
txn.execute(sql, (room_id, EventTypes.Create, ""))
628+
row = txn.fetchone()
629+
if row:
630+
event_json = json.loads(row[0])
631+
content = event_json.get("content", {})
632+
creator = content.get("creator")
633+
room_version_id = content.get("room_version", RoomVersions.V1.identifier)
634+
635+
self.db.simple_upsert_txn(
636+
txn,
637+
table="rooms",
638+
keyvalues={"room_id": room_id},
639+
values={"room_version": room_version_id},
640+
insertion_values={"is_public": False, "creator": creator},
641+
)
642+
604643
def _update_forward_extremities_txn(
605644
self, txn, new_forward_extremities, max_stream_order
606645
):

synapse/storage/persist_events.py

+86-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717

18+
import itertools
1819
import logging
1920
from collections import deque, namedtuple
2021
from typing import Iterable, List, Optional, Tuple
@@ -27,7 +28,7 @@
2728

2829
from twisted.internet import defer
2930

30-
from synapse.api.constants import EventTypes
31+
from synapse.api.constants import EventTypes, Membership
3132
from synapse.events import FrozenEvent
3233
from synapse.events.snapshot import EventContext
3334
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@
7273
)
7374

7475

75-
@attr.s(slots=True, frozen=True)
76+
@attr.s(slots=True)
7677
class DeltaState:
7778
"""Deltas to use to update the `current_state_events` table.
7879
7980
Attributes:
8081
to_delete: List of type/state_keys to delete from current state
8182
to_insert: Map of state to upsert into current state
83+
no_longer_in_room: The server is not longer in the room, so the room
84+
should e.g. be removed from `current_state_events` table.
8285
"""
8386

8487
to_delete = attr.ib(type=List[Tuple[str, str]])
8588
to_insert = attr.ib(type=StateMap[str])
89+
no_longer_in_room = attr.ib(type=bool, default=False)
8690

8791

8892
class _EventPeristenceQueue(object):
@@ -396,18 +400,35 @@ async def _persist_events(
396400
# If either are not None then there has been a change,
397401
# and we need to work out the delta (or use that
398402
# given)
403+
delta = None
399404
if delta_ids is not None:
400405
# If there is a delta we know that we've
401406
# only added or replaced state, never
402407
# removed keys entirely.
403-
state_delta_for_room[room_id] = DeltaState([], delta_ids)
408+
delta = DeltaState([], delta_ids)
404409
elif current_state is not None:
405410
with Measure(
406411
self._clock, "persist_events.calculate_state_delta"
407412
):
408413
delta = await self._calculate_state_delta(
409414
room_id, current_state
410415
)
416+
417+
if delta:
418+
# If we have a change of state then lets check
419+
# whether we're actually still a member of the room,
420+
# or if our last user left. If we're no longer in
421+
# the room then we delete the current state and
422+
# extremities.
423+
is_still_joined = await self._is_server_still_joined(
424+
room_id, ev_ctx_rm, delta, current_state
425+
)
426+
if not is_still_joined:
427+
logger.info("Server no longer in room %s", room_id)
428+
latest_event_ids = []
429+
current_state = {}
430+
delta.no_longer_in_room = True
431+
411432
state_delta_for_room[room_id] = delta
412433

413434
# If we have the current_state then lets prefill
@@ -660,3 +681,65 @@ async def _calculate_state_delta(
660681
}
661682

662683
return DeltaState(to_delete=to_delete, to_insert=to_insert)
684+
685+
async def _is_server_still_joined(
686+
self,
687+
room_id: str,
688+
ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
689+
delta: DeltaState,
690+
current_state: Optional[StateMap[str]],
691+
) -> bool:
692+
"""Check if the server will still be joined after the given events have
693+
been persised.
694+
695+
Args:
696+
room_id
697+
ev_ctx_rm
698+
delta: The delta of current state between what is in the database
699+
and what the new current state will be.
700+
current_state: The new current state if it already been calculated,
701+
otherwise None.
702+
"""
703+
704+
if not any(
705+
self.is_mine_id(state_key)
706+
for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
707+
if typ == EventTypes.Member
708+
):
709+
# There have been no changes to membership of our users, so nothing
710+
# has changed and we assume we're still in the room.
711+
return True
712+
713+
# Check if any of the given events are a local join that appear in the
714+
# current state
715+
for (typ, state_key), event_id in delta.to_insert.items():
716+
if typ != EventTypes.Member or not self.is_mine_id(state_key):
717+
continue
718+
719+
for event, _ in ev_ctx_rm:
720+
if event_id == event.event_id:
721+
if event.membership == Membership.JOIN:
722+
return True
723+
724+
# There's been a change of membership but we don't have a local join
725+
# event in the new events, so we need to check the full state.
726+
if current_state is None:
727+
current_state = await self.main_store.get_current_state_ids(room_id)
728+
current_state = dict(current_state)
729+
for key in delta.to_delete:
730+
current_state.pop(key, None)
731+
732+
current_state.update(delta.to_insert)
733+
734+
event_ids = [
735+
event_id
736+
for (typ, state_key,), event_id in current_state.items()
737+
if typ == EventTypes.Member and self.is_mine_id(state_key)
738+
]
739+
740+
rows = await self.main_store.get_membership_from_event_ids(event_ids)
741+
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
742+
if is_still_joined:
743+
return True
744+
else:
745+
return False

0 commit comments

Comments
 (0)