Async get event cache prep (matrix-org#13242)
Some experimental prep work to enable external event caching, based on matrix-org#9379 & matrix-org#12955. This doesn't actually move the cache anywhere yet; it just lays the groundwork for asynchronously implemented caches.

Signed off by Nick @ Beeper (@Fizzadar)
Fizzadar committed Jul 27, 2022
1 parent f891932 commit 4d5d02d
Showing 7 changed files with 43 additions and 20 deletions.
6 changes: 4 additions & 2 deletions synapse/storage/database.py

@@ -822,8 +822,10 @@ async def _runInteraction() -> R:

                 return cast(R, result)
             except Exception:
-                for after_callback, after_args, after_kwargs in exception_callbacks:
-                    after_callback(*after_args, **after_kwargs)
+                for exception_callback, after_args, after_kwargs in exception_callbacks:
+                    await maybe_awaitable(
+                        exception_callback(*after_args, **after_kwargs)
+                    )
                 raise

         # To handle cancellation, we ensure that `after_callback`s and
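The loop above now tolerates both synchronous and asynchronous exception callbacks. A minimal sketch of the `maybe_awaitable` pattern it relies on (an illustration of the idea only, not the actual helper in `synapse.util`):

    import inspect

    async def maybe_awaitable(value):
        # Await the value if the callback returned an awaitable (for example a
        # coroutine); otherwise hand the plain return value straight back.
        if inspect.isawaitable(value):
            return await value
        return value

    # With such a helper, exception callbacks may be plain functions or
    # coroutine functions, and the caller can await them uniformly:
    #
    #     for exception_callback, after_args, after_kwargs in exception_callbacks:
    #         await maybe_awaitable(exception_callback(*after_args, **after_kwargs))
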
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/cache.py

@@ -193,6 +193,9 @@ def _invalidate_caches_for_event(
         relates_to: Optional[str],
         backfilled: bool,
     ) -> None:
+        # This invalidates any local in-memory cached event objects; the original
+        # process triggering the invalidation is responsible for clearing any external
+        # cached objects.
         self._invalidate_local_get_event_cache(event_id)
         self.have_seen_event.invalidate((room_id, event_id))
10 changes: 8 additions & 2 deletions synapse/storage/databases/main/events_worker.py

@@ -745,11 +745,17 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
         return event_entry_map

     async def _invalidate_get_event_cache(self, event_id: str) -> None:
+        # First we invalidate the asynchronous cache instance. This may include
+        # out-of-process caches such as Redis/memcache. Once complete we can
+        # invalidate any in-memory cache. The ordering is important here to
+        # ensure we don't pull in any remote invalid value after we invalidate
+        # the in-memory cache.
         await self._get_event_cache.invalidate((event_id,))
+        self._invalidate_local_get_event_cache(event_id)
         self._event_ref.pop(event_id, None)
         self._current_event_fetches.pop(event_id, None)

     def _invalidate_local_get_event_cache(self, event_id: str) -> None:
-        self._get_event_cache.lru_cache.invalidate((event_id,))
+        self._get_event_cache.invalidate_local((event_id,))
         self._event_ref.pop(event_id, None)
         self._current_event_fetches.pop(event_id, None)
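The ordering argument in the new comment can be made concrete: if the local cache were cleared before the external one, a concurrent reader could repopulate it from the still-stale external entry. A rough sketch of that race, where external_cache stands in for whatever remote store a subclass might consult:

    # Hypothetical interleaving if the order were reversed (local first):
    #
    #   invalidator:  _invalidate_local_get_event_cache(event_id)   # local entry gone
    #   reader:       misses locally, falls back to the external cache, which still
    #                 holds the stale event, and writes it back into the local cache
    #   invalidator:  await external_cache.delete(event_id)         # too late: the stale
    #                                                               # entry is local again
    #
    # Invalidating the external cache first and the local cache second closes that
    # window, which is exactly what _invalidate_get_event_cache does above.
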
30 changes: 20 additions & 10 deletions synapse/util/caches/lrucache.py

@@ -734,27 +734,37 @@ def __del__(self) -> None:

 class AsyncLruCache(Generic[KT, VT]):
     """
-    An asynchronous wrapper around a subset of the LruCache API. On it's own
-    this doesn't change the behaviour but allows subclasses that utilize
-    external cache systems that require await behaviour to be created.
+    An asynchronous wrapper around a subset of the LruCache API.
+
+    On its own this doesn't change the behaviour but allows subclasses that
+    utilize external cache systems that require await behaviour to be created.
     """

     def __init__(self, *args, **kwargs):  # type: ignore
-        self.lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
+        self._lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)

     async def get(
         self, key: KT, default: Optional[T] = None, update_metrics: bool = True
     ) -> Optional[VT]:
-        return self.lru_cache.get(key, update_metrics=update_metrics)
+        return self._lru_cache.get(key, update_metrics=update_metrics)

     async def set(self, key: KT, value: VT) -> None:
-        self.lru_cache.set(key, value)
+        self._lru_cache.set(key, value)

     async def invalidate(self, key: KT) -> None:
-        return self.lru_cache.invalidate(key)
+        # This method should invalidate any external cache and then invalidate the LruCache.
+        return self._lru_cache.invalidate(key)
+
+    def invalidate_local(self, key: KT) -> None:
+        """Remove an entry from the local cache
+
+        This variant of `invalidate` is useful if we know that the external
+        cache has already been invalidated.
+        """
+        return self._lru_cache.invalidate(key)

     async def contains(self, key: KT) -> bool:
-        return self.lru_cache.contains(key)
+        return self._lru_cache.contains(key)

-    def clear(self) -> None:
-        self.lru_cache.clear()
+    async def clear(self) -> None:
+        self._lru_cache.clear()
4 changes: 3 additions & 1 deletion synapse/util/caches/redis_caches.py

@@ -1,5 +1,5 @@
-from typing import Any, Generic, Optional, Union, TYPE_CHECKING
 from functools import wraps
+from typing import TYPE_CHECKING, Any, Generic, Optional, Union

 from synapse.util.caches.lrucache import KT, VT, AsyncLruCache, T

@@ -21,7 +21,9 @@ async def _wrapped(**kwargs):

             values.update(missing_values)
             return values
+
         return _wrapped
+
     return decorator
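Only a fragment of redis_caches.py is shown above, so the surrounding decorator has to be inferred. A sketch of the shape the fragment implies, with every name other than wraps assumed for illustration: a wrapper around a bulk getter that serves whatever the external cache already holds, calls the wrapped function only for the missing keys, and backfills the cache from the result.

    from functools import wraps
    from typing import Any, Awaitable, Callable, Dict, Sequence


    def cached_getlist(
        cache_get: Callable[[str], Awaitable[Any]],
        cache_set: Callable[[str, Any], Awaitable[None]],
        list_name: str,
    ) -> Callable:
        def decorator(func: Callable[..., Awaitable[Dict[str, Any]]]) -> Callable:
            @wraps(func)
            async def _wrapped(**kwargs: Any) -> Dict[str, Any]:
                keys: Sequence[str] = kwargs.pop(list_name)

                # Serve whatever the external cache already has.
                values: Dict[str, Any] = {}
                missing_keys = []
                for key in keys:
                    cached = await cache_get(key)
                    if cached is not None:
                        values[key] = cached
                    else:
                        missing_keys.append(key)

                # Fetch only the misses from the wrapped function and backfill
                # the external cache with what comes back.
                missing_values = await func(**kwargs, **{list_name: missing_keys})
                for key, value in missing_values.items():
                    await cache_set(key, value)

                values.update(missing_values)
                return values

            return _wrapped

        return decorator
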
2 changes: 1 addition & 1 deletion tests/handlers/test_sync.py

@@ -159,7 +159,7 @@ def test_unknown_room_version(self):

         # Blow away caches (supported room versions can only change due to a restart).
         self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
-        self.store._get_event_cache.clear()
+        self.get_success(self.store._get_event_cache.clear())
         self.store._event_ref.clear()

         # The rooms should be excluded from the sync response.
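Since AsyncLruCache.clear() is now a coroutine, the test changes here and in test_events_worker.py below wrap it in the test suite's get_success helper, which drives a coroutine (or Deferred) to completion on the test reactor and returns its result; calling clear() bare would only create an un-awaited coroutine and leave the cache populated.
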
8 changes: 4 additions & 4 deletions tests/storage/databases/main/test_events_worker.py

@@ -143,7 +143,7 @@ def prepare(self, reactor, clock, hs):
         self.event_id = res["event_id"]

         # Reset the event cache so the tests start with it empty
-        self.store._get_event_cache.clear()
+        self.get_success(self.store._get_event_cache.clear())

     def test_simple(self):
         """Test that we cache events that we pull from the DB."""
@@ -160,7 +160,7 @@ def test_event_ref(self):
         """

         # Reset the event cache
-        self.store._get_event_cache.clear()
+        self.get_success(self.store._get_event_cache.clear())

         with LoggingContext("test") as ctx:
             # We keep hold of the event event though we never use it.
@@ -170,7 +170,7 @@ def test_event_ref(self):
             self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

         # Reset the event cache
-        self.store._get_event_cache.clear()
+        self.get_success(self.store._get_event_cache.clear())

         with LoggingContext("test") as ctx:
             self.get_success(self.store.get_event(self.event_id))
@@ -345,7 +345,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
         self.event_id = res["event_id"]

         # Reset the event cache so the tests start with it empty
-        self.store._get_event_cache.clear()
+        self.get_success(self.store._get_event_cache.clear())

     @contextmanager
     def blocking_get_event_calls(
