Remove old rows from the `cache_invalidation_stream_by_instance` table automatically. (This table is not used when Synapse is configured to use SQLite.) #15868
Changes from all commits: 6f3b76d, f50c800, 19418ec, 383b7c9, 3ca3d1e, fe7c6a9, eb28389, 53e7030, 2bef4ef, 8ece6c0, 71fcc0e, ca52419.
The new changelog entry:

```diff
@@ -0,0 +1 @@
+Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite).
```
The source changes, shown hunk by hunk, with the review comments left inline where they were made:

```diff
@@ -18,6 +18,8 @@
 from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple

 from synapse.api.constants import EventTypes
+from synapse.config._base import Config
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
 from synapse.replication.tcp.streams.events import (
     EventsStream,
```
```diff
@@ -52,6 +54,21 @@
 # As above, but for invalidating room caches on room deletion
 DELETE_ROOM_CACHE_NAME = "dr_cache_fake"

+# How long between cache invalidation table cleanups, once we have caught up
+# with the backlog.
+REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")
+
+# How long between cache invalidation table cleanups, before we have caught
+# up with the backlog.
+CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m")
+
+# Maximum number of cache invalidation rows to delete at once.
+CLEAN_UP_MAX_BATCH_SIZE = 20_000
+
+# Keep cache invalidations for 7 days
+# (This is likely to be quite excessive.)
+RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d")
+

 class CacheInvalidationWorkerStore(SQLBaseStore):
     def __init__(
```
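For orientation, the intervals above work out as follows. This is illustrative only and assumes, based on the `_MS` suffixes and the `/ 1000` conversion before `call_later` further down, that `Config.parse_duration` returns milliseconds:

```python
# Rough millisecond equivalents of the parsed durations (assumption: the real
# values come from Config.parse_duration and are expressed in milliseconds).
CATCH_UP_CLEANUP_INTERVAL_MS = 60 * 1000                              # "1m"
REGULAR_CLEANUP_INTERVAL_MS = 60 * 60 * 1000                          # "1h"
RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = 7 * 24 * 60 * 60 * 1000  # "7d"

assert CATCH_UP_CLEANUP_INTERVAL_MS == 60_000
assert REGULAR_CLEANUP_INTERVAL_MS == 3_600_000
assert RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS == 604_800_000
```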
```diff
@@ -98,6 +115,18 @@ def __init__(
         else:
             self._cache_id_gen = None

+        # Occasionally clean up the cache invalidations stream table by deleting
+        # old rows.
+        # This is only applicable when Postgres is in use; this table is unused
+        # and not populated at all when SQLite is the active database engine.
+        if hs.config.worker.run_background_tasks and isinstance(
+            self.database_engine, PostgresEngine
+        ):
+            self.hs.get_clock().call_later(
+                CATCH_UP_CLEANUP_INTERVAL_MS / 1000,
+                self._clean_up_cache_invalidation_wrapper,
+            )
+
     async def get_all_updated_caches(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
```
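The hunk above only schedules the first run; each run then re-schedules itself (see the wrapper added later in this diff). Purely as an illustration of that pattern, here is a small self-contained sketch using the bare Twisted reactor; the names, the tiny demo intervals, and the pretend cleanup function are invented for the example and are not Synapse APIs (Synapse goes through `hs.get_clock()` and `wrap_as_background_process` instead):

```python
from twisted.internet import defer, reactor

CATCH_UP_INTERVAL_S = 0.1  # demo stand-in for Synapse's 1 minute
REGULAR_INTERVAL_S = 0.5   # demo stand-in for Synapse's 1 hour

_remaining_batches = 3  # pretend there is a backlog of three batches


async def clean_up_one_batch() -> bool:
    """Pretend cleanup step; returns True while backlog remains."""
    global _remaining_batches
    _remaining_batches -= 1
    print(f"cleaned a batch; {_remaining_batches} left")
    return _remaining_batches > 0


def run_cleanup() -> None:
    d = defer.ensureDeferred(clean_up_one_batch())

    def reschedule(in_backlog: bool) -> None:
        # Short interval while catching up, long interval once caught up --
        # the same shape as the wrapper added in this PR. The demo stops
        # instead of sleeping through the long interval.
        if in_backlog:
            reactor.callLater(CATCH_UP_INTERVAL_S, run_cleanup)
        else:
            print(f"caught up; would re-run every {REGULAR_INTERVAL_S}s")
            reactor.stop()

    d.addCallback(reschedule)


reactor.callLater(CATCH_UP_INTERVAL_S, run_cleanup)
reactor.run()
```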
```diff
@@ -554,3 +583,104 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
             return self._cache_id_gen.get_current_token_for_writer(instance_name)
         else:
             return 0
+
+    @wrap_as_background_process("clean_up_old_cache_invalidations")
+    async def _clean_up_cache_invalidation_wrapper(self) -> None:
+        """
+        Clean up cache invalidation stream table entries occasionally.
+        If we are behind (i.e. there are entries old enough to
+        be deleted but too many of them to be deleted in one go),
+        then we run slightly more frequently.
+        """
+        delete_up_to: int = (
+            self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
+        )
+
+        in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)
+
+        # Vary how long we wait before calling again depending on whether we
+        # are still sifting through backlog or we have caught up.
+        if in_backlog:
+            next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
+        else:
+            next_interval = REGULAR_CLEANUP_INTERVAL_MS
+
+        self.hs.get_clock().call_later(
+            next_interval / 1000, self._clean_up_cache_invalidation_wrapper
+        )
+
+    async def _clean_up_batch_of_old_cache_invalidations(
+        self, delete_up_to_millisec: int
+    ) -> bool:
+        """
+        Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite).
+
+        Up to `CLEAN_UP_MAX_BATCH_SIZE` rows will be deleted at once.
+
+        Returns true if and only if we were limited by batch size (i.e. we are in backlog:
+        there are more things to clean up).
+        """
+
+        def _clean_up_batch_of_old_cache_invalidations_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            # First get the earliest stream ID
+            txn.execute(
+                """
+                SELECT stream_id FROM cache_invalidation_stream_by_instance
+                ORDER BY stream_id ASC
+                LIMIT 1
+                """
+            )
+            row = txn.fetchone()
+            if row is None:
+                return False
+            earliest_stream_id: int = row[0]
+
+            # Then find the last stream ID of the range we will delete
+            txn.execute(
+                """
+                SELECT stream_id FROM cache_invalidation_stream_by_instance
+                WHERE stream_id <= ? AND invalidation_ts <= ?
+                ORDER BY stream_id DESC
+                LIMIT 1
+                """,
+                (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec),
+            )
+            row = txn.fetchone()
+            if row is None:
+                return False
+            cutoff_stream_id: int = row[0]
+
+            # Determine whether we are caught up or still catching up
+            txn.execute(
+                """
+                SELECT invalidation_ts FROM cache_invalidation_stream_by_instance
+                WHERE stream_id > ?
+                ORDER BY stream_id ASC
+                LIMIT 1
+                """,
+                (cutoff_stream_id,),
+            )
+            row = txn.fetchone()
+            if row is None:
+                in_backlog = False
+            else:
+                # We are in backlog if the next row could have been deleted
+                # if we didn't have such a small batch size
+                in_backlog = row[0] <= delete_up_to_millisec
```
Comment on lines +655 to +671

**Contributor:** I think this makes sense, but I probably would have queried for something like […], i.e. to see whether the complement of rows covered by the previous query is nonempty.

**Contributor (author):** I think I genuinely do only want to look at the first row after our current marker: adding more conditions like you suggest is either going to cause more work for the DB, as it'd have to scan all the remaining rows until finding one (I think?), or, in the case of the […]. Have to be honest, I don't quite follow your suggestion's query; at first I was thinking you wanted […].

**Contributor:** At first I thought you were deleting things based on both `stream_id` and `invalidation_ts`. If so, it seemed odd not to take both into account when working out whether you had deleted everything you wanted to. I see now that you're only deleting things based on `stream_id`, using `invalidation_ts` only to select a sensible upper limit on `stream_id`. Paranoid note: it is not necessarily the case that `x.stream_id > y.stream_id` implies `x.invalidation_ts > y.invalidation_ts` (consider e.g. one worker being slow to complete transactions or having a laggy clock). I wouldn't expect this to have any meaningful effect in practice, because any drifts should be small.
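The reviewer's suggested query was not captured above. One hypothetical reading of "check whether the complement of rows covered by the previous query is nonempty" is a backlog check that carries both deletion criteria; the sketch below is a reconstruction for illustration only, and the function name and shape are invented rather than taken from the PR:

```python
from synapse.storage.database import LoggingTransaction


def _any_deletable_rows_remain_txn(
    txn: LoggingTransaction, cutoff_stream_id: int, delete_up_to_millisec: int
) -> bool:
    # Hypothetical alternative "still in backlog?" check: does any row beyond
    # the batch we just covered still match *both* deletion criteria?
    txn.execute(
        """
        SELECT 1 FROM cache_invalidation_stream_by_instance
        WHERE stream_id > ? AND invalidation_ts <= ?
        LIMIT 1
        """,
        (cutoff_stream_id, delete_up_to_millisec),
    )
    return txn.fetchone() is not None
```

Read against this sketch, the author's objection is that the extra `invalidation_ts` condition may force the database to scan past many remaining rows before finding a match, whereas the check in the PR only ever inspects the single next row by `stream_id`.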
```diff
+
+            txn.execute(
+                """
+                DELETE FROM cache_invalidation_stream_by_instance
+                WHERE ? <= stream_id AND stream_id <= ?
+                """,
+                (earliest_stream_id, cutoff_stream_id),
+            )
```
Comment on lines +655 to +679

**Contributor:** It would be nice if we could easily determine the outcome of the deletion. There is this […] to see how many rows were deleted, if we're interested in that boilerplate. (Not saying we have to use it, as the current way is simple, just cumbersome.)

**Contributor (author):** I considered checking the count of deleted rows, but these streams can technically have gaps, so you don't actually know how many rows are to be deleted, only an upper bound.
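The helper the reviewer refers to is not captured above. For what it's worth, the general idea can be sketched with the DB-API `rowcount` attribute; whether `LoggingTransaction` exposes it directly is an assumption here, and the helper name is invented:

```python
from synapse.storage.database import LoggingTransaction


def _delete_batch_and_count_txn(
    txn: LoggingTransaction, earliest_stream_id: int, cutoff_stream_id: int
) -> int:
    # Hypothetical variant of the deletion step that also reports how many
    # rows were removed, assuming the underlying cursor's DB-API `rowcount`
    # is populated after a DELETE (it is for psycopg2).
    txn.execute(
        """
        DELETE FROM cache_invalidation_stream_by_instance
        WHERE ? <= stream_id AND stream_id <= ?
        """,
        (earliest_stream_id, cutoff_stream_id),
    )
    return txn.rowcount
```

As the author notes, stream IDs can have gaps, so this count only says how many rows were actually removed; it cannot say how many were expected, nor whether more deletable rows remain.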
```diff
+
+            return in_backlog
+
+        return await self.db_pool.runInteraction(
+            "clean_up_old_cache_invalidations",
+            _clean_up_batch_of_old_cache_invalidations_txn,
+        )
```