Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 5d3e306

Browse files
authored
Clean up Notifier.on_new_room_event code path (#8288)
The idea here is that we pass the `max_stream_id` to everything, and only use the stream ID of the particular event to figure out *when* the max stream position has caught up to the event and we can notify people about it. This is to maintain the distinction between the position of an item in the stream (i.e. event A has stream ID 513) and a token that can be used to partition the stream (i.e. give me all events after stream ID 352). This distinction becomes important when the tokens are more complicated than a single number, which they will be once we start tracking the position of multiple writers in the tokens. The valid operations here are: 1. Is a position before or after a token 2. Fetching all events between two tokens 3. Merging multiple tokens to get the "max", i.e. `C = max(A, B)` means that for all positions P where P is before A *or* before B, then P is before C. Future PR will change the token type to a dedicated type.
1 parent a3a90ee commit 5d3e306

File tree

6 files changed

+44
-37
lines changed

6 files changed

+44
-37
lines changed

changelog.d/8288.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor notifier code to correctly use the max event stream position.

synapse/handlers/federation.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ def __init__(self, hs):
128128
self.keyring = hs.get_keyring()
129129
self.action_generator = hs.get_action_generator()
130130
self.is_mine_id = hs.is_mine_id
131-
self.pusher_pool = hs.get_pusherpool()
132131
self.spam_checker = hs.get_spam_checker()
133132
self.event_creation_handler = hs.get_event_creation_handler()
134133
self._message_handler = hs.get_message_handler()
@@ -2939,8 +2938,6 @@ async def _notify_persisted_event(
29392938
event, event_stream_id, max_stream_id, extra_users=extra_users
29402939
)
29412940

2942-
await self.pusher_pool.on_new_notifications(max_stream_id)
2943-
29442941
async def _clean_room_for_join(self, room_id: str) -> None:
29452942
"""Called to clean up any data in DB for a given room, ready for the
29462943
server to join the room.

synapse/handlers/message.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -387,8 +387,6 @@ def __init__(self, hs: "HomeServer"):
387387
# This is only used to get at ratelimit function, and maybe_kick_guest_users
388388
self.base_handler = BaseHandler(hs)
389389

390-
self.pusher_pool = hs.get_pusherpool()
391-
392390
# We arbitrarily limit concurrent event creation for a room to 5.
393391
# This is to stop us from diverging history *too* much.
394392
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
@@ -1145,8 +1143,6 @@ def is_inviter_member_event(e):
11451143
# If there's an expiry timestamp on the event, schedule its expiry.
11461144
self._message_handler.maybe_schedule_expiry(event)
11471145

1148-
await self.pusher_pool.on_new_notifications(max_stream_id)
1149-
11501146
def _notify():
11511147
try:
11521148
self.notifier.on_new_room_event(

synapse/notifier.py

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
Set,
2626
Tuple,
2727
TypeVar,
28-
Union,
2928
)
3029

3130
from prometheus_client import Counter
@@ -187,7 +186,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
187186
self.store = hs.get_datastore()
188187
self.pending_new_room_events = (
189188
[]
190-
) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]]
189+
) # type: List[Tuple[int, EventBase, Collection[UserID]]]
191190

192191
# Called when there are new things to stream over replication
193192
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -198,6 +197,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
198197

199198
self.clock = hs.get_clock()
200199
self.appservice_handler = hs.get_application_service_handler()
200+
self._pusher_pool = hs.get_pusherpool()
201201

202202
self.federation_sender = None
203203
if hs.should_send_federation():
@@ -247,7 +247,7 @@ def on_new_room_event(
247247
event: EventBase,
248248
room_stream_id: int,
249249
max_room_stream_id: int,
250-
extra_users: Collection[Union[str, UserID]] = [],
250+
extra_users: Collection[UserID] = [],
251251
):
252252
""" Used by handlers to inform the notifier something has happened
253253
in the room, room event wise.
@@ -274,47 +274,63 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
274274
"""
275275
pending = self.pending_new_room_events
276276
self.pending_new_room_events = []
277+
278+
users = set() # type: Set[UserID]
279+
rooms = set() # type: Set[str]
280+
277281
for room_stream_id, event, extra_users in pending:
278282
if room_stream_id > max_room_stream_id:
279283
self.pending_new_room_events.append(
280284
(room_stream_id, event, extra_users)
281285
)
282286
else:
283-
self._on_new_room_event(event, room_stream_id, extra_users)
287+
if (
288+
event.type == EventTypes.Member
289+
and event.membership == Membership.JOIN
290+
):
291+
self._user_joined_room(event.state_key, event.room_id)
292+
293+
users.update(extra_users)
294+
rooms.add(event.room_id)
295+
296+
if users or rooms:
297+
self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms)
298+
self._on_updated_room_token(max_room_stream_id)
299+
300+
def _on_updated_room_token(self, max_room_stream_id: int):
301+
"""Poke services that might care that the room position has been
302+
updated.
303+
"""
284304

285-
def _on_new_room_event(
286-
self,
287-
event: EventBase,
288-
room_stream_id: int,
289-
extra_users: Collection[Union[str, UserID]] = [],
290-
):
291-
"""Notify any user streams that are interested in this room event"""
292305
# poke any interested application service.
293306
run_as_background_process(
294-
"notify_app_services", self._notify_app_services, room_stream_id
307+
"_notify_app_services", self._notify_app_services, max_room_stream_id
295308
)
296309

297-
if self.federation_sender:
298-
self.federation_sender.notify_new_events(room_stream_id)
299-
300-
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
301-
self._user_joined_room(event.state_key, event.room_id)
302-
303-
self.on_new_event(
304-
"room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
310+
run_as_background_process(
311+
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
305312
)
306313

307-
async def _notify_app_services(self, room_stream_id: int):
314+
if self.federation_sender:
315+
self.federation_sender.notify_new_events(max_room_stream_id)
316+
317+
async def _notify_app_services(self, max_room_stream_id: int):
308318
try:
309-
await self.appservice_handler.notify_interested_services(room_stream_id)
319+
await self.appservice_handler.notify_interested_services(max_room_stream_id)
310320
except Exception:
311321
logger.exception("Error notifying application services of event")
312322

323+
async def _notify_pusher_pool(self, max_room_stream_id: int):
324+
try:
325+
await self._pusher_pool.on_new_notifications(max_room_stream_id)
326+
except Exception:
327+
logger.exception("Error pusher pool of event")
328+
313329
def on_new_event(
314330
self,
315331
stream_key: str,
316332
new_token: int,
317-
users: Collection[Union[str, UserID]] = [],
333+
users: Collection[UserID] = [],
318334
rooms: Collection[str] = [],
319335
):
320336
""" Used to inform listeners that something has happened event wise.

synapse/push/pusherpool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
184184
)
185185
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
186186

