Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make StreamToken.room_key be a RoomStreamToken instance. #8281

Merged
merged 4 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8281.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change `StreamToken.room_key` to be a `RoomStreamToken` instance.
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ files =
synapse/server_notices,
synapse/spam_checker_api,
synapse/state,
synapse/storage/databases/main/events.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
synapse/storage/database.py,
synapse/storage/engines,
synapse/storage/persist_events.py,
synapse/storage/state.py,
synapse/storage/util,
synapse/streams,
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ async def export_user_data(self, user_id, writer):
else:
stream_ordering = room.stream_ordering

from_key = str(RoomStreamToken(0, 0))
to_key = str(RoomStreamToken(None, stream_ordering))
from_key = RoomStreamToken(0, 0)
to_key = RoomStreamToken(None, stream_ordering)

written_events = set() # Events that we've processed in this room

Expand All @@ -153,7 +153,7 @@ async def export_user_data(self, user_id, writer):
if not events:
break

from_key = events[-1].internal_metadata.after
from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)

events = await filter_events_for_client(self.storage, user_id, events)

Expand Down
12 changes: 5 additions & 7 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
RoomStreamToken,
StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
Expand Down Expand Up @@ -104,18 +105,15 @@ async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:

@trace
@measure_func("device.get_user_ids_changed")
async def get_user_ids_changed(self, user_id, from_token):
async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.

Args:
user_id (str)
from_token (StreamToken)
"""

set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_key = await self.store.get_room_events_max_id()
now_room_id = self.store.get_room_max_stream_ordering()
now_room_key = RoomStreamToken(None, now_room_id)

room_ids = await self.store.get_rooms_for_user(user_id)

Expand All @@ -142,7 +140,7 @@ async def get_user_ids_changed(self, user_id, from_token):
)
rooms_changed.update(event.room_id for event in member_events)

stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream
stream_ordering = from_token.room_key.stream

possibly_changed = set(changed)
possibly_left = set()
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -167,7 +167,7 @@ async def handle_room(event: RoomsForUser):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
room_end_token = RoomStreamToken(None, event.stream_ordering,)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ async def persist_and_notify_client_event(
This should only be run on the instance in charge of persisting events.
"""
assert self._is_event_writer
assert self.storage.persistence is not None

