Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't send normal presence updates over federation replication stream #9828

Merged
merged 7 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add a maybe_send_presence_to_interested_destinations helper function
  • Loading branch information
erikjohnston committed Apr 16, 2021
commit 390b500c7b186cdbe2aceef3869ba21d2c1f564b
59 changes: 27 additions & 32 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.presence_router = hs.get_presence_router()
self.state = hs.get_state_handler()

self._federation = None
if hs.should_send_federation():
self._federation = hs.get_federation_sender()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's rather confusing that PresenceHandler ends up with both _federation and federation attributes, which are similar but different. I rather wonder if maybe_send_presence_to_interested_destinations should be unconditional and the should_send_federation should happen in the callers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's rather confusing that PresenceHandler ends up with both _federation and federation attributes, which are similar but different. I rather wonder if maybe_send_presence_to_interested_destinations should be unconditional and the should_send_federation should happen in the callers.

Oh yes, that is horrible. At the end of this PR we end up in a bit of a halfway house, annoyingly. The really tedious bit is that get_federation_sender will throw on workers, but on master will return the send queue instead. I can move the should_send_federation to the call sites and then have self._federation set if its on master or a federation sender, and then in the next PR we can revert and clean it up a bit? (The next PR will remove the weirdness where get_federation_sender returns the queue on master)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up doing something slightly different, and replaced instances of self.federation with self._federation. This involves some assertions to keep mypy happy, but they are temporary until the next PR where we rip out the usage of the federation send queue.


self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

active_presence = self.store.take_presence_startup_info()
Expand Down Expand Up @@ -250,6 +255,26 @@ async def process_replication_rows(self, token, rows):
"""Process presence stream rows received over replication."""
pass

async def maybe_send_presence_to_interested_destinations(
self, states: List[UserPresenceState]
):
"""If this instance is a federation sender, send the states to all
destinations that are interested.
"""

if not self._federation:
return

hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)

for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)


class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
Expand All @@ -264,11 +289,6 @@ def __init__(self, hs):
self.hs = hs
self.is_mine_id = hs.is_mine_id

self._federation = None
if hs.should_send_federation():
self._federation = hs.get_federation_sender()

self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence

# The number of ongoing syncs on this process, by user id.
Expand Down Expand Up @@ -394,18 +414,7 @@ async def notify_from_replication(self, states, stream_id):
)

# If this is a federation sender, notify about presence updates.
if not self._federation:
return

hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)

for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
await self.maybe_send_presence_to_interested_destinations(states)

async def process_replication_rows(self, token, rows):
states = [
Expand Down Expand Up @@ -483,11 +492,8 @@ def __init__(self, hs: "HomeServer"):
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence

self._send_federation = hs.should_send_federation()

federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler("m.presence", self.incoming_presence)
Expand Down Expand Up @@ -951,18 +957,7 @@ async def _persist_and_notify(self, states):
# We only want to poke the local federation sender, if any, as other
# workers will receive the presence updates via the presence replication
# stream.
if not self._send_federation:
return

hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)

for destinations, states in hosts_and_states:
self.federation.send_presence_to_destinations(states, destinations)
await self.maybe_send_presence_to_interested_destinations(states)

async def incoming_presence(self, origin, content):
"""Called when we receive a `m.presence` EDU from a remote server."""
Expand Down
24 changes: 7 additions & 17 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from twisted.internet import defer

from synapse.events import EventBase
from synapse.handlers.presence import get_interested_remotes
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -53,10 +52,6 @@ def __init__(self, hs, auth_handler):
self._presence_stream = hs.get_event_sources().sources["presence"]
self._state = hs.get_state_handler()

self._federation = None
if hs.should_send_federation():
self._federation = self._hs.get_federation_sender()

# We expose these as properties below in order to attach a helpful docstring.
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)
Expand Down Expand Up @@ -429,25 +424,20 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
else:
if not self._federation:
continue

# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)

# Send to remote destinations
hosts_and_states = await get_interested_remotes(
self._store,
self._hs.get_presence_router(),
presence_events,
self._state,
)
# Send to remote destinations.

for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()
await presence_handler.maybe_send_presence_to_interested_destinations(
presence_events
)


class PublicRoomListManager:
Expand Down