Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit ac11fcb

Browse files
authored
Add EventStreamPosition type (#8388)
The idea is to remove some of the places we pass around `int`, where it can represent one of two things: 1. the position of an event in the stream; or 2. a token that partitions the stream, used as part of the stream tokens. The valid operations are then: 1. did a position happen before or after a token; 2. get all events that happened before or after a token; and 3. get all events between two tokens. (Note that we don't want to allow other operations as we want to change the tokens to be vector clocks rather than simple ints)
1 parent 13099ae commit ac11fcb

File tree

11 files changed

+100
-57
lines changed

11 files changed

+100
-57
lines changed

changelog.d/8388.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `EventStreamPosition` type.

synapse/handlers/federation.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
from synapse.types import (
7575
JsonDict,
7676
MutableStateMap,
77+
PersistedEventPosition,
78+
RoomStreamToken,
7779
StateMap,
7880
UserID,
7981
get_domain_from_id,
@@ -2956,7 +2958,7 @@ async def persist_events_and_notify(
29562958
)
29572959
return result["max_stream_id"]
29582960
else:
2959-
max_stream_id = await self.storage.persistence.persist_events(
2961+
max_stream_token = await self.storage.persistence.persist_events(
29602962
event_and_contexts, backfilled=backfilled
29612963
)
29622964

@@ -2967,12 +2969,12 @@ async def persist_events_and_notify(
29672969

29682970
if not backfilled: # Never notify for backfilled events
29692971
for event, _ in event_and_contexts:
2970-
await self._notify_persisted_event(event, max_stream_id)
2972+
await self._notify_persisted_event(event, max_stream_token)
29712973

2972-
return max_stream_id
2974+
return max_stream_token.stream
29732975

29742976
async def _notify_persisted_event(
2975-
self, event: EventBase, max_stream_id: int
2977+
self, event: EventBase, max_stream_token: RoomStreamToken
29762978
) -> None:
29772979
"""Checks to see if notifier/pushers should be notified about the
29782980
event or not.
@@ -2998,9 +3000,11 @@ async def _notify_persisted_event(
29983000
elif event.internal_metadata.is_outlier():
29993001
return
30003002

3001-
event_stream_id = event.internal_metadata.stream_ordering
3003+
event_pos = PersistedEventPosition(
3004+
self._instance_name, event.internal_metadata.stream_ordering
3005+
)
30023006
self.notifier.on_new_room_event(
3003-
event, event_stream_id, max_stream_id, extra_users=extra_users
3007+
event, event_pos, max_stream_token, extra_users=extra_users
30043008
)
30053009

30063010
async def _clean_room_for_join(self, room_id: str) -> None:

synapse/handlers/message.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1138,7 +1138,7 @@ def is_inviter_member_event(e):
11381138
if prev_state_ids:
11391139
raise AuthError(403, "Changing the room create event is forbidden")
11401140

1141-
event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
1141+
event_pos, max_stream_token = await self.storage.persistence.persist_event(
11421142
event, context=context
11431143
)
11441144

@@ -1149,7 +1149,7 @@ def is_inviter_member_event(e):
11491149
def _notify():
11501150
try:
11511151
self.notifier.on_new_room_event(
1152-
event, event_stream_id, max_stream_id, extra_users=extra_users
1152+
event, event_pos, max_stream_token, extra_users=extra_users
11531153
)
11541154
except Exception:
11551155
logger.exception("Error notifying about new room event")
@@ -1161,7 +1161,7 @@ def _notify():
11611161
# matters as sometimes presence code can take a while.
11621162
run_in_background(self._bump_active_time, requester.user)
11631163

1164-
return event_stream_id
1164+
return event_pos.stream
11651165

11661166
async def _bump_active_time(self, user: UserID) -> None:
11671167
try:

synapse/handlers/sync.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ async def generate_sync_result(
967967
raise NotImplementedError()
968968
else:
969969
joined_room_ids = await self.get_rooms_for_user_at(
970-
user_id, now_token.room_stream_id
970+
user_id, now_token.room_key
971971
)
972972
sync_result_builder = SyncResultBuilder(
973973
sync_config,
@@ -1916,7 +1916,7 @@ async def _generate_room_entry(
19161916
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
19171917

19181918
async def get_rooms_for_user_at(
1919-
self, user_id: str, stream_ordering: int
1919+
self, user_id: str, room_key: RoomStreamToken
19201920
) -> FrozenSet[str]:
19211921
"""Get set of joined rooms for a user at the given stream ordering.
19221922
@@ -1942,15 +1942,15 @@ async def get_rooms_for_user_at(
19421942
# If the membership's stream ordering is after the given stream
19431943
# ordering, we need to go and work out if the user was in the room
19441944
# before.
1945-
for room_id, membership_stream_ordering in joined_rooms:
1946-
if membership_stream_ordering <= stream_ordering:
1945+
for room_id, event_pos in joined_rooms:
1946+
if not event_pos.persisted_after(room_key):
19471947
joined_room_ids.add(room_id)
19481948
continue
19491949

19501950
logger.info("User joined room after current token: %s", room_id)
19511951

19521952
extrems = await self.store.get_forward_extremeties_for_room(
1953-
room_id, stream_ordering
1953+
room_id, event_pos.stream
19541954
)
19551955
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
19561956
if user_id in users_in_room:

synapse/notifier.py

+29-26
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,13 @@
4242
from synapse.metrics import LaterGauge
4343
from synapse.metrics.background_process_metrics import run_as_background_process
4444
from synapse.streams.config import PaginationConfig
45-
from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
45+
from synapse.types import (
46+
Collection,
47+
PersistedEventPosition,
48+
RoomStreamToken,
49+
StreamToken,
50+
UserID,
51+
)
4652
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
4753
from synapse.util.metrics import Measure
4854
from synapse.visibility import filter_events_for_client
@@ -187,7 +193,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
187193
self.store = hs.get_datastore()
188194
self.pending_new_room_events = (
189195
[]
190-
) # type: List[Tuple[int, EventBase, Collection[UserID]]]
196+
) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
191197

192198
# Called when there are new things to stream over replication
193199
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -246,8 +252,8 @@ def add_replication_callback(self, cb: Callable[[], None]):
246252
def on_new_room_event(
247253
self,
248254
event: EventBase,
249-
room_stream_id: int,
250-
max_room_stream_id: int,
255+
event_pos: PersistedEventPosition,
256+
max_room_stream_token: RoomStreamToken,
251257
extra_users: Collection[UserID] = [],
252258
):
253259
""" Used by handlers to inform the notifier something has happened
@@ -261,16 +267,16 @@ def on_new_room_event(
261267
until all previous events have been persisted before notifying
262268
the client streams.
263269
"""
264-
self.pending_new_room_events.append((room_stream_id, event, extra_users))
265-
self._notify_pending_new_room_events(max_room_stream_id)
270+
self.pending_new_room_events.append((event_pos, event, extra_users))
271+
self._notify_pending_new_room_events(max_room_stream_token)
266272

267273
self.notify_replication()
268274

269-
def _notify_pending_new_room_events(self, max_room_stream_id: int):
275+
def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
270276
"""Notify for the room events that were queued waiting for a previous
271277
event to be persisted.
272278
Args:
273-
max_room_stream_id: The highest stream_id below which all
279+
max_room_stream_token: The highest stream_id below which all
274280
events have been persisted.
275281
"""
276282
pending = self.pending_new_room_events
@@ -279,11 +285,9 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
279285
users = set() # type: Set[UserID]
280286
rooms = set() # type: Set[str]
281287

282-
for room_stream_id, event, extra_users in pending:
283-
if room_stream_id > max_room_stream_id:
284-
self.pending_new_room_events.append(
285-
(room_stream_id, event, extra_users)
286-
)
288+
for event_pos, event, extra_users in pending:
289+
if event_pos.persisted_after(max_room_stream_token):
290+
self.pending_new_room_events.append((event_pos, event, extra_users))
287291
else:
288292
if (
289293
event.type == EventTypes.Member
@@ -296,39 +300,38 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
296300

297301
if users or rooms:
298302
self.on_new_event(
299-
"room_key",
300-
RoomStreamToken(None, max_room_stream_id),
301-
users=users,
302-
rooms=rooms,
303+
"room_key", max_room_stream_token, users=users, rooms=rooms,
303304
)
304-
self._on_updated_room_token(max_room_stream_id)
305+
self._on_updated_room_token(max_room_stream_token)
305306

306-
def _on_updated_room_token(self, max_room_stream_id: int):
307+
def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
307308
"""Poke services that might care that the room position has been
308309
updated.
309310
"""
310311

311312
# poke any interested application service.
312313
run_as_background_process(
313-
"_notify_app_services", self._notify_app_services, max_room_stream_id
314+
"_notify_app_services", self._notify_app_services, max_room_stream_token
314315
)
315316

316317
run_as_background_process(
317-
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
318+
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
318319
)
319320

320321
if self.federation_sender:
321-
self.federation_sender.notify_new_events(max_room_stream_id)
322+
self.federation_sender.notify_new_events(max_room_stream_token.stream)
322323

323-
async def _notify_app_services(self, max_room_stream_id: int):
324+
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
324325
try:
325-
await self.appservice_handler.notify_interested_services(max_room_stream_id)
326+
await self.appservice_handler.notify_interested_services(
327+
max_room_stream_token.stream
328+
)
326329
except Exception:
327330
logger.exception("Error notifying application services of event")
328331

329-
async def _notify_pusher_pool(self, max_room_stream_id: int):
332+
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
330333
try:
331-
await self._pusher_pool.on_new_notifications(max_room_stream_id)
334+
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
332335
except Exception:
333336
logger.exception("Error pusher pool of event")
334337

synapse/replication/tcp/client.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
EventsStreamEventRow,
3030
EventsStreamRow,
3131
)
32-
from synapse.types import UserID
32+
from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
3333
from synapse.util.async_helpers import timeout_deferred
3434
from synapse.util.metrics import Measure
3535

@@ -151,8 +151,14 @@ async def on_rdata(
151151
extra_users = () # type: Tuple[UserID, ...]
152152
if event.type == EventTypes.Member:
153153
extra_users = (UserID.from_string(event.state_key),)
154-
max_token = self.store.get_room_max_stream_ordering()
155-
self.notifier.on_new_room_event(event, token, max_token, extra_users)
154+
155+
max_token = RoomStreamToken(
156+
None, self.store.get_room_max_stream_ordering()
157+
)
158+
event_pos = PersistedEventPosition(instance_name, token)
159+
self.notifier.on_new_room_event(
160+
event, event_pos, max_token, extra_users
161+
)
156162

157163
# Notify any waiting deferreds. The list is ordered by position so we
158164
# just iterate through the list until we reach a position that is

synapse/storage/databases/main/roommember.py

+9-5
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16-
1716
import logging
1817
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set
1918

@@ -37,7 +36,7 @@
3736
ProfileInfo,
3837
RoomsForUser,
3938
)
40-
from synapse.types import Collection, get_domain_from_id
39+
from synapse.types import Collection, PersistedEventPosition, get_domain_from_id
4140
from synapse.util.async_helpers import Linearizer
4241
from synapse.util.caches import intern_string
4342
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -387,7 +386,7 @@ def _get_rooms_for_user_with_stream_ordering_txn(
387386
# for rooms the server is participating in.
388387
if self._current_state_events_membership_up_to_date:
389388
sql = """
390-
SELECT room_id, e.stream_ordering
389+
SELECT room_id, e.instance_name, e.stream_ordering
391390
FROM current_state_events AS c
392391
INNER JOIN events AS e USING (room_id, event_id)
393392
WHERE
@@ -397,7 +396,7 @@ def _get_rooms_for_user_with_stream_ordering_txn(
397396
"""
398397
else:
399398
sql = """
400-
SELECT room_id, e.stream_ordering
399+
SELECT room_id, e.instance_name, e.stream_ordering
401400
FROM current_state_events AS c
402401
INNER JOIN room_memberships AS m USING (room_id, event_id)
403402
INNER JOIN events AS e USING (room_id, event_id)
@@ -408,7 +407,12 @@ def _get_rooms_for_user_with_stream_ordering_txn(
408407
"""
409408

410409
txn.execute(sql, (user_id, Membership.JOIN))
411-
return frozenset(GetRoomsForUserWithStreamOrdering(*row) for row in txn)
410+
return frozenset(
411+
GetRoomsForUserWithStreamOrdering(
412+
room_id, PersistedEventPosition(instance, stream_id)
413+
)
414+
for room_id, instance, stream_id in txn
415+
)
412416

413417
async def get_users_server_still_shares_room_with(
414418
self, user_ids: Collection[str]

synapse/storage/persist_events.py

+9-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from synapse.metrics.background_process_metrics import run_as_background_process
3232
from synapse.storage.databases import Databases
3333
from synapse.storage.databases.main.events import DeltaState
34-
from synapse.types import Collection, StateMap
34+
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
3535
from synapse.util.async_helpers import ObservableDeferred
3636
from synapse.util.metrics import Measure
3737

@@ -190,6 +190,7 @@ def __init__(self, hs, stores: Databases):
190190
self.persist_events_store = stores.persist_events
191191

192192
self._clock = hs.get_clock()
193+
self._instance_name = hs.get_instance_name()
193194
self.is_mine_id = hs.is_mine_id
194195
self._event_persist_queue = _EventPeristenceQueue()
195196
self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -198,7 +199,7 @@ async def persist_events(
198199
self,
199200
events_and_contexts: List[Tuple[EventBase, EventContext]],
200201
backfilled: bool = False,
201-
) -> int:
202+
) -> RoomStreamToken:
202203
"""
203204
Write events to the database
204205
Args:
@@ -228,11 +229,11 @@ async def persist_events(
228229
defer.gatherResults(deferreds, consumeErrors=True)
229230
)
230231

231-
return self.main_store.get_current_events_token()
232+
return RoomStreamToken(None, self.main_store.get_current_events_token())
232233

233234
async def persist_event(
234235
self, event: EventBase, context: EventContext, backfilled: bool = False
235-
) -> Tuple[int, int]:
236+
) -> Tuple[PersistedEventPosition, RoomStreamToken]:
236237
"""
237238
Returns:
238239
The stream ordering of `event`, and the stream ordering of the
@@ -247,7 +248,10 @@ async def persist_event(
247248
await make_deferred_yieldable(deferred)
248249

249250
max_persisted_id = self.main_store.get_current_events_token()
250-
return (event.internal_metadata.stream_ordering, max_persisted_id)
251+
event_stream_id = event.internal_metadata.stream_ordering
252+
253+
pos = PersistedEventPosition(self._instance_name, event_stream_id)
254+
return pos, RoomStreamToken(None, max_persisted_id)
251255

252256
def _maybe_start_persisting(self, room_id: str):
253257
async def persisting_queue(item):

synapse/storage/roommember.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
)
2626

2727
GetRoomsForUserWithStreamOrdering = namedtuple(
28-
"_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
28+
"_GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos")
2929
)
3030

3131

synapse/types.py

+15
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,21 @@ def copy_and_replace(self, key, new_value) -> "StreamToken":
495495
StreamToken.START = StreamToken.from_string("s0_0")
496496

497497

498+
@attr.s(slots=True, frozen=True)
499+
class PersistedEventPosition:
500+
"""Position of a newly persisted event with instance that persisted it.
501+
502+
This can be used to test whether the event is persisted before or after a
503+
RoomStreamToken.
504+
"""
505+
506+
instance_name = attr.ib(type=str)
507+
stream = attr.ib(type=int)
508+
509+
def persisted_after(self, token: RoomStreamToken) -> bool:
510+
return token.stream < self.stream
511+
512+
498513
class ThirdPartyInstanceID(
499514
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
500515
):

0 commit comments

Comments
 (0)