if ratelimit:
# We check if this is a room admin redacting an event so that we
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ async def get_messages(
# gets called.
raise Exception("limit not set")

room_token = RoomStreamToken.parse(from_token.room_key)
room_token = from_token.room_key

with await self.pagination_lock.read(room_id):
(
Expand Down Expand Up @@ -381,7 +381,7 @@ async def get_messages(

if leave_token.topological < max_topo:
from_token = from_token.copy_and_replace(
"room_key", leave_token_str
"room_key", leave_token
)

await self.hs.get_handlers().federation_handler.maybe_backfill(
Expand Down
15 changes: 7 additions & 8 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,20 +1091,19 @@ def __init__(self, hs: "HomeServer"):
async def get_new_events(
self,
user: UserID,
from_key: str,
from_key: RoomStreamToken,
limit: int,
room_ids: List[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[EventBase], str]:
) -> Tuple[List[EventBase], RoomStreamToken]:
# We just ignore the key for now.

to_key = self.get_current_key()

from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
from_key = "s%s" % (from_token.stream,)
from_key = RoomStreamToken(None, from_key.stream)

app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
Expand Down Expand Up @@ -1133,14 +1132,14 @@ async def get_new_events(
events[:] = events[:limit]

if events:
end_key = events[-1].internal_metadata.after
end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
else:
end_key = to_key

return (events, end_key)

def get_current_key(self) -> str:
return "s%d" % (self.store.get_room_max_stream_ordering(),)
def get_current_key(self) -> RoomStreamToken:
return RoomStreamToken(None, self.store.get_room_max_stream_ordering())

def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
Expand Down
11 changes: 6 additions & 5 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async def ephemeral_by_room(
sync_config = sync_result_builder.sync_config

with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"
typing_key = since_token.typing_key if since_token else 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was get_new_events always expecting an int? I'm not sure I understand this change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, I noticed this as part of some mypy fun and forgot about it. Basically since_token.typing_key is now an int so this is "just" fixing the ternary to return a consistent type


room_ids = sync_result_builder.joined_room_ids

Expand All @@ -402,7 +402,7 @@ async def ephemeral_by_room(
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)

receipt_key = since_token.receipt_key if since_token else "0"
receipt_key = since_token.receipt_key if since_token else 0

receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = await receipt_source.get_new_events(
Expand Down Expand Up @@ -533,7 +533,7 @@ async def _load_filtered_recents(
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)

prev_batch_token = now_token.copy_and_replace("room_key", room_key)

Expand Down Expand Up @@ -1322,6 +1322,7 @@ async def _generate_sync_entry_for_presence(
is_guest=sync_config.is_guest,
include_offline=include_offline,
)
assert presence_key
sync_result_builder.now_token = now_token.copy_and_replace(
"presence_key", presence_key
)
Expand Down Expand Up @@ -1484,7 +1485,7 @@ async def _have_rooms_changed(
if rooms_changed:
return True

stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
return True
Expand Down Expand Up @@ -1750,7 +1751,7 @@ async def _get_all_rooms(
continue

leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
"room_key", RoomStreamToken(None, event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
Expand Down
16 changes: 12 additions & 4 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Set,
Tuple,
TypeVar,
Union,
)

from prometheus_client import Counter
Expand All @@ -41,7 +42,7 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import Collection, StreamToken, UserID
from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -111,7 +112,9 @@ def __init__(
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())

def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
def notify(
self, stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the plan to eventually move all of these over to RoomStreamToken or?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, so, argh. I spent some time trying to figure out what to do here, but basically we have a problem where the type of stream_id depends on the stream_key value. Adding overrides with literals sort works for the top level, but those functions then can't call other functions with literal overrides.

I think it may be possible to convert these into full StreamTokens with a bit of rejigging though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. If this gets more complicated it might make sense to make a type that represents this (StreamToken = Union[int, RoomStreamToken, FutureStreamToken]).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW the new stuff will just be buried in RoomStreamToken so I'm hoping we don't see any more types for now. The big thing will be if we add types to the other streams

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The big thing will be if we add types to the other streams

This is what I meant, yep!

):
"""Notify any listeners for this user of a new event from an
event source.
Args:
Expand Down Expand Up @@ -294,7 +297,12 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
rooms.add(event.room_id)

if users or rooms:
self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms)
self.on_new_event(
"room_key",
RoomStreamToken(None, max_room_stream_id),
users=users,
rooms=rooms,
)
self._on_updated_room_token(max_room_stream_id)

def _on_updated_room_token(self, max_room_stream_id: int):
Expand Down Expand Up @@ -329,7 +337,7 @@ async def _notify_pusher_pool(self, max_room_stream_id: int):
def on_new_event(
self,
stream_key: str,
new_token: int,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
rooms: Collection[str] = [],
):
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def __init__(self, hs, stores: Databases):
# interfaces.
self.main = stores.main

self.persistence = EventsPersistenceStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)

self.persistence = None
if stores.persist_events:
self.persistence = EventsPersistenceStorage(hs, stores)
21 changes: 15 additions & 6 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[st
Returns:
Filtered event ids
"""
results = []
results = [] # type: List[str]

def _get_events_which_are_prevs_txn(txn, batch):
sql = """
Expand Down Expand Up @@ -631,7 +631,9 @@ def _update_forward_extremities_txn(
)

@classmethod
def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
def _filter_events_and_contexts_for_duplicates(
cls, events_and_contexts: List[Tuple[EventBase, EventContext]]
) -> List[Tuple[EventBase, EventContext]]:
"""Ensure that we don't have the same event twice.

Pick the earliest non-outlier if there is one, else the earliest one.
Expand All @@ -641,7 +643,9 @@ def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
Returns:
list[(EventBase, EventContext)]: filtered list
"""
new_events_and_contexts = OrderedDict()
new_events_and_contexts = (
OrderedDict()
) # type: OrderedDict[str, Tuple[EventBase, EventContext]]
for event, context in events_and_contexts:
prev_event_context = new_events_and_contexts.get(event.event_id)
if prev_event_context:
Expand All @@ -655,7 +659,12 @@ def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
new_events_and_contexts[event.event_id] = (event, context)
return list(new_events_and_contexts.values())

def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
def _update_room_depths_txn(
self,
txn,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
):
"""Update min_depth for each room

Args:
Expand All @@ -664,7 +673,7 @@ def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
we are persisting
backfilled (bool): True if the events were backfilled
"""
depth_updates = {}
depth_updates = {} # type: Dict[str, int]
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
Expand Down Expand Up @@ -1436,7 +1445,7 @@ def _update_backward_extremeties(self, txn, events):

Forward extremities are handled when we first start persisting the events.
"""
events_by_room = {}
events_by_room = {} # type: Dict[str, List[EventBase]]
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)

Expand Down
Loading