Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix a bug introduced in Synapse v1.50.0rc1 whereby outbound federation could fail because too many EDUs were produced for device updates. #11730

Merged
merged 17 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def get_devices_by_auth_provider_session_id(
@trace
async def get_device_updates_by_remote(
self, destination: str, from_stream_id: int, limit: int
) -> Tuple[int, List[Tuple[str, dict]]]:
) -> Tuple[int, List[Tuple[str, JsonDict]]]:
"""Get a stream of device updates to send to the given remote server.

Args:
Expand Down Expand Up @@ -222,7 +222,7 @@ async def get_device_updates_by_remote(
limit,
)

# We need to ensure `updates` doesn't grow too big.
# We need to ensure `updates` doesn't grow too big.
# Currently: `len(updates) <= limit`.

# Return an empty list if there are no updates
Expand Down Expand Up @@ -334,8 +334,13 @@ async def get_device_updates_by_remote(
if update_stream_id > previous_update_stream_id:
query_map[key] = (update_stream_id, update_context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going on here? Could we have problems if the same key occurs twice in updates? (Ahh, looks like that'll never happen because of how it's sourced from the DB?)

Copy link
Contributor Author

@reivilibre reivilibre Jan 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's intentional: later ones overwrite old ones (hence this if-block)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just worried that this might throw away a previous update_context, which sounds bad. But we're not changing that soo... maybe we should just leave it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's 'just' the opentracing span. Maybe I'll put a FIXME note in there about it; not sure what the right answer is at this instant — sounds like it could make it hard to trace things down which is not great, but it's not release blocking considering it was always like this ...


# As this update has been added to the response, advance the stream
# position.
last_processed_stream_id = update_stream_id

# In the worst case scenario, each update is for a distinct user and is
# added either to the query_map or to cross_signing_keys_by_user,
# but not both:
# len(query_map) + len(cross_signing_keys_by_user) <= len(updates) here,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
# so len(query_map) + len(cross_signing_keys_by_user) <= limit.

Expand Down
27 changes: 22 additions & 5 deletions tests/storage/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ def test_get_device_updates_by_remote_can_limit_properly(self):
# Check the newly-added device_ids are contained within these updates
self._check_devices_in_updates(device_ids, device_updates)

# Check there are no more device updates left.
_, device_updates = self.get_success(
self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3)
)
self.assertEqual(device_updates, [])

def test_get_device_updates_by_remote_cross_signing_key_updates(
self,
) -> None:
Expand Down Expand Up @@ -223,11 +229,16 @@ def test_get_device_updates_by_remote_cross_signing_key_updates(
self.store.get_device_updates_by_remote("somehost", -1, limit=3)
)

# Even though the limit has not been hit, the next update will be duplicated
# but there's no space for it here.
# Here we expect the device updates for `device_id1` and `device_id2`.
# That means we only receive 2 updates this time around.
# If we had a higher limit, we would expect to see the pair of
# (unstable-prefixed & unprefixed) signing key updates for the device
# represented by `fakeMaster` and `fakeSelfSigning`.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
# Our implementation only sends these two variants together, so we get
# a short batch.
self.assertEqual(len(device_updates), 2, device_updates)

# Check the first two devices came out.
# Check the first two devices (device_id1, device_id2) came out.
self._check_devices_in_updates(device_ids[:2], device_updates)

# Get more device updates meant for this remote
Expand All @@ -237,8 +248,8 @@ def test_get_device_updates_by_remote_cross_signing_key_updates(

# The next 2 updates should be a cross-signing key update
# (the master key update and the self-signing key update are combined into
# one, but the cross-signing key update is emitted twice, once with an unprefixed
# type and one with an unstable-prefixed type)
# one 'signing key update', but the cross-signing key update is emitted
# twice, once with an unprefixed type and once again with an unstable-prefixed type)
# (This is a temporary arrangement for backwards compatibility!)
self.assertEqual(len(device_updates), 2, device_updates)
self.assertEqual(
Expand All @@ -248,6 +259,12 @@ def test_get_device_updates_by_remote_cross_signing_key_updates(
device_updates[1][0], "org.matrix.signing_key_update", device_updates[1]
)

# Check there are no more device updates left.
_, device_updates = self.get_success(
self.store.get_device_updates_by_remote("somehost", next_stream_id, limit=3)
)
self.assertEqual(device_updates, [])

def _check_devices_in_updates(self, expected_device_ids, device_updates):
"""Check that an specific device ids exist in a list of device update EDUs"""
self.assertEqual(len(device_updates), len(expected_device_ids))
Expand Down