Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Scratch changes to fix have_seen_event not being invalidated #13861

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ federation_ip_range_blacklist: []
# Disable server rate-limiting
rc_federation:
window_size: 1000
sleep_limit: 10
  # NOTE: We run into the rate limiter hard with the MSC2716 tests.
# We go from 35s /messages requests to 20s just by making `/state_ids` and `/state` go faster
sleep_limit: 99999
sleep_delay: 500
reject_limit: 99999
concurrent: 3
Expand Down
11 changes: 10 additions & 1 deletion synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
logger.info("CacheInvalidationWorkerStore constructor")
super().__init__(database, db_conn, hs)

self._instance_name = hs.get_instance_name()
Expand Down Expand Up @@ -222,8 +223,16 @@ def _invalidate_caches_for_event(
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
logger.info(
"CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s",
room_id,
event_id,
)
logger.info(
"CacheInvalidationWorkerStore self.have_seen_event=%s", self.have_seen_event
)
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))
self.have_seen_event.invalidate(((room_id, event_id),))
Copy link
Contributor Author

@MadLittleMods MadLittleMods Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This key lookup was wrong and we were never invalidating the have_seen_event cache even over replication.


Yes, the cache key is literally a tuple wrapped in a tuple. Something weird with the TreeCache, I assume.

ex. (('!TnCIJPKzdQdUlIyXdQ:test', '$Iu0eqEBN7qcyF1S9B3oNB3I91v2o5YOgRNPwi_78s-k'),)

LruCache cache values before pop {(('!TnCIJPKzdQdUlIyXdQ:test', '$Iu0eqEBN7qcyF1S9B3oNB3I91v2o5YOgRNPwi_78s-k'),): False}

We should probably check all other instances of this for the same problem. And ideally fix the cache so it uses the expected (room_id, event_id) key tuple instead.


