@@ -599,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
599599 DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
600600 REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
601601 REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
602+ REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
602603
603604 def __init__ (self , database : DatabasePool , db_conn , hs : "HomeServer" ):
604605 super ().__init__ (database , db_conn , hs )
@@ -614,14 +615,18 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
614615 self .DEVICE_INBOX_STREAM_ID , self ._background_drop_index_device_inbox
615616 )
616617
617- self .db_pool .updates .register_background_update_handler (
618- self .REMOVE_DELETED_DEVICES ,
619- self ._remove_deleted_devices_from_device_inbox ,
618+ # Used to be a background update that deletes all device_inboxes for deleted
619+ # devices.
620+ self .db_pool .updates .register_noop_background_update (
621+ self .REMOVE_DELETED_DEVICES
620622 )
623+ # Used to be a background update that deletes all device_inboxes for hidden
624+ # devices.
625+ self .db_pool .updates .register_noop_background_update (self .REMOVE_HIDDEN_DEVICES )
621626
622627 self .db_pool .updates .register_background_update_handler (
623- self .REMOVE_HIDDEN_DEVICES ,
624- self ._remove_hidden_devices_from_device_inbox ,
628+ self .REMOVE_DEAD_DEVICES_FROM_INBOX ,
629+ self ._remove_dead_devices_from_device_inbox ,
625630 )
626631
627632 async def _background_drop_index_device_inbox (self , progress , batch_size ):
@@ -636,171 +641,83 @@ def reindex_txn(conn):
636641
637642 return 1
638643
639- async def _remove_deleted_devices_from_device_inbox (
640- self , progress : JsonDict , batch_size : int
644+ async def _remove_dead_devices_from_device_inbox (
645+ self ,
646+ progress : JsonDict ,
647+ batch_size : int ,
641648 ) -> int :
642- """A background update that deletes all device_inboxes for deleted devices.
643-
644- This should only need to be run once (when users upgrade to v1.47.0)
649+ """A background update to remove devices that were either deleted or hidden from
650+ the device_inbox table.
645651
646652 Args:
647- progress: JsonDict used to store progress of this background update
648- batch_size: the maximum number of rows to retrieve in a single select query
653+ progress: The update's progress dict.
654+ batch_size: The batch size for this update.
649655
650656 Returns:
651- The number of deleted rows
657+ The number of rows deleted.
652658 """
653659
654- def _remove_deleted_devices_from_device_inbox_txn (
660+ def _remove_dead_devices_from_device_inbox_txn (
655661 txn : LoggingTransaction ,
656- ) -> int :
657- """stream_id is not unique
658- we need to use an inclusive `stream_id >= ?` clause,
659- since we might not have deleted all dead device messages for the stream_id
660- returned from the previous query
662+ ) -> Tuple [int , bool ]:
661663
662- Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
663- to avoid problems of deleting a large number of rows all at once
664- due to a single device having lots of device messages.
665- """
664+ if "max_stream_id" in progress :
665+ max_stream_id = progress ["max_stream_id" ]
666+ else :
667+ txn .execute ("SELECT max(stream_id) FROM device_inbox" )
668+ # There's a type mismatch here between how we want to type the row and
669+ # what fetchone says it returns, but we silence it because we know that
670+ # res can't be None.
671+ res : Tuple [Optional [int ]] = txn .fetchone () # type: ignore[assignment]
672+ if res [0 ] is None :
673+ # this can only happen if the `device_inbox` table is empty, in which
674+ # case we have no work to do.
675+ return 0 , True
676+ else :
677+ max_stream_id = res [0 ]
666678
667- last_stream_id = progress .get ("stream_id" , 0 )
679+ start = progress .get ("stream_id" , 0 )
680+ stop = start + batch_size
668681
682+ # delete rows in `device_inbox` which do *not* correspond to a known,
683+ # unhidden device.
669684 sql = """
670- SELECT device_id, user_id, stream_id
671- FROM device_inbox
685+ DELETE FROM device_inbox
672686 WHERE
673- stream_id >= ?
674- AND (device_id, user_id) NOT IN (
675- SELECT device_id, user_id FROM devices
687+ stream_id >= ? AND stream_id < ?
688+ AND NOT EXISTS (
689+ SELECT * FROM devices d
690+ WHERE
691+ d.device_id=device_inbox.device_id
692+ AND d.user_id=device_inbox.user_id
693+ AND NOT hidden
676694 )
677- ORDER BY stream_id
678- LIMIT ?
679- """
680-
681- txn .execute (sql , (last_stream_id , batch_size ))
682- rows = txn .fetchall ()
695+ """
683696
684- num_deleted = 0
685- for row in rows :
686- num_deleted += self .db_pool .simple_delete_txn (
687- txn ,
688- "device_inbox" ,
689- {"device_id" : row [0 ], "user_id" : row [1 ], "stream_id" : row [2 ]},
690- )
697+ txn .execute (sql , (start , stop ))
691698
692- if rows :
693- # send more than stream_id to progress
694- # otherwise it can happen in large deployments that
695- # no change of status is visible in the log file
696- # it may be that the stream_id does not change in several runs
697- self .db_pool .updates ._background_update_progress_txn (
698- txn ,
699- self .REMOVE_DELETED_DEVICES ,
700- {
701- "device_id" : rows [- 1 ][0 ],
702- "user_id" : rows [- 1 ][1 ],
703- "stream_id" : rows [- 1 ][2 ],
704- },
705- )
706-
707- return num_deleted
708-
709- number_deleted = await self .db_pool .runInteraction (
710- "_remove_deleted_devices_from_device_inbox" ,
711- _remove_deleted_devices_from_device_inbox_txn ,
712- )
713-
714- # The task is finished when no more lines are deleted.
715- if not number_deleted :
716- await self .db_pool .updates ._end_background_update (
717- self .REMOVE_DELETED_DEVICES
699+ self .db_pool .updates ._background_update_progress_txn (
700+ txn ,
701+ self .REMOVE_DEAD_DEVICES_FROM_INBOX ,
702+ {
703+ "stream_id" : stop ,
704+ "max_stream_id" : max_stream_id ,
705+ },
718706 )
719707
720- return number_deleted
721-
722- async def _remove_hidden_devices_from_device_inbox (
723- self , progress : JsonDict , batch_size : int
724- ) -> int :
725- """A background update that deletes all device_inboxes for hidden devices.
726-
727- This should only need to be run once (when users upgrade to v1.47.0)
728-
729- Args:
730- progress: JsonDict used to store progress of this background update
731- batch_size: the maximum number of rows to retrieve in a single select query
732-
733- Returns:
734- The number of deleted rows
735- """
736-
737- def _remove_hidden_devices_from_device_inbox_txn (
738- txn : LoggingTransaction ,
739- ) -> int :
740- """stream_id is not unique
741- we need to use an inclusive `stream_id >= ?` clause,
742- since we might not have deleted all hidden device messages for the stream_id
743- returned from the previous query
744-
745- Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
746- to avoid problems of deleting a large number of rows all at once
747- due to a single device having lots of device messages.
748- """
749-
750- last_stream_id = progress .get ("stream_id" , 0 )
751-
752- sql = """
753- SELECT device_id, user_id, stream_id
754- FROM device_inbox
755- WHERE
756- stream_id >= ?
757- AND (device_id, user_id) IN (
758- SELECT device_id, user_id FROM devices WHERE hidden = ?
759- )
760- ORDER BY stream_id
761- LIMIT ?
762- """
763-
764- txn .execute (sql , (last_stream_id , True , batch_size ))
765- rows = txn .fetchall ()
766-
767- num_deleted = 0
768- for row in rows :
769- num_deleted += self .db_pool .simple_delete_txn (
770- txn ,
771- "device_inbox" ,
772- {"device_id" : row [0 ], "user_id" : row [1 ], "stream_id" : row [2 ]},
773- )
774-
775- if rows :
776- # We don't just save the `stream_id` in progress as
777- # otherwise it can happen in large deployments that
778- # no change of status is visible in the log file, as
779- # it may be that the stream_id does not change in several runs
780- self .db_pool .updates ._background_update_progress_txn (
781- txn ,
782- self .REMOVE_HIDDEN_DEVICES ,
783- {
784- "device_id" : rows [- 1 ][0 ],
785- "user_id" : rows [- 1 ][1 ],
786- "stream_id" : rows [- 1 ][2 ],
787- },
788- )
789-
790- return num_deleted
708+ return stop > max_stream_id
791709
792- number_deleted = await self .db_pool .runInteraction (
793- "_remove_hidden_devices_from_device_inbox " ,
794- _remove_hidden_devices_from_device_inbox_txn ,
710+ finished = await self .db_pool .runInteraction (
711+ "_remove_devices_from_device_inbox_txn " ,
712+ _remove_dead_devices_from_device_inbox_txn ,
795713 )
796714
797- # The task is finished when no more lines are deleted.
798- if not number_deleted :
715+ if finished :
799716 await self .db_pool .updates ._end_background_update (
800- self .REMOVE_HIDDEN_DEVICES
717+ self .REMOVE_DEAD_DEVICES_FROM_INBOX ,
801718 )
802719
803- return number_deleted
720+ return batch_size
804721
805722
806723class DeviceInboxStore (DeviceInboxWorkerStore , DeviceInboxBackgroundUpdateStore ):
0 commit comments