Skip to content

Small perf improvement to limited incremental sync #17149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17149.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Small performance improvement to limited incremental sync in large rooms.
48 changes: 42 additions & 6 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ class TimelineBatch:
prev_batch: StreamToken
events: Sequence[EventBase]
limited: bool

# All the events that were fetched from the DB while loading the room. This
# is a superset of `events`.
fetched_events: Sequence[EventBase]
fetched_limited: bool  # Whether there is a gap between the previous timeline batch and the start of `fetched_events`

# A mapping of event ID to the bundled aggregations for the above events.
# This is only calculated if limited is true.
bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
Expand Down Expand Up @@ -614,7 +620,11 @@ async def _load_filtered_recents(
)

return TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=False
events=recents,
prev_batch=prev_batch_token,
limited=False,
fetched_events=recents,
fetched_limited=False,
)

filtering_factor = 2
Expand All @@ -631,6 +641,9 @@ async def _load_filtered_recents(
elif since_token and not newly_joined_room:
since_key = since_token.room_key

fetched_events: List[EventBase] = []
fetched_limited = True

while limited and len(recents) < timeline_limit and max_repeat:
# If we have a since_key then we are trying to get any events
# that have happened since `since_key` up to `end_key`, so we
Expand All @@ -649,6 +662,10 @@ async def _load_filtered_recents(
room_id, limit=load_limit + 1, end_token=end_key
)

# We prepend as `fetched_events` is in ascending stream order,
# and `events` is from *before* the previously fetched events.
fetched_events = events + fetched_events

log_kv({"loaded_recents": len(events)})

loaded_recents = (
Expand Down Expand Up @@ -701,6 +718,7 @@ async def _load_filtered_recents(

if len(events) <= load_limit:
limited = False
fetched_limited = False
break
max_repeat -= 1

Expand Down Expand Up @@ -731,6 +749,8 @@ async def _load_filtered_recents(
# (to force client to paginate the gap).
limited=limited or newly_joined_room or gap_token is not None,
bundled_aggregations=bundled_aggregations,
fetched_events=fetched_events,
fetched_limited=fetched_limited,
)

async def get_state_after_event(
Expand Down Expand Up @@ -1268,8 +1288,12 @@ async def _compute_state_delta_for_incremental_sync(
#
# c.f. #16941 for an example of why we can't do this for all non-gappy
# syncs.
#
# We can apply a similar optimization for gappy syncs if we know the room
# has been linear in the gap, so instead of just looking at
# `timeline.events` we can look at `timeline.fetched_events`.
is_linear_timeline = True
if batch.events:
if batch.fetched_events:
# We need to make sure the first event in our batch points to the
# last event in the previous batch.
last_event_id_prev_batch = (
Expand All @@ -1286,8 +1310,19 @@ async def _compute_state_delta_for_incremental_sync(
break
prev_event_id = e.event_id

if is_linear_timeline and not batch.limited:
state_ids: StateMap[str] = {}
if is_linear_timeline and not batch.fetched_limited:
batch_state_ids: MutableStateMap[str] = {}

# If the returned batch is actually limited, we need to include the
# state events that were fetched but filtered out of the returned
# timeline.
if batch.limited:
timeline_events = {e.event_id for e in batch.events}
batch_state_ids = {
(e.type, e.state_key): e.event_id
for e in batch.fetched_events
if e.is_state() and e.event_id not in timeline_events
}

if lazy_load_members:
if members_to_fetch and batch.events:
# We're lazy-loading, so the client might need some more
Expand All @@ -1296,15 +1331,16 @@ async def _compute_state_delta_for_incremental_sync(
# timeline here. The caller will then dedupe any redundant
# ones.

state_ids = await self._state_storage_controller.get_state_ids_for_event(
ll_state_ids = await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member) for member in members_to_fetch
),
await_full_state=False,
)
return state_ids
batch_state_ids.update(ll_state_ids)
return batch_state_ids

if batch:
state_at_timeline_start = (
Expand Down
Loading