From bc8fa1509d3885d111a2ef98e12e9ce66a19a3a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Feb 2019 11:24:59 +0000 Subject: [PATCH] Documentation --- docs/tcp_replication.rst | 21 ++++++++++++++++++++- synapse/storage/_base.py | 8 ++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index 62225ba6f4ca..852f1113a360 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -137,7 +137,6 @@ for each stream so that on reconneciton it can start streaming from the correct place. Note: not all RDATA have valid tokens due to batching. See ``RdataCommand`` for more details. - Example ~~~~~~~ @@ -221,3 +220,23 @@ SYNC (S, C) See ``synapse/replication/tcp/commands.py`` for a detailed description and the format of each command. + + +Cache Invalidation Stream +~~~~~~~~~~~~~~~~~~~~~~~~~ + +The cache invalidation stream is used to inform workers when they need to +invalidate any of their caches in the data store. This is done by streaming all +cache invalidations done on master down to the workers, assuming that any caches +on the workers also exist on the master. + +Each individual cache invalidation results in a row being sent down replication, +which includes the cache name (the name of the function) and they key to +invalidate. For example:: + + > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251] + +However, there are times when a number of caches need to be invalidated at the +same time with the same key. To reduce traffic we batch those invalidations into +a single poke by defining a special cache name that workers understand to mean +to expand to invalidate the correct caches. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f7c6d714aba3..1c8d3f002654 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1199,8 +1199,8 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): Args: txn - room_id (str): Room were state changed - members_changed (set[str]): The user_ids of members that have changed + room_id (str): Room where state changed + members_changed (Iterable[str]): The user_ids of members that have changed """ txn.call_after(self._invalidate_state_caches, room_id, members_changed) @@ -1215,7 +1215,7 @@ def _invalidate_state_caches(self, room_id, members_changed): not stream invalidations down replication. Args: - room_id (str): Room were state changed + room_id (str): Room where state changed members_changed (set[str]): The user_ids of members that have changed """ for member in members_changed: @@ -1237,7 +1237,7 @@ def _send_invalidation_to_replication(self, txn, cache_name, keys): Args: txn cache_name (str) - keys (list[str]) + keys (iterable[str]) """ if isinstance(self.database_engine, PostgresEngine):