@@ -279,6 +279,7 @@ def _persist_events(self, events_and_contexts, backfilled=False,
279
279
# We can't easily parallelize these since different chunks
280
280
# might contain the same event. :(
281
281
282
+ new_forward_extremeties = {}
282
283
current_state_for_room = {}
283
284
if not backfilled :
284
285
# Work out the new "current state" for each room.
@@ -296,20 +297,16 @@ def _persist_events(self, events_and_contexts, backfilled=False,
296
297
latest_event_ids = yield self .get_latest_event_ids_in_room (
297
298
room_id
298
299
)
299
- new_latest_event_ids = set (latest_event_ids )
300
- for event , ctx in ev_ctx_rm :
301
- if event .internal_metadata .is_outlier ():
302
- continue
303
-
304
- new_latest_event_ids .difference_update (
305
- e_id for e_id , _ in event .prev_events
306
- )
307
- new_latest_event_ids .add (event .event_id )
300
+ new_latest_event_ids = yield self ._calculate_new_extremeties (
301
+ room_id , [ev for ev , _ in ev_ctx_rm ]
302
+ )
308
303
309
304
if new_latest_event_ids == set (latest_event_ids ):
310
305
# No change in extremities, so no change in state
311
306
continue
312
307
308
+ new_forward_extremeties [room_id ] = new_latest_event_ids
309
+
313
310
# Now we need to work out the different state sets for
314
311
# each state extremities
315
312
state_sets = []
@@ -358,9 +355,45 @@ def _persist_events(self, events_and_contexts, backfilled=False,
358
355
backfilled = backfilled ,
359
356
delete_existing = delete_existing ,
360
357
current_state_for_room = current_state_for_room ,
358
+ new_forward_extremeties = new_forward_extremeties ,
361
359
)
362
360
persist_event_counter .inc_by (len (chunk ))
363
361
362
+ @defer .inlineCallbacks
363
+ def _calculate_new_extremeties (self , room_id , events ):
364
+ latest_event_ids = yield self .get_latest_event_ids_in_room (
365
+ room_id
366
+ )
367
+ new_latest_event_ids = set (latest_event_ids )
368
+ new_latest_event_ids .update (
369
+ event .event_id for event in events
370
+ if not event .internal_metadata .is_outlier ()
371
+ )
372
+ new_latest_event_ids .difference_update (
373
+ e_id
374
+ for event in events
375
+ for e_id , _ in event .prev_events
376
+ if not event .internal_metadata .is_outlier ()
377
+ )
378
+
379
+ rows = yield self ._simple_select_many_batch (
380
+ table = "event_edges" ,
381
+ column = "prev_event_id" ,
382
+ iterable = list (new_latest_event_ids ),
383
+ retcols = ["prev_event_id" ],
384
+ keyvalues = {
385
+ "room_id" : room_id ,
386
+ "is_state" : False ,
387
+ },
388
+ desc = "_calculate_new_extremeties" ,
389
+ )
390
+
391
+ new_latest_event_ids .difference_update (
392
+ row ["prev_event_id" ] for row in rows
393
+ )
394
+
395
+ defer .returnValue (new_latest_event_ids )
396
+
364
397
@defer .inlineCallbacks
365
398
def get_event (self , event_id , check_redacted = True ,
366
399
get_prev_content = False , allow_rejected = False ,
@@ -417,53 +450,10 @@ def get_events(self, event_ids, check_redacted=True,
417
450
418
451
defer .returnValue ({e .event_id : e for e in events })
419
452
420
- @log_function
421
- def _persist_event_txn (self , txn , event , context , current_state , backfilled = False ,
422
- delete_existing = False ):
423
- # We purposefully do this first since if we include a `current_state`
424
- # key, we *want* to update the `current_state_events` table
425
- if current_state :
426
- txn .call_after (self ._get_current_state_for_key .invalidate_all )
427
- txn .call_after (self .get_rooms_for_user .invalidate_all )
428
- txn .call_after (self .get_users_in_room .invalidate , (event .room_id ,))
429
-
430
- # Add an entry to the current_state_resets table to record the point
431
- # where we clobbered the current state
432
- stream_order = event .internal_metadata .stream_ordering
433
- self ._simple_insert_txn (
434
- txn ,
435
- table = "current_state_resets" ,
436
- values = {"event_stream_ordering" : stream_order }
437
- )
438
-
439
- self ._simple_delete_txn (
440
- txn ,
441
- table = "current_state_events" ,
442
- keyvalues = {"room_id" : event .room_id },
443
- )
444
-
445
- for s in current_state :
446
- self ._simple_insert_txn (
447
- txn ,
448
- "current_state_events" ,
449
- {
450
- "event_id" : s .event_id ,
451
- "room_id" : s .room_id ,
452
- "type" : s .type ,
453
- "state_key" : s .state_key ,
454
- }
455
- )
456
-
457
- return self ._persist_events_txn (
458
- txn ,
459
- [(event , context )],
460
- backfilled = backfilled ,
461
- delete_existing = delete_existing ,
462
- )
463
-
464
453
@log_function
465
454
def _persist_events_txn (self , txn , events_and_contexts , backfilled ,
466
- delete_existing = False , current_state_for_room = {}):
455
+ delete_existing = False , current_state_for_room = {},
456
+ new_forward_extremeties = {}):
467
457
"""Insert some number of room events into the necessary database tables.
468
458
469
459
Rejected events are only inserted into the events table, the events_json table,
@@ -473,18 +463,18 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
473
463
If delete_existing is True then existing events will be purged from the
474
464
database before insertion. This is useful when retrying due to IntegrityError.
475
465
"""
466
+ max_stream_order = events_and_contexts [- 1 ][0 ].internal_metadata .stream_ordering
476
467
for room_id , current_state in current_state_for_room .iteritems ():
477
468
txn .call_after (self ._get_current_state_for_key .invalidate_all )
478
469
txn .call_after (self .get_rooms_for_user .invalidate_all )
479
470
txn .call_after (self .get_users_in_room .invalidate , (room_id ,))
480
471
481
472
# Add an entry to the current_state_resets table to record the point
482
473
# where we clobbered the current state
483
- stream_order = events_and_contexts [- 1 ][0 ].internal_metadata .stream_ordering
484
474
self ._simple_insert_txn (
485
475
txn ,
486
476
table = "current_state_resets" ,
487
- values = {"event_stream_ordering" : stream_order }
477
+ values = {"event_stream_ordering" : max_stream_order }
488
478
)
489
479
490
480
self ._simple_delete_txn (
@@ -507,6 +497,46 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
507
497
],
508
498
)
509
499
500
+ for room_id , new_extrem in new_forward_extremeties .items ():
501
+ self ._simple_delete_txn (
502
+ txn ,
503
+ table = "event_forward_extremities" ,
504
+ keyvalues = {"room_id" : room_id },
505
+ )
506
+ txn .call_after (
507
+ self .get_latest_event_ids_in_room .invalidate , (room_id ,)
508
+ )
509
+
510
+ self ._simple_insert_many_txn (
511
+ txn ,
512
+ table = "event_forward_extremities" ,
513
+ values = [
514
+ {
515
+ "event_id" : ev_id ,
516
+ "room_id" : room_id ,
517
+ }
518
+ for room_id , new_extrem in new_forward_extremeties .items ()
519
+ for ev_id in new_extrem
520
+ ],
521
+ )
522
+ # We now insert into stream_ordering_to_exterm a mapping from room_id,
523
+ # new stream_ordering to new forward extremeties in the room.
524
+ # This allows us to later efficiently look up the forward extremeties
525
+ # for a room before a given stream_ordering
526
+ self ._simple_insert_many_txn (
527
+ txn ,
528
+ table = "stream_ordering_to_exterm" ,
529
+ values = [
530
+ {
531
+ "room_id" : room_id ,
532
+ "event_id" : event_id ,
533
+ "stream_ordering" : max_stream_order ,
534
+ }
535
+ for room_id , new_extrem in new_forward_extremeties .items ()
536
+ for event_id in new_extrem
537
+ ]
538
+ )
539
+
510
540
# Ensure that we don't have the same event twice.
511
541
# Pick the earliest non-outlier if there is one, else the earliest one.
512
542
new_events_and_contexts = OrderedDict ()
0 commit comments