Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make deleting stale pushers a background update #9536

Merged
merged 2 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9536.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix deleting pushers when using sharded pushers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the same as the newsfile for the PR that introduced the delta file? if not, please could it be?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, sorry should have linked to it in the PR description: #9479

52 changes: 52 additions & 0 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"
self._remove_deactivated_pushers,
)

self.db_pool.updates.register_background_update_handler(
"remove_stale_pushers",
self._remove_stale_pushers,
)

def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table

Expand Down Expand Up @@ -337,6 +342,53 @@ def _delete_pushers(txn) -> int:

return number_deleted

async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
"""A background update that deletes all pushers for logged out devices.

Note that we don't proacively tell the pusherpool that we've deleted
these (just because its a bit off a faff to do from here), but they will
get cleaned up at the next restart
"""

last_pusher = progress.get("last_pusher", 0)

def _delete_pushers(txn) -> int:

sql = """
SELECT p.id, access_token FROM pushers AS p
LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
WHERE p.id > ?
ORDER BY p.id ASC
LIMIT ?
"""

txn.execute(sql, (last_pusher, batch_size))
pushers = [(row[0], row[1]) for row in txn]

self.db_pool.simple_delete_many_txn(
txn,
table="pushers",
column="id",
iterable=(pusher_id for pusher_id, token in pushers if token is None),
keyvalues={},
)

if pushers:
self.db_pool.updates._background_update_progress_txn(
txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
)

return len(pushers)

number_deleted = await self.db_pool.runInteraction(
"_remove_stale_pushers", _delete_pushers
)

if number_deleted < batch_size:
await self.db_pool.updates._end_background_update("remove_stale_pushers")

return number_deleted


class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@

-- Delete all pushers associated with deleted devices. This is to clear up after
-- a bug where they weren't correctly deleted when using workers.
DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5908, 'remove_stale_pushers', '{}');