187-
async def on_new_notifications(self, max_stream_id):
187+
async def on_new_notifications(self, max_stream_id: int):
188188
if not self.pushers:
189189
# nothing to do here.
190190
return

synapse/replication/tcp/client.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
EventsStreamEventRow,
3030
EventsStreamRow,
3131
)
32+
from synapse.types import UserID
3233
from synapse.util.async_helpers import timeout_deferred
3334
from synapse.util.metrics import Measure
3435

@@ -98,7 +99,6 @@ class ReplicationDataHandler:
9899

99100
def __init__(self, hs: "HomeServer"):
100101
self.store = hs.get_datastore()
101-
self.pusher_pool = hs.get_pusherpool()
102102
self.notifier = hs.get_notifier()
103103
self._reactor = hs.get_reactor()
104104
self._clock = hs.get_clock()
@@ -148,15 +148,12 @@ async def on_rdata(
148148
if event.rejected_reason:
149149
continue
150150

151-
extra_users = () # type: Tuple[str, ...]
151+
extra_users = () # type: Tuple[UserID, ...]
152152
if event.type == EventTypes.Member:
153-
extra_users = (event.state_key,)
153+
extra_users = (UserID.from_string(event.state_key),)
154154
max_token = self.store.get_room_max_stream_ordering()
155155
self.notifier.on_new_room_event(event, token, max_token, extra_users)
156156

157-
max_token = self.store.get_room_max_stream_ordering()
158-
await self.pusher_pool.on_new_notifications(max_token)
159-
160157
# Notify any waiting deferreds. The list is ordered by position so we
161158
# just iterate through the list until we reach a position that is
162159
# greater than the received row position.

0 commit comments

Comments
 (0)