 
 from synapse.api.constants import EventTypes
 from synapse.config._base import Config
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
 from synapse.replication.tcp.streams.events import (
     EventsStream,
@@ -584,36 +587,29 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
         else:
             return 0
 
-    def _clean_up_cache_invalidation_wrapper(self) -> None:
-        async def _clean_up_cache_invalidation_background() -> None:
-            """
-            Clean up cache invalidation stream table entries occasionally.
-            If we are behind (i.e. there are entries old enough to
-            be deleted but too many of them to be deleted in one go),
-            then we run slightly more frequently.
-            """
-            delete_up_to: int = (
-                self.hs.get_clock().time_msec()
-                - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
-            )
-
-            in_backlog = await self._clean_up_batch_of_old_cache_invalidations(
-                delete_up_to
-            )
+    @wrap_as_background_process("clean_up_old_cache_invalidations")
+    async def _clean_up_cache_invalidation_wrapper(self) -> None:
+        """
+        Clean up cache invalidation stream table entries occasionally.
+        If we are behind (i.e. there are entries old enough to
+        be deleted but too many of them to be deleted in one go),
+        then we run slightly more frequently.
+        """
+        delete_up_to: int = (
+            self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
+        )
 
-            # Vary how long we wait before calling again depending on whether we
-            # are still sifting through backlog or we have caught up.
-            if in_backlog:
-                next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
-            else:
-                next_interval = REGULAR_CLEANUP_INTERVAL_MS
+        in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)
 
-            self.hs.get_clock().call_later(
-                next_interval / 1000, self._clean_up_cache_invalidation_wrapper
-            )
+        # Vary how long we wait before calling again depending on whether we
+        # are still sifting through backlog or we have caught up.
+        if in_backlog:
+            next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
+        else:
+            next_interval = REGULAR_CLEANUP_INTERVAL_MS
 
-        run_as_background_process(
-            "clean_up_old_cache_invalidations", _clean_up_cache_invalidation_background
+        self.hs.get_clock().call_later(
+            next_interval / 1000, self._clean_up_cache_invalidation_wrapper
         )
 
     async def _clean_up_batch_of_old_cache_invalidations(
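For context, the change replaces a synchronous wrapper that built a nested coroutine and handed it to `run_as_background_process` with a coroutine method decorated by `@wrap_as_background_process`, so each invocation is tracked under the same background-process name and `call_later` can reschedule the method directly. Below is a minimal sketch of that before/after shape in isolation; `PeriodicCleaner`, `do_one_batch`, the `"example_cleanup"` name, and the 60-second interval are invented for illustration and are not part of the actual Synapse change.

```python
from typing import Any

from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)


class PeriodicCleaner:
    """Illustrative only: mirrors the shape of the refactor, not the real store."""

    def __init__(self, hs: Any):
        self.hs = hs

    # Old shape: a plain method defines a nested coroutine and explicitly
    # hands it to run_as_background_process under a metrics name.
    def cleanup_old_style(self) -> None:
        async def _cleanup() -> None:
            await self.do_one_batch()
            # Re-schedule the outer wrapper for the next run.
            self.hs.get_clock().call_later(60, self.cleanup_old_style)

        run_as_background_process("example_cleanup", _cleanup)

    # New shape: the coroutine method itself is decorated, so each call runs
    # as the named background process and can be passed straight to call_later.
    @wrap_as_background_process("example_cleanup")
    async def cleanup_new_style(self) -> None:
        await self.do_one_batch()
        self.hs.get_clock().call_later(60, self.cleanup_new_style)

    async def do_one_batch(self) -> None:
        ...  # stand-in for the real cleanup work
```

The decorated form drops the nested-closure indirection and keeps the metrics name next to the method it measures, which is what lets `call_later` reschedule `_clean_up_cache_invalidation_wrapper` directly in the new code.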