Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Update get_pdu to return the original, pristine EventBase #13320

Merged
merged 22 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ee236ca
Update get_pdu to return original, pristine EventBase
MadLittleMods Jul 18, 2022
79a1b72
Add changelog
MadLittleMods Jul 18, 2022
bfd35fd
Internal change, no specific bugfix
MadLittleMods Jul 18, 2022
e0e20a5
Explain why not
MadLittleMods Jul 18, 2022
22410f2
Add tests
MadLittleMods Jul 19, 2022
09c411b
Some more clarity
MadLittleMods Jul 19, 2022
6029b42
Re-use room ID
MadLittleMods Jul 19, 2022
09167b1
Better still actionable no-fluff assertion message
MadLittleMods Jul 19, 2022
eb6a291
Describe why we use a cache here
MadLittleMods Jul 19, 2022
1c4e57c
Remove direct access to internal property
MadLittleMods Jul 19, 2022
488f5ed
Make it obvious that we're pulling and using a different cache
MadLittleMods Jul 19, 2022
29a5269
Remove assumption/speculation
MadLittleMods Jul 19, 2022
2688e44
Default is already no metadata
MadLittleMods Jul 19, 2022
24913e7
Refactor structure to avoid duplicating the event copy logic
MadLittleMods Jul 19, 2022
0e6dd5a
Pluralization typo
MadLittleMods Jul 19, 2022
5bc75ed
Explain that we return a copy that is safe to modify
MadLittleMods Jul 19, 2022
dea7669
Fix lints
MadLittleMods Jul 19, 2022
72e65a5
Fix description typo
MadLittleMods Jul 19, 2022
86fe0dc
Share event throughout
MadLittleMods Jul 20, 2022
fd879bb
Different comment
MadLittleMods Jul 20, 2022
354678f
Merge branch 'madlittlemods/pristine-get_pdu' of github.com:matrix-or…
MadLittleMods Jul 20, 2022
233077c
Merge branch 'develop' into madlittlemods/pristine-get_pdu
MadLittleMods Jul 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13320.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `FederationClient.get_pdu()` returning events from the cache as `outliers` instead of original events we saw over federation.
128 changes: 86 additions & 42 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
RoomVersion,
RoomVersions,
)
from synapse.events import EventBase, builder
from synapse.events import EventBase, builder, make_event_from_dict
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
Expand Down Expand Up @@ -299,7 +299,8 @@ async def get_pdu_from_destination_raw(
moving to the next destination. None indicates no timeout.

Returns:
The requested PDU, or None if we were unable to find it.
A copy of the requested PDU that is safe to modify, or None if we
were unable to find it.

Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
Expand All @@ -309,7 +310,7 @@ async def get_pdu_from_destination_raw(
)

logger.debug(
"retrieved event id %s from %s: %r",
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
Expand Down Expand Up @@ -358,54 +359,97 @@ async def get_pdu(
The requested PDU, or None if we were unable to find it.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a suggestion for a somewhat different approach. It's arguably cleaner, but also probably a bunch more work; I'd be interested in your thoughts.

In my ideal world, _EventInternalMetadata.outlier should be a private, immutable property. Which is to say, you have to know whether you are dealing with an outlier at the time you construct an _EventInternalMetadata and hence when you call make_event_from_dict. We're some way off that goal in the codebase as a whole, but that doesn't stop us thinking about how to move in that direction for this part of the code.

-- #13320 (comment)

I like the idea of having it be immutable 👍

It would mean passing an outlier flag into get_pdu telling it whether we're going to persist the result as an outlier or not. (That's fairly easy, because get_pdu is called in two places: FederationEventHandler.get_event, which only deals in outliers, and FederationClient._check_sigs_and_hash_and_fetch_one, where we can infer the outlieryness from the input pdu).

That then raises a few possibilities: we could continue to construct a new EventBase on each call to get_pdu as proposed here. Or we could use a different cache key depending on the outlier flag and have (up to) two cache entries for each event.

But needing to pass in outlier or the metadata (when we need to add more than just outlier in the future) makes the function signature surface a bit icky. I'd rather just force downstream consumers to create a copy with the metadata they need/expect, or want to add.

We can tackle this in another potential PR though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, there's definitely a balance of two evils here. I'm happy to punt this for now at least.


# TODO: Rate limit the number of times we try and get the same event.
logger.debug(
"get_pdu: event_id=%s from destinations=%s", event_id, destinations
)

ev = self._get_pdu_cache.get(event_id)
if ev:
return ev
# TODO: Rate limit the number of times we try and get the same event.

pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
# We might need the same event multiple times in quick succession (before
# it gets persisted to the database), so we cache the results of the lookup.
# Note that this is separate to the regular get_event cache which caches
# events once they have been persisted.
event_from_cache = self._get_pdu_cache.get(event_id)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# If we don't see the event in the cache, go try to fetch it from the
# provided remote federated destinations
event_from_remote = None
if not event_from_cache:
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug(
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
destination,
last_attempt,
PDU_RETRY_TIME_MS,
now,
)
continue

try:
event_from_remote = await self.get_pdu_from_destination_raw(
destination=destination,
event_id=event_id,
room_version=room_version,
timeout=timeout,
)

signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
continue
pdu_attempts[destination] = now

try:
signed_pdu = await self.get_pdu_from_destination_raw(
destination=destination,
event_id=event_id,
room_version=room_version,
timeout=timeout,
)
if event_from_remote:
# Prime the cache
self._get_pdu_cache[
event_from_remote.event_id
] = event_from_remote

pdu_attempts[destination] = now
# FIXME: We should add a `break` here to avoid calling every
# destination after we already found a PDU (will follow-up
# in a separate PR)

except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s", event_id, destination, e
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now
except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
event_id,
destination,
e,
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now

logger.info(
"Failed to get PDU %s from %s because %s",
event_id,
destination,
e,
)
continue

logger.info(
"Failed to get PDU %s from %s because %s", event_id, destination, e
)
continue
event = event_from_cache or event_from_remote
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
if not event:
return None

if signed_pdu:
self._get_pdu_cache[event_id] = signed_pdu
# Make sure to return a copy because downstream callers will use this
# event reference directly and change our original, pristine, untouched
# PDU. For example when people mark the event as an `outlier`
# (`event.internal_metadata.outlier = true`), we don't want that to
# propagate back into the cache.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
event_copy = make_event_from_dict(
event.get_pdu_json(),
event.room_version,
)

return signed_pdu
return event_copy

async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
Expand Down
22 changes: 18 additions & 4 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,24 @@ async def _process_pulled_event(
"""
logger.info("Processing pulled event %s", event)

# these should not be outliers.
assert (
not event.internal_metadata.is_outlier()
), "pulled event unexpectedly flagged as outlier"
# This function should not be used to persist outliers (use something
# else) because this does a bunch of operations that aren't necessary
# (extra work; in particular, it makes sure we have all the prev_events
# and resolves the state across those prev events). If you happen to run
# into a situation where the event you're trying to process/backfill is
# marked as an `outlier`, then you should update that spot to return an
# `EventBase` copy that doesn't have `outlier` flag set.
#
# `EventBase` is used to represent both an event we have not yet
# persisted, and one that we have persisted and now keep in the cache.
# In an ideal world this method would only be called with the first type
# of event, but it turns out that's not actually the case and for
# example, you could get an event from cache that is marked as an
# `outlier` (fix up that spot though).
assert not event.internal_metadata.is_outlier(), (
"Outlier event passed to _process_pulled_event. "
"To persist an event as a non-outlier, make sure to pass in a copy without `event.internal_metadata.outlier = true`."
)

event_id = event.event_id

Expand Down
23 changes: 20 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1346,9 +1346,24 @@ def _update_outliers_txn(
event_id: outlier for event_id, outlier in txn
}

logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s",
[ev.event_id for ev, _ in events_and_contexts],
have_persisted,
)

to_remove = set()
for event, context in events_and_contexts:
if event.event_id not in have_persisted:
outlier_persisted = have_persisted.get(event.event_id)
logger.debug(
"_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s",
event.event_id,
event.internal_metadata.is_outlier(),
outlier_persisted,
)

# Ignore events which we haven't persisted at all
if outlier_persisted is None:
continue

to_remove.add(event)
Expand All @@ -1358,7 +1373,6 @@ def _update_outliers_txn(
# was an outlier or not - what we have is at least as good.
continue

outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that event
Expand All @@ -1369,7 +1383,10 @@ def _update_outliers_txn(
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.

logger.info("Updating state for ex-outlier event %s", event.event_id)
logger.info(
"_update_outliers_txn: Updating state for ex-outlier event %s",
event.event_id,
)

# insert into event_to_state_groups.
try:
Expand Down
Loading