Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

EventsPersistenceStorageController.persist_event is only used by test code #14039

Open
anoadragon453 opened this issue Oct 4, 2022 · 1 comment
Labels
O-Uncommon Most users are unlikely to come across this or unexpected workflow T-Task Refactoring, removal, replacement, enabling or disabling functionality, other engineering tasks. Z-Cleanup Things we want to get rid of, but aren't actively causing pain Z-Dev-Wishlist Makes developers' lives better, but doesn't have direct user impact Z-Help-Wanted We know exactly how to fix this issue, and would be grateful for any contribution

Comments

@anoadragon453
Copy link
Member

The following method is only used by test code (albeit extensively):

@trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
"""
Returns:
The event, stream ordering of `event`, and the stream ordering of the
latest persisted event. The returned event may not match the given
event if it was deduplicated due to an existing event matching the
transaction ID.
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
# add_to_queue returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events.)
replaced_events = await self._event_persist_queue.add_to_queue(
event.room_id,
_PersistEventsTask(
events_and_contexts=[(event, context)], backfilled=backfilled
),
)
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
event = await self.main_store.get_event(replaced_event)
event_stream_id = event.internal_metadata.stream_ordering
# stream ordering should have been assigned by now
assert event_stream_id
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()

Non-test code instead makes use of EventsPersistenceStorageController.persist_events:

@trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> Tuple[List[EventBase], RoomStreamToken]:
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled: Whether the results are retrieved from federation
via backfill or not. Used to determine if they're "new" events
which might update the current state etc.
Returns:
List of events persisted, the current position room stream position.
The list of events persisted may not be the same as those passed in
if they were deduplicated due to an event already existing that
matched the transaction ID; the existing event is returned in such
a case.
Raises:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
event_ids: List[str] = []
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
event_ids.append(event.event_id)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
str(event_ids),
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
async def enqueue(
item: Tuple[str, List[Tuple[EventBase, EventContext]]]
) -> Dict[str, str]:
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id,
_PersistEventsTask(events_and_contexts=evs_ctxs, backfilled=backfilled),
)
ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
# Each call to add_to_queue returns a map from event ID to existing event ID if
# the event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events).
#
# Since we use `yieldable_gather_results` we need to merge the returned list
# of dicts into one.
replaced_events: Dict[str, str] = {}
for d in ret_vals:
replaced_events.update(d)
persisted_events = []
for event, _ in events_and_contexts:
existing_event_id = replaced_events.get(event.event_id)
if existing_event_id:
persisted_events.append(
await self.main_store.get_event(existing_event_id)
)
else:
persisted_events.append(event)
return (
persisted_events,
self.main_store.get_room_max_token(),
)

It seems sensible to remove persist_event and migrate the test code to use the plural version of the method instead.

@anoadragon453 anoadragon453 added T-Task Refactoring, removal, replacement, enabling or disabling functionality, other engineering tasks. Z-Dev-Wishlist Makes developers' lives better, but doesn't have direct user impact O-Uncommon Most users are unlikely to come across this or unexpected workflow labels Oct 4, 2022
@squahtx
Copy link
Contributor

squahtx commented Oct 4, 2022

Note that https://github.com/matrix-org/synapse/pull/13800/files#diff-3d4a81fcc5b115c649ae67aaba0dd4dd0c5827bb001dfcb90b00ffb1605d518eL1840-L1842 removed the last non-test usage of persist_event

@DMRobertson DMRobertson added the Z-Help-Wanted We know exactly how to fix this issue, and would be grateful for any contribution label Oct 4, 2022
@MadLittleMods MadLittleMods added the Z-Cleanup Things we want to get rid of, but aren't actively causing pain label Apr 25, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
O-Uncommon Most users are unlikely to come across this or unexpected workflow T-Task Refactoring, removal, replacement, enabling or disabling functionality, other engineering tasks. Z-Cleanup Things we want to get rid of, but aren't actively causing pain Z-Dev-Wishlist Makes developers' lives better, but doesn't have direct user impact Z-Help-Wanted We know exactly how to fix this issue, and would be grateful for any contribution
Projects
None yet
Development

No branches or pull requests

4 participants