
 logger = logging.getLogger(__name__)

+# How many single event gaps we tolerate returning in a `/messages` response before we
+# backfill and try to fill in the history. This is an arbitrarily picked number so feel
+# free to tune it in the future.
+BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+

 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
@@ -486,35 +491,35 @@ async def get_messages(
                         room_id, room_token.stream
                     )

-                if not use_admin_priviledge and membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-
-                    # This is only None if the room is world_readable, in which
-                    # case "JOIN" would have been returned.
-                    assert member_event_id
+            # If they have left the room then clamp the token to be before
+            # they left the room, to save the effort of loading from the
+            # database.
+            if (
+                pagin_config.direction == Direction.BACKWARDS
+                and not use_admin_priviledge
+                and membership == Membership.LEAVE
+            ):
+                # This is only None if the room is world_readable, in which case
+                # "Membership.JOIN" would have been returned and we should never hit
+                # this branch.
+                assert member_event_id
+
+                leave_token = await self.store.get_topological_token_for_event(
+                    member_event_id
+                )
+                assert leave_token.topological is not None

-                    leave_token = await self.store.get_topological_token_for_event(
-                        member_event_id
+                if leave_token.topological < curr_topo:
+                    from_token = from_token.copy_and_replace(
+                        StreamKeyType.ROOM, leave_token
                     )
-                    assert leave_token.topological is not None
-
-                    if leave_token.topological < curr_topo:
-                        from_token = from_token.copy_and_replace(
-                            StreamKeyType.ROOM, leave_token
-                        )
-
-                    await self.hs.get_federation_handler().maybe_backfill(
-                        room_id,
-                        curr_topo,
-                        limit=pagin_config.limit,
-                    )

             to_room_key = None
             if pagin_config.to_token:
                 to_room_key = pagin_config.to_token.room_key

+            # Initially fetch the events from the database. With any luck, we can return
+            # these without blocking on backfill (handled below).
             events, next_key = await self.store.paginate_room_events(
                 room_id=room_id,
                 from_key=from_token.room_key,
@@ -524,6 +529,94 @@ async def get_messages(
                 event_filter=event_filter,
             )

+            if pagin_config.direction == Direction.BACKWARDS:
+                # We use a `Set` because there can be multiple events at a given depth
+                # and we only care about looking at the unique continuum of depths to
+                # find gaps.
+                event_depths: Set[int] = {event.depth for event in events}
+                sorted_event_depths = sorted(event_depths)
+
+                # Inspect the depths of the returned events to see if there are any gaps
+                found_big_gap = False
+                number_of_gaps = 0
+                previous_event_depth = (
+                    sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
+                )
+                for event_depth in sorted_event_depths:
+                    # We don't expect a negative depth but we'll just deal with it in
+                    # any case by taking the absolute value to get the true gap between
+                    # any two integers.
+                    depth_gap = abs(event_depth - previous_event_depth)
+                    # A `depth_gap` of 1 is a normal continuous chain to the next event
+                    # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+                    # also possible there is no event at a given depth but we can't ever
+                    # know that for sure)
+                    if depth_gap > 1:
+                        number_of_gaps += 1
+
+                    # We only tolerate a small number of single-event-long gaps in the
+                    # returned events because those are most likely just events we've
+                    # failed to pull in the past. Anything longer than that is probably
+                    # a sign that we're missing a decent chunk of history and we should
+                    # try to backfill it.
+                    #
+                    # XXX: It's possible we could tolerate longer gaps if we checked
+                    # that a given event's `prev_events` are ones that have failed pull
+                    # attempts and we could just treat it like a dead branch of history
+                    # for now, or at least something that we don't need to block the
+                    # client on to try pulling.
+                    #
+                    # XXX: If we had something like MSC3871 to indicate gaps in the
+                    # timeline to the client, we could also get away with any sized gap
+                    # and just have the client refetch the holes as they see fit.
+                    if depth_gap > 2:
+                        found_big_gap = True
+                        break
+                    previous_event_depth = event_depth
+
+                # Backfill in the foreground if we found a big gap, have too many holes,
+                # or we don't have enough events to fill the limit that the client asked
+                # for.
+                missing_too_many_events = (
+                    number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+                )
+                not_enough_events_to_fill_response = len(events) < pagin_config.limit
+                if (
+                    found_big_gap
+                    or missing_too_many_events
+                    or not_enough_events_to_fill_response
+                ):
+                    did_backfill = (
+                        await self.hs.get_federation_handler().maybe_backfill(
+                            room_id,
+                            curr_topo,
+                            limit=pagin_config.limit,
+                        )
+                    )
+
+                    # If we did backfill something, refetch the events from the database to
+                    # catch anything new that might have been added since we last fetched.
+                    if did_backfill:
+                        events, next_key = await self.store.paginate_room_events(
+                            room_id=room_id,
+                            from_key=from_token.room_key,
+                            to_key=to_room_key,
+                            direction=pagin_config.direction,
+                            limit=pagin_config.limit,
+                            event_filter=event_filter,
+                        )
+                else:
+                    # Otherwise, we can backfill in the background for eventual
+                    # consistency's sake but we don't need to block the client waiting
+                    # for a costly federation call and processing.
+                    run_as_background_process(
+                        "maybe_backfill_in_the_background",
+                        self.hs.get_federation_handler().maybe_backfill,
+                        room_id,
+                        curr_topo,
+                        limit=pagin_config.limit,
+                    )
+
             next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)

             # if no events are returned from pagination, that implies
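To make the new heuristic easier to reason about in isolation, here is a minimal standalone sketch of the same depth-gap logic. The helper name `should_backfill_in_foreground` and its freestanding shape are invented for illustration and are not part of the diff or of Synapse; the sketch only mirrors the gap counting, the `depth_gap > 2` big-gap check, and the `BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD` comparison used above.

# Illustrative sketch only -- not part of the diff above.
from typing import Collection, Tuple

# Mirrors BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD from the diff.
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3


def should_backfill_in_foreground(
    event_depths: Collection[int], num_events: int, limit: int
) -> Tuple[bool, int]:
    """Return (backfill_in_foreground, number_of_gaps) for one page of events.

    `event_depths` are the depths of the locally-known events in the page,
    `num_events` is how many events were fetched and `limit` is what the
    client asked for.
    """
    # Deduplicate depths: several events can share a depth, and only the
    # continuum of unique depths matters for spotting gaps.
    sorted_depths = sorted(set(event_depths))

    found_big_gap = False
    number_of_gaps = 0
    previous_depth = sorted_depths[0] if sorted_depths else 0
    for depth in sorted_depths:
        depth_gap = abs(depth - previous_depth)
        if depth_gap > 1:
            # One missing depth: probably just an event we failed to pull.
            number_of_gaps += 1
        if depth_gap > 2:
            # Several consecutive missing depths: likely a real chunk of
            # missing history, so backfill before responding.
            found_big_gap = True
            break
        previous_depth = depth

    backfill_in_foreground = (
        found_big_gap
        or number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
        or num_events < limit
    )
    return backfill_in_foreground, number_of_gaps


# Depths 1, 2, 4, 5 have a single-event gap (3): tolerated if the page is full.
assert should_backfill_in_foreground([1, 2, 4, 5], num_events=4, limit=4) == (False, 1)
# A jump from 2 to 5 is a big gap: backfill in the foreground before responding.
assert should_backfill_in_foreground([1, 2, 5, 9], num_events=4, limit=4) == (True, 1)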