Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Comments and typing for _update_outliers_txn #11776

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11776.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add some comments and type annotations for `_update_outliers_txn`.
35 changes: 23 additions & 12 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,28 +1254,32 @@ def _update_room_depths_txn(
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)

def _update_outliers_txn(self, txn, events_and_contexts):
def _update_outliers_txn(
self,
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> List[Tuple[EventBase, EventContext]]:
"""Update any outliers with new event info.
This turns outliers into ex-outliers (unless the new event was
rejected).
This turns outliers into ex-outliers (unless the new event was rejected), and
also removes any other events we have already seen from the list.
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
txn: db connection
events_and_contexts: events we are persisting
Returns:
list[(EventBase, EventContext)] new list, without events which
are already in the events table.
new list, without events which are already in the events table.
"""
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)"
% (",".join(["?"] * len(events_and_contexts)),),
[event.event_id for event, _ in events_and_contexts],
)

have_persisted = {event_id: outlier for event_id, outlier in txn}
have_persisted: Dict[str, bool] = {
event_id: outlier for event_id, outlier in txn
}

to_remove = set()
for event, context in events_and_contexts:
Expand All @@ -1285,15 +1289,22 @@ def _update_outliers_txn(self, txn, events_and_contexts):
to_remove.add(event)

if context.rejected:
# If the event is rejected then we don't care if the event
# was an outlier or not.
# If the incoming event is rejected then we don't care if the event
# was an outlier or not - what we have is at least as good.
continue

outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that
# an outlier in the database. We now have some state at that event
# so we need to update the state_groups table with that state.
#
# Note that we do not update the stream_ordering of the event in this
# scenario. XXX: does this cause bugs? It will mean we won't send such
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.

logger.info("Updating state for ex-outlier event %s", event.event_id)

# insert into event_to_state_groups.
try:
Expand Down