@@ -173,6 +173,9 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
173173 pdu: received PDU
174174 """
175175
176+ # We should never see any outliers here.
177+ assert not pdu .internal_metadata .outlier
178+
176179 room_id = pdu .room_id
177180 event_id = pdu .event_id
178181
@@ -232,77 +235,71 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
232235 )
233236 return None
234237
235- # Check that the event passes auth based on the state at the event. This is
236- # done for events that are to be added to the timeline (non-outliers).
237- #
238- # Get missing pdus if necessary:
239- # - Fetching any missing prev events to fill in gaps in the graph
240- # - Fetching state if we have a hole in the graph
241- if not pdu .internal_metadata .is_outlier ():
242- prevs = set (pdu .prev_event_ids ())
243- seen = await self ._store .have_events_in_timeline (prevs )
244- missing_prevs = prevs - seen
238+ # Try to fetch any missing prev events to fill in gaps in the graph
239+ prevs = set (pdu .prev_event_ids ())
240+ seen = await self ._store .have_events_in_timeline (prevs )
241+ missing_prevs = prevs - seen
245242
246- if missing_prevs :
247- # We only backfill backwards to the min depth.
248- min_depth = await self .get_min_depth_for_context (pdu .room_id )
249- logger .debug ("min_depth: %d" , min_depth )
243+ if missing_prevs :
244+ # We only backfill backwards to the min depth.
245+ min_depth = await self .get_min_depth_for_context (pdu .room_id )
246+ logger .debug ("min_depth: %d" , min_depth )
250247
251- if min_depth is not None and pdu .depth > min_depth :
252- # If we're missing stuff, ensure we only fetch stuff one
253- # at a time.
248+ if min_depth is not None and pdu .depth > min_depth :
249+ # If we're missing stuff, ensure we only fetch stuff one
250+ # at a time.
251+ logger .info (
252+ "Acquiring room lock to fetch %d missing prev_events: %s" ,
253+ len (missing_prevs ),
254+ shortstr (missing_prevs ),
255+ )
256+ with (await self ._room_pdu_linearizer .queue (pdu .room_id )):
254257 logger .info (
255- "Acquiring room lock to fetch %d missing prev_events: %s " ,
258+ "Acquired room lock to fetch %d missing prev_events" ,
256259 len (missing_prevs ),
257- shortstr (missing_prevs ),
258260 )
259- with ( await self . _room_pdu_linearizer . queue ( pdu . room_id )):
260- logger . info (
261- "Acquired room lock to fetch %d missing prev_events" ,
262- len ( missing_prevs ),
261+
262+ try :
263+ await self . _get_missing_events_for_pdu (
264+ origin , pdu , prevs , min_depth
263265 )
266+ except Exception as e :
267+ raise Exception (
268+ "Error fetching missing prev_events for %s: %s"
269+ % (event_id , e )
270+ ) from e
264271
265- try :
266- await self ._get_missing_events_for_pdu (
267- origin , pdu , prevs , min_depth
268- )
269- except Exception as e :
270- raise Exception (
271- "Error fetching missing prev_events for %s: %s"
272- % (event_id , e )
273- ) from e
274-
275- # Update the set of things we've seen after trying to
276- # fetch the missing stuff
277- seen = await self ._store .have_events_in_timeline (prevs )
278- missing_prevs = prevs - seen
279-
280- if not missing_prevs :
281- logger .info ("Found all missing prev_events" )
282-
283- if missing_prevs :
284- # since this event was pushed to us, it is possible for it to
285- # become the only forward-extremity in the room, and we would then
286- # trust its state to be the state for the whole room. This is very
287- # bad. Further, if the event was pushed to us, there is no excuse
288- # for us not to have all the prev_events. (XXX: apart from
289- # min_depth?)
290- #
291- # We therefore reject any such events.
292- logger .warning (
293- "Rejecting: failed to fetch %d prev events: %s" ,
294- len (missing_prevs ),
295- shortstr (missing_prevs ),
296- )
297- raise FederationError (
298- "ERROR" ,
299- 403 ,
300- (
301- "Your server isn't divulging details about prev_events "
302- "referenced in this event."
303- ),
304- affected = pdu .event_id ,
305- )
272+ # Update the set of things we've seen after trying to
273+ # fetch the missing stuff
274+ seen = await self ._store .have_events_in_timeline (prevs )
275+ missing_prevs = prevs - seen
276+
277+ if not missing_prevs :
278+ logger .info ("Found all missing prev_events" )
279+
280+ if missing_prevs :
281+ # since this event was pushed to us, it is possible for it to
282+ # become the only forward-extremity in the room, and we would then
283+ # trust its state to be the state for the whole room. This is very
284+ # bad. Further, if the event was pushed to us, there is no excuse
285+ # for us not to have all the prev_events. (XXX: apart from
286+ # min_depth?)
287+ #
288+ # We therefore reject any such events.
289+ logger .warning (
290+ "Rejecting: failed to fetch %d prev events: %s" ,
291+ len (missing_prevs ),
292+ shortstr (missing_prevs ),
293+ )
294+ raise FederationError (
295+ "ERROR" ,
296+ 403 ,
297+ (
298+ "Your server isn't divulging details about prev_events "
299+ "referenced in this event."
300+ ),
301+ affected = pdu .event_id ,
302+ )
306303
307304 await self ._process_received_pdu (origin , pdu , state = None )
308305
@@ -885,8 +882,13 @@ async def _process_received_pdu(
885882 state : Optional [Iterable [EventBase ]],
886883 backfilled : bool = False ,
887884 ) -> None :
888- """Called when we have a new pdu. We need to do auth checks and put it
889- through the StateHandler.
885+ """Called when we have a new non-outlier event.
886+
887+ This is called when we have a new event to add to the room DAG - either directly
888+ via a /send request, retrieved via get_missing_events after a /send request, or
889+ backfilled after a client request.
890+
891+ We need to do auth checks and put it through the StateHandler.
890892
891893 Args:
892894 origin: server sending the event
@@ -901,6 +903,7 @@ async def _process_received_pdu(
901903 notification to clients, and validation of device keys.)
902904 """
903905 logger .debug ("Processing event: %s" , event )
906+ assert not event .internal_metadata .outlier
904907
905908 try :
906909 context = await self ._state_handler .compute_event_context (
@@ -1263,11 +1266,13 @@ async def _auth_and_persist_event(
12631266 Possibly incomplete, and possibly including events that are not yet
12641267 persisted, or authed, or in the right room.
12651268
1266- Only populated where we may not already have persisted these events -
1267- for example, when populating outliers.
1269+ Only populated when populating outliers.
12681270
12691271 backfilled: True if the event was backfilled.
12701272 """
1273+ # claimed_auth_event_map should be given iff the event is an outlier
1274+ assert bool (claimed_auth_event_map ) == event .internal_metadata .outlier
1275+
12711276 context = await self ._check_event_auth (
12721277 origin ,
12731278 event ,
@@ -1306,15 +1311,16 @@ async def _check_event_auth(
13061311 Possibly incomplete, and possibly including events that are not yet
13071312 persisted, or authed, or in the right room.
13081313
1309- Only populated where we may not already have persisted these events -
1310- for example, when populating outliers, or the state for a backwards
1311- extremity.
1314+ Only populated when populating outliers.
13121315
13131316 backfilled: True if the event was backfilled.
13141317
13151318 Returns:
13161319 The updated context object.
13171320 """
1321+ # claimed_auth_event_map should be given iff the event is an outlier
1322+ assert bool (claimed_auth_event_map ) == event .internal_metadata .outlier
1323+
13181324 room_version = await self ._store .get_room_version_id (event .room_id )
13191325 room_version_obj = KNOWN_ROOM_VERSIONS [room_version ]
13201326
0 commit comments