Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Feb 19, 2019
1 parent 92e6fb5 commit bc8fa15
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
21 changes: 20 additions & 1 deletion docs/tcp_replication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ for each stream so that on reconneciton it can start streaming from the correct
place. Note: not all RDATA have valid tokens due to batching. See
``RdataCommand`` for more details.


Example
~~~~~~~

Expand Down Expand Up @@ -221,3 +220,23 @@ SYNC (S, C)

See ``synapse/replication/tcp/commands.py`` for a detailed description and the
format of each command.


Cache Invalidation Stream
~~~~~~~~~~~~~~~~~~~~~~~~~

The cache invalidation stream is used to inform workers when they need to
invalidate any of their caches in the data store. This is done by streaming all
cache invalidations done on master down to the workers, assuming that any caches
on the workers also exist on the master.

Each individual cache invalidation results in a row being sent down replication,
which includes the cache name (the name of the function) and they key to
invalidate. For example::

> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

However, there are times when a number of caches need to be invalidated at the
same time with the same key. To reduce traffic we batch those invalidations into
a single poke by defining a special cache name that workers understand to mean
to expand to invalidate the correct caches.
8 changes: 4 additions & 4 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1199,8 +1199,8 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
Args:
txn
room_id (str): Room were state changed
members_changed (set[str]): The user_ids of members that have changed
room_id (str): Room where state changed
members_changed (Iterable[str]): The user_ids of members that have changed
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)

Expand All @@ -1215,7 +1215,7 @@ def _invalidate_state_caches(self, room_id, members_changed):
not stream invalidations down replication.
Args:
room_id (str): Room were state changed
room_id (str): Room where state changed
members_changed (set[str]): The user_ids of members that have changed
"""
for member in members_changed:
Expand All @@ -1237,7 +1237,7 @@ def _send_invalidation_to_replication(self, txn, cache_name, keys):
Args:
txn
cache_name (str)
keys (list[str])
keys (iterable[str])
"""

if isinstance(self.database_engine, PostgresEngine):
Expand Down

0 comments on commit bc8fa15

Please sign in to comment.