-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
WIP: Dmr/unblock catchup #15228
WIP: Dmr/unblock catchup #15228
Changes from all commits
6b70d44
bebd7d2
c813f89
0aa0201
7f97783
1cb55e9
f139247
31f2b15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Temp changelog entry (TODO). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,17 @@ | |
| # limitations under the License. | ||
| import logging | ||
| from enum import Enum, auto | ||
| from typing import Collection, Dict, FrozenSet, List, Optional, Tuple | ||
| from typing import ( | ||
| Collection, | ||
| Dict, | ||
| FrozenSet, | ||
| List, | ||
| Mapping, | ||
| Optional, | ||
| Sequence, | ||
| Set, | ||
| Tuple, | ||
| ) | ||
|
|
||
| import attr | ||
| from typing_extensions import Final | ||
|
|
@@ -565,29 +575,40 @@ async def filter_events_for_server( | |
| storage: StorageControllers, | ||
| target_server_name: str, | ||
| local_server_name: str, | ||
| events: List[EventBase], | ||
| events: Sequence[EventBase], | ||
| redact: bool = True, | ||
| check_history_visibility_only: bool = False, | ||
| filter_out_erased_senders: bool = True, | ||
| ) -> List[EventBase]: | ||
| """Filter a list of events based on whether given server is allowed to | ||
| """Filter a list of events based on whether the target server is allowed to | ||
| see them. | ||
|
|
||
| For a fully stated room, the target server is allowed to see an event E if: | ||
| - the state at E has world readable or shared history vis, OR | ||
| - the state at E says that the target server is in the room. | ||
|
|
||
| For a partially stated room, the target server is allowed to see E if: | ||
| - E was created by this homeserver, AND: | ||
| - the partial state at E has world readable or shared history vis, OR | ||
| - the partial state at E says that the target server is in the room. | ||
|
|
||
| TODO: state before or state after? | ||
|
|
||
| Args: | ||
| storage | ||
| server_name | ||
| target_server_name | ||
| local_server_name | ||
| events | ||
| redact: Whether to return a redacted version of the event, or | ||
| to filter them out entirely. | ||
| check_history_visibility_only: Whether to only check the | ||
| history visibility, rather than things like if the sender has been | ||
| redact: Controls what to do with events which have been filtered out. | ||
| If True, include their redacted forms; if False, omit them entirely. | ||
| filter_out_erased_senders: If true, also filter out events whose sender has been | ||
| erased. This is used e.g. during pagination to decide whether to | ||
| backfill or not. | ||
|
|
||
| Returns | ||
| The filtered events. | ||
| """ | ||
|
|
||
| def is_sender_erased(event: EventBase, erased_senders: Dict[str, bool]) -> bool: | ||
| def is_sender_erased(event: EventBase, erased_senders: Mapping[str, bool]) -> bool: | ||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if erased_senders and erased_senders[event.sender]: | ||
| logger.info("Sender of %s has been erased, redacting", event.event_id) | ||
| return True | ||
|
|
@@ -616,7 +637,7 @@ def check_event_is_visible( | |
| # server has no users in the room: redact | ||
| return False | ||
|
|
||
| if not check_history_visibility_only: | ||
| if filter_out_erased_senders: | ||
| erased_senders = await storage.main.are_users_erased(e.sender for e in events) | ||
| else: | ||
| # We don't want to check whether users are erased, which is equivalent | ||
|
|
@@ -631,44 +652,49 @@ def check_event_is_visible( | |
| # otherwise a room could be fully joined after we retrieve those, which would then bypass | ||
| # this check but would base the filtering on an outdated view of the membership events. | ||
|
|
||
| partial_state_invisible_events = set() | ||
| if not check_history_visibility_only: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. According to some failing tests, we need to keep a way to shortcut this code for the backfill use case. |
||
| for e in events: | ||
| sender_domain = get_domain_from_id(e.sender) | ||
| if ( | ||
| sender_domain != local_server_name | ||
| and await storage.main.is_partial_state_room(e.room_id) | ||
| ): | ||
| partial_state_invisible_events.add(e) | ||
| partial_state_invisible_event_ids: Set[str] = set() | ||
| maybe_visible_events: List[EventBase] = [] | ||
| for e in events: | ||
| sender_domain = get_domain_from_id(e.sender) | ||
| if ( | ||
| sender_domain != local_server_name | ||
| and await storage.main.is_partial_state_room(e.room_id) | ||
| ): | ||
| partial_state_invisible_event_ids.add(e.event_id) | ||
| else: | ||
| maybe_visible_events.append(e) | ||
|
|
||
| # Let's check to see if all the events have a history visibility | ||
| # of "shared" or "world_readable". If that's the case then we don't | ||
| # need to check membership (as we know the server is in the room). | ||
| event_to_history_vis = await _event_to_history_vis(storage, events) | ||
| event_to_history_vis = await _event_to_history_vis(storage, maybe_visible_events) | ||
|
|
||
| # for any with restricted vis, we also need the memberships | ||
| event_to_memberships = await _event_to_memberships( | ||
| storage, | ||
| [ | ||
| e | ||
| for e in events | ||
| for e in maybe_visible_events | ||
| if event_to_history_vis[e.event_id] | ||
| not in (HistoryVisibility.SHARED, HistoryVisibility.WORLD_READABLE) | ||
| ], | ||
| target_server_name, | ||
| ) | ||
|
|
||
| to_return = [] | ||
| for e in events: | ||
| def include_event_in_output(e: EventBase) -> bool: | ||
| if e.event_id in partial_state_invisible_event_ids: | ||
| return False | ||
|
|
||
| erased = is_sender_erased(e, erased_senders) | ||
| visible = check_event_is_visible( | ||
| event_to_history_vis[e.event_id], event_to_memberships.get(e.event_id, {}) | ||
| ) | ||
|
|
||
| if e in partial_state_invisible_events: | ||
| visible = False | ||
| return visible and not erased | ||
|
|
||
| if visible and not erased: | ||
| to_return = [] | ||
| for e in events: | ||
| if include_event_in_output(e): | ||
| to_return.append(e) | ||
| elif redact: | ||
| to_return.append(prune_event(e)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| from typing import Callable, List, Optional, Tuple | ||
| from typing import Callable, Collection, List, Optional, Tuple | ||
| from unittest import mock | ||
| from unittest.mock import Mock | ||
|
|
||
| from twisted.test.proto_helpers import MemoryReactor | ||
|
|
@@ -500,3 +501,77 @@ def test_not_latest_event(self) -> None: | |
| self.assertEqual(len(sent_pdus), 1) | ||
| self.assertEqual(sent_pdus[0].event_id, event_2.event_id) | ||
| self.assertFalse(per_dest_queue._catching_up) | ||
|
|
||
| def test_catch_up_is_not_blocked_by_partial_state_room(self) -> None: | ||
| """Detects (part of?) https://github.com/matrix-org/synapse/issues/15220.""" | ||
| # ARRANGE: | ||
| # - a local user (u1) | ||
| # - a room which contains u1 and two remote users, @u2:host2 and @u3:other | ||
| # - events in that room such that | ||
| # - history visibility is restricted | ||
| # - u1 sent message events | ||
| # - afterwards, u3 sent a remote event | ||
| # - catchup to begin for host2 | ||
| per_dest_queue, sent_pdus = self.make_fake_destination_queue() | ||
|
|
||
| self.register_user("u1", "you the one") | ||
| u1_token = self.login("u1", "you the one") | ||
| room = self.helper.create_room_as("u1", tok=u1_token) | ||
| self.helper.send_state( | ||
| room_id=room, | ||
| event_type="m.room.history_visibility", | ||
| body={"history_visibility": "joined"}, | ||
| tok=u1_token, | ||
| ) | ||
| self.get_success( | ||
| event_injection.inject_member_event(self.hs, room, "@u2:host2", "join") | ||
| ) | ||
| self.get_success( | ||
| event_injection.inject_member_event(self.hs, room, "@u3:other", "join") | ||
| ) | ||
|
|
||
| # create some events | ||
| event_id_1 = self.helper.send(room, "hello", tok=u1_token)["event_id"] | ||
| event_id_2 = self.helper.send(room, "world", tok=u1_token)["event_id"] | ||
| # pretend that u3 changes their displayname | ||
| event_id_3 = self.get_success( | ||
| event_injection.inject_member_event(self.hs, room, "@u3:other", "join") | ||
| ).event_id | ||
|
|
||
| # destination_rooms should already be populated, but let us pretend that we already | ||
| # sent (successfully) up to and including event id 1 | ||
| event_1 = self.get_success(self.hs.get_datastores().main.get_event(event_id_1)) | ||
| assert event_1.internal_metadata.stream_ordering is not None | ||
| self.get_success( | ||
| self.hs.get_datastores().main.set_destination_last_successful_stream_ordering( | ||
| "host2", event_1.internal_metadata.stream_ordering | ||
| ) | ||
| ) | ||
|
|
||
| # Mock event 3 as having partial state | ||
| self.get_success( | ||
| event_injection.mark_event_as_partial_state(self.hs, event_id_3, room) | ||
| ) | ||
|
|
||
| # Fail the test if we block on full state for event 3. | ||
| async def mock_await_full_state(event_ids: Collection[str]) -> None: | ||
| if event_id_3 in event_ids: | ||
| raise AssertionError("Tried to await full state for event_id_3") | ||
|
|
||
| # ACT | ||
| with mock.patch.object( | ||
| self.hs.get_storage_controllers().state._partial_state_events_tracker, | ||
| "await_full_state", | ||
| mock_await_full_state, | ||
| ): | ||
| self.get_success(per_dest_queue._catch_up_transmission_loop()) | ||
|
|
||
| # ASSERT | ||
| # We should have: | ||
| # - not sent event 3: it's not ours, and the room is partial stated | ||
| # - fallen back to sending event 2: it's the most recent event in the room | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this is a little dodgy. In this situation, BUT: suppose that the order of events was
then host 2 wouldn't be privy to event 2. I think we'll either
I probably need to make another test case and stare at this some more. |
||
| # we tried to send to host2 | ||
| # - completed catch-up | ||
| self.assertEqual(len(sent_pdus), 1) | ||
| self.assertEqual(sent_pdus[0].event_id, event_id_2) | ||
| self.assertFalse(per_dest_queue._catching_up) | ||
Uh oh!
There was an error while loading. Please reload this page.