Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Async get event cache prep #13242

Merged
merged 15 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,8 +822,10 @@ async def _runInteraction() -> R:

return cast(R, result)
except Exception:
for after_callback, after_args, after_kwargs in exception_callbacks:
after_callback(*after_args, **after_kwargs)
for exception_callback, after_args, after_kwargs in exception_callbacks:
await maybe_awaitable(
exception_callback(*after_args, **after_kwargs)
)
raise

# To handle cancellation, we ensure that `after_callback`s and
Expand Down
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ def _invalidate_caches_for_event(
relates_to: Optional[str],
backfilled: bool,
) -> None:
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
richvdh marked this conversation as resolved.
Show resolved Hide resolved
self.have_seen_event.invalidate((room_id, event_id))

Expand Down
10 changes: 8 additions & 2 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,11 +732,17 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
return event_entry_map

async def _invalidate_get_event_cache(self, event_id: str) -> None:
# First we invalidate the asynchronous cache instance, this may include
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# out of process caches such as Redis/memcache. Once complete we can
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# invalidate any in memory cache. The ordering is important here to
# ensure we don't pull in any remote invalid value after we invalidate
# the in-memory cache.
await self._get_event_cache.invalidate((event_id,))
self._invalidate_local_get_event_cache(event_id)
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _invalidate_local_get_event_cache(self, event_id: str) -> None:
self._get_event_cache.lru_cache.invalidate((event_id,))
self._get_event_cache.invalidate_local((event_id,))
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

Expand Down
25 changes: 15 additions & 10 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,27 +734,32 @@ def __del__(self) -> None:

class AsyncLruCache(Generic[KT, VT]):
    """
    An asynchronous wrapper around a subset of the LruCache API.

    On its own this doesn't change the behaviour but allows subclasses that
    utilize external cache systems that require await behaviour to be created.
    """

    def __init__(self, *args, **kwargs):  # type: ignore
        self._lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)

    async def get(
        self, key: KT, default: Optional[T] = None, update_metrics: bool = True
    ) -> Optional[VT]:
        return self._lru_cache.get(key, update_metrics=update_metrics)

    async def set(self, key: KT, value: VT) -> None:
        self._lru_cache.set(key, value)

    async def invalidate(self, key: KT) -> None:
        # This method should invalidate any external cache and then invalidate
        # the LruCache.
        return self._lru_cache.invalidate(key)

    def invalidate_local(self, key: KT) -> None:
        # Synchronous variant that only invalidates the wrapped in-memory
        # LruCache; used when any external cache has already been (or will be)
        # invalidated by the caller.
        return self._lru_cache.invalidate(key)

    async def contains(self, key: KT) -> bool:
        return self._lru_cache.contains(key)

    async def clear(self) -> None:
        self._lru_cache.clear()
2 changes: 1 addition & 1 deletion tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def test_unknown_room_version(self):

# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())
self.store._event_ref.clear()

# The rooms should be excluded from the sync response.
Expand Down
8 changes: 4 additions & 4 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def prepare(self, reactor, clock, hs):
self.event_id = res["event_id"]

# Reset the event cache so the tests start with it empty
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())

def test_simple(self):
"""Test that we cache events that we pull from the DB."""
Expand All @@ -160,7 +160,7 @@ def test_event_ref(self):
"""

# Reset the event cache
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())

with LoggingContext("test") as ctx:
# We keep hold of the event event though we never use it.
Expand All @@ -170,7 +170,7 @@ def test_event_ref(self):
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

# Reset the event cache
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())

with LoggingContext("test") as ctx:
self.get_success(self.store.get_event(self.event_id))
Expand Down Expand Up @@ -345,7 +345,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
self.event_id = res["event_id"]

# Reset the event cache so the tests start with it empty
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())

@contextmanager
def blocking_get_event_calls(
Expand Down