Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use AsyncLruCache for the get events cache
Browse files Browse the repository at this point in the history
This doesn't change the function of the cache at all, but prepares for
the possibility of an externally backed, or combined external/local,
event cache.

This does separate the cache invalidation into two options: the local
in-memory cache and any asynchronous cache (which are currently the
same thing). This means that a worker invalidating objects on changes
must call the async method, while the local method is reserved for
invalidations received over replication.
  • Loading branch information
Fizzadar committed Jul 11, 2022
1 parent 8cf8025 commit 680f931
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 17 deletions.
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _invalidate_caches_for_event(
relates_to: Optional[str],
backfilled: bool,
) -> None:
self._invalidate_get_event_cache(event_id)
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))

self.get_latest_event_ids_in_room.invalidate((room_id,))
Expand All @@ -208,7 +208,7 @@ def _invalidate_caches_for_event(
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)

if redacts:
self._invalidate_get_event_cache(redacts)
self._invalidate_local_get_event_cache(redacts)
# Caches which might leak edits must be invalidated for the event being
# redacted.
self.get_relations_for_event.invalidate((redacts,))
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1669,9 +1669,9 @@ def _add_to_cache(
if not row["rejects"] and not row["redacts"]:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))

def prefill() -> None:
async def prefill() -> None:
for cache_entry in to_prefill:
self.store._get_event_cache.set(
await self.store._get_event_cache.set(
(cache_entry.event.event_id,), cache_entry
)

Expand Down
28 changes: 18 additions & 10 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.lrucache import AsyncLruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -238,7 +238,9 @@ def __init__(
5 * 60 * 1000,
)

self._get_event_cache: LruCache[Tuple[str], EventCacheEntry] = LruCache(
self._get_event_cache: AsyncLruCache[
Tuple[str], EventCacheEntry
] = AsyncLruCache(
cache_name="*getEvent*",
max_size=hs.config.caches.event_cache_size,
)
Expand Down Expand Up @@ -617,7 +619,7 @@ async def _get_events_from_cache_or_db(
Returns:
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_entry_map = await self._get_events_from_cache(
event_ids,
)

Expand Down Expand Up @@ -729,12 +731,16 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:

return event_entry_map

def _invalidate_get_event_cache(self, event_id: str) -> None:
self._get_event_cache.invalidate((event_id,))
async def _invalidate_get_event_cache(self, event_id: str) -> None:
await self._get_event_cache.invalidate((event_id,))
self._invalidate_local_get_event_cache(event_id)

def _invalidate_local_get_event_cache(self, event_id: str) -> None:
self._get_event_cache.lru_cache.invalidate((event_id,))
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _get_events_from_cache(
async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
"""Fetch events from the caches.
Expand All @@ -749,7 +755,7 @@ def _get_events_from_cache(

for event_id in events:
# First check if it's in the event cache
ret = self._get_event_cache.get(
ret = await self._get_event_cache.get(
(event_id,), None, update_metrics=update_metrics
)
if ret:
Expand All @@ -771,7 +777,7 @@ def _get_events_from_cache(

# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
self._get_event_cache.set((event_id,), cache_entry)
await self._get_event_cache.set((event_id,), cache_entry)

return event_map

Expand Down Expand Up @@ -1148,7 +1154,7 @@ async def _get_events_from_db(
event=original_ev, redacted_event=redacted_event
)

self._get_event_cache.set((event_id,), cache_entry)
await self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry

if not redacted_event:
Expand Down Expand Up @@ -1382,7 +1388,9 @@ async def _have_seen_events_dict(
# if the event cache contains the event, obviously we've seen it.

cache_results = {
(rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
(rid, eid)
for (rid, eid) in keys
if await self._get_event_cache.contains((eid,))
}
results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def _purge_history_txn(
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
self._invalidate_get_event_cache(event_id)
txn.call_after(self._invalidate_get_event_cache, event_id)

logger.info("[purge] done")

Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,9 @@ async def _get_joined_users_from_context(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
event_map = await self._get_events_from_cache(
member_event_ids, update_metrics=False
)

missing_member_event_ids = []
for event_id in member_event_ids:
Expand Down
2 changes: 1 addition & 1 deletion tests/storage/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ def test_purge_room(self):
)

# The events aren't found.
self.store._invalidate_get_event_cache(create_event.event_id)
self.store._invalidate_local_get_event_cache(create_event.event_id)
self.get_failure(self.store.get_event(create_event.event_id), NotFoundError)
self.get_failure(self.store.get_event(first["event_id"]), NotFoundError)

0 comments on commit 680f931

Please sign in to comment.