self.get_latest_event_ids_in_room.invalidate((room_id,))

Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ def _persist_events_txn(

self._store_event_txn(txn, events_and_contexts=events_and_contexts)

for event, _ in events_and_contexts:
# We expect events to be persisted by this point
assert event.internal_metadata.stream_ordering

relation = relation_from_event(event)
self.store._invalidate_caches_for_event(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to move because we can't touch the cache objects from this background queue thread processing and persisting the events.

Otherwise we run into ValueError: Cache objects can only be accessed from the main thread

synapse_main | 2022-09-21 20:53:42,362 - synapse.http.server - 123 - ERROR - POST-3 - Failed handle request via 'RoomCreateRestServlet': <SynapseRequest at 0x7f0fde7ec160 method='POST' uri='/_matrix/client/v3/createRoom' clientproto='HTTP/1.0' site='8080'>
synapse_main | Traceback (most recent call last):
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/http/server.py", line 306, in _async_render_wrapper
synapse_main |     callback_return = await self._async_render(request)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/http/server.py", line 512, in _async_render
synapse_main |     callback_return = await raw_callback_return
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/rest/client/room.py", line 161, in on_POST
synapse_main |     info, _ = await self._room_creation_handler.create_room(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/handlers/room.py", line 909, in create_room
synapse_main |     ) = await self._send_events_for_new_room(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/handlers/room.py", line 1105, in _send_events_for_new_room
synapse_main |     await send(etype=EventTypes.Create, content=creation_content)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/handlers/room.py", line 1081, in send
synapse_main |     ) = await self.event_creation_handler.create_and_send_nonmember_event(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/handlers/message.py", line 990, in create_and_send_nonmember_event
synapse_main |     ev = await self.handle_new_client_event(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/util/metrics.py", line 113, in measured_func
synapse_main |     r = await func(self, *args, **kwargs)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/handlers/message.py", line 1324, in handle_new_client_event
synapse_main |     result, _ = await make_deferred_yieldable(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/controllers/persist_events.py", line 287, in handle_queue_loop
synapse_main |     ret = await self._per_item_callback(room_id, item.task)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/controllers/persist_events.py", line 359, in _process_event_persist_queue_task
synapse_main |     return await self._persist_event_batch(room_id, task)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/controllers/persist_events.py", line 748, in _persist_event_batch
synapse_main |     await self.persist_events_store._persist_events_and_state_updates(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/logging/tracing.py", line 851, in _wrapper
synapse_main |     return await func(*args, **kwargs)  # type: ignore[misc]
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/databases/main/events.py", line 214, in _persist_events_and_state_updates
synapse_main |     await self.db_pool.runInteraction(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/database.py", line 881, in runInteraction
synapse_main |     return await delay_cancellation(_runInteraction())
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/internet/defer.py", line 1692, in _inlineCallbacks
synapse_main |     result = context.run(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/python/failure.py", line 518, in throwExceptionIntoGenerator
synapse_main |     return g.throw(self.type, self.value, self.tb)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/database.py", line 848, in _runInteraction
synapse_main |     result = await self.runWithConnection(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/database.py", line 972, in runWithConnection
synapse_main |     return await make_deferred_yieldable(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/python/threadpool.py", line 244, in inContext
synapse_main |     result = inContext.theWork()  # type: ignore[attr-defined]
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/python/threadpool.py", line 260, in <lambda>
synapse_main |     inContext.theWork = lambda: context.call(  # type: ignore[attr-defined]
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/python/context.py", line 117, in callWithContext
synapse_main |     return self.currentContext().callWithContext(ctx, func, *args, **kw)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/python/context.py", line 82, in callWithContext
synapse_main |     return func(*args, **kw)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/twisted/enterprise/adbapi.py", line 282, in _runWithConnection
synapse_main |     result = func(conn, *args, **kw)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/database.py", line 965, in inner_func
synapse_main |     return func(db_conn, *args, **kwargs)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/database.py", line 710, in new_transaction
synapse_main |     r = func(cursor, *args, **kwargs)
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/databases/main/events.py", line 442, in _persist_events_txn
synapse_main |     self.store._invalidate_caches_for_event(
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/storage/databases/main/cache.py", line 237, in _invalidate_caches_for_event
synapse_main |     self.get_latest_event_ids_in_room.invalidate((room_id,))
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/util/caches/descriptors.py", line 370, in <lambda>
synapse_main |     wrapped.invalidate = lambda key: cache.invalidate(key[0])
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/util/caches/deferred_cache.py", line 391, in invalidate
synapse_main |     self.check_thread()
synapse_main |   File "/usr/local/lib/python3.9/site-packages/synapse/util/caches/deferred_cache.py", line 136, in check_thread
synapse_main |     raise ValueError(
synapse_main | ValueError: Cache objects can only be accessed from the main thread

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move the cache invalidation to the persist_events and persist_event layer. Is that problematic/flaky if something fails partway through? 🤔

persist_events or persist_event or update_current_state (not really)
self._event_persist_queue
_process_event_persist_queue_task
_persist_event_batch
_persist_events_and_state_updates
_persist_events_txn

stream_ordering=event.internal_metadata.stream_ordering,
event_id=event.event_id,
room_id=event.room_id,
etype=event.type,
state_key=None, # event.state_key,
# TODO
redacts=None,
relates_to=relation.parent_id if relation else None,
# TODO
backfilled=False,
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
)

self._persist_transaction_ids_txn(txn, events_and_contexts)

# Insert into event_to_state_groups.
Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]:
@trace
@tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
self, room_id: str, event_ids: Collection[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.

Expand All @@ -1468,6 +1468,7 @@ async def have_seen_events(
Returns:
The set of events we have already seen.
"""
logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids)

# @cachedList chomps lots of memory if you call it with a big list, so
# we break it down. However, each batch requires its own index scan, so we make
Expand All @@ -1491,6 +1492,7 @@ async def _have_seen_events_dict(
Returns:
a dict {(room_id, event_id)-> bool}
"""
logger.info("_have_seen_events_dict keys=%s", keys)
# if the event cache contains the event, obviously we've seen it.

cache_results = {
Expand Down
6 changes: 6 additions & 0 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,14 @@ def invalidate(self, key: KT) -> None:
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
import logging

logger = logging.getLogger(__name__)
logger.info("DeferredCache before=%s", self.cache.len())
logger.info("DeferredCache invalidate key=%s", key)
self.check_thread()
self.cache.del_multi(key)
logger.info("DeferredCache after=%s", self.cache.len())

# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned for
Expand Down
6 changes: 6 additions & 0 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def add_node(
callbacks,
prune_unread_entries,
)
logger.info("LruCache add_node key=%s value=%s", key, value)
cache[key] = node

if size_callback:
Expand Down Expand Up @@ -722,7 +723,12 @@ def cache_del_multi(key: KT) -> None:
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
logger.info(
"LruCache cache values before pop %s",
{node.key: node.value for node in cache.values()},
)
popped = cache.pop(key, None)
logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped)
if popped is None:
return
# for each deleted node, we now need to remove it from the linked list
Expand Down
3 changes: 3 additions & 0 deletions synapse/util/ratelimitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def __init__(
self.reject_limit = config.reject_limit
self.concurrent_requests = config.concurrent

logger.info("self.sleep_limit=%s", self.sleep_limit)
logger.info("self.reject_limit=%s", self.reject_limit)

# request_id objects for requests which have been slept
self.sleeping_requests: Set[object] = set()

Expand Down
120 changes: 72 additions & 48 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,66 +35,45 @@
from synapse.util.async_helpers import yieldable_gather_results

from tests import unittest
from tests.test_utils.event_injection import create_event, inject_event


class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def prepare(self, reactor, clock, hs):
self.hs = hs
self.store: EventsWorkerStore = hs.get_datastores().main

# insert some test data
for rid in ("room1", "room2"):
self.get_success(
self.store.db_pool.simple_insert(
"rooms",
{"room_id": rid, "room_version": 4},
)
)
self.user = self.register_user("user", "pass")
self.token = self.login(self.user, "pass")
self.room_id = self.helper.create_room_as(self.user, tok=self.token)

self.event_ids: List[str] = []
for idx, rid in enumerate(
(
"room1",
"room1",
"room1",
"room2",
)
):
event_json = {"type": f"test {idx}", "room_id": rid}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id

self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": event_id,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": event.type,
"processed": True,
"outlier": False,
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": event_id,
"room_id": rid,
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": 3,
},
for i in range(3):
event = self.get_success(
inject_event(
hs,
room_version=RoomVersions.V7.identifier,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": f"foobarbaz{i}"},
)
)
self.event_ids.append(event_id)

self.event_ids.append(event.event_id)

def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
self.store.have_seen_events(
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
)
)
self.assertEqual(res, {self.event_ids[0]})

Expand All @@ -104,7 +83,9 @@ def test_simple(self):
# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
self.store.have_seen_events(
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
)
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
Expand All @@ -116,11 +97,54 @@ def test_query_via_event_cache(self):
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0]])
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

def test_persisting_event_invalidates_cache(self):
event, event_context = self.get_success(
create_event(
self.hs,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": "garply"},
)
)

with LoggingContext(name="test") as ctx:
# First, check `have_seen_event` for an event we have not seen yet
# to prime the cache with a `false` value.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, set())

# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

# Persist the event which should invalidate or prefill the
# `have_seen_event` cache so we don't return stale values.
persistence = self.hs.get_storage_controllers().persistence
self.get_success(
persistence.persist_event(
event,
event_context,
)
)

with LoggingContext(name="test") as ctx:
# Check `have_seen_event` again and we should see the updated fact
# that we have now seen the event after persisting it.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, {event.event_id})

# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)


class EventCacheTestCase(unittest.HomeserverTestCase):
"""Test that the various layers of event cache works."""
Expand Down
3 changes: 2 additions & 1 deletion tests/test_utils/event_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ async def create_event(
KNOWN_ROOM_VERSIONS[room_version], kwargs
)
event, context = await hs.get_event_creation_handler().create_new_client_event(
builder, prev_event_ids=prev_event_ids
builder,
prev_event_ids=prev_event_ids,
)

return event, context