This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 663970c

Merge pull request #6454 from matrix-org/erikj/clean_base_Store

* commit '3eb15c01d':
    Revert "Move get_user_count_txn out of base store"
    _CURRENT_STATE_CACHE_NAME is public
    Move get_user_count_txn out of base store
    Newsfile
    Move cache invalidation to main data store
    Move event fetch vars to EventWorkStore
    Move account validity bg updates to registration store

2 parents fe1d237 + 3eb15c0

9 files changed (+222 −192 lines)

changelog.d/6454.misc

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Move data store specific code out of `SQLBaseStore`.

synapse/replication/slave/storage/_base.py

Lines changed: 3 additions & 2 deletions
@@ -18,7 +18,8 @@

 import six

-from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
 from synapse.storage.engines import PostgresEngine

 from ._slaved_id_tracker import SlavedIdTracker

@@ -62,7 +63,7 @@ def process_replication_rows(self, stream_name, token, rows):
         if stream_name == "caches":
             self._cache_id_gen.advance(token)
             for row in rows:
-                if row.cache_func == _CURRENT_STATE_CACHE_NAME:
+                if row.cache_func == CURRENT_STATE_CACHE_NAME:
                     room_id = row.keys[0]
                     members_changed = set(row.keys[1:])
                     self._invalidate_state_caches(room_id, members_changed)
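The net effect in this file is a rename: the batching cache name loses its underscore prefix and is now imported from the main data store's cache module rather than from `synapse.storage._base`. That module is one of the 9 changed files but is not expanded in this view; a minimal sketch of the relevant excerpt, reconstructed from the lines removed from `synapse/storage/_base.py` below (the surrounding file layout is an assumption):

    # synapse/storage/data_stores/main/cache.py (assumed excerpt)

    # This is a special cache name we use to batch multiple invalidations of caches
    # based on the current state when notifying workers over replication.
    CURRENT_STATE_CACHE_NAME = "cs_cache_fake"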

synapse/storage/_base.py

Lines changed: 0 additions & 182 deletions
@@ -14,11 +14,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import itertools
 import logging
 import random
 import sys
-import threading
 import time
 from typing import Iterable, Tuple

@@ -35,8 +33,6 @@
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
-from synapse.util import batch_iter
-from synapse.util.caches.descriptors import Cache
 from synapse.util.stringutils import exception_to_unicode

 # import a function which will return a monotonic time, in seconds

@@ -79,10 +75,6 @@
     "event_search": "event_search_event_id_idx",
 }

-# This is a special cache name we use to batch multiple invalidations of caches
-# based on the current state when notifying workers over replication.
-_CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
-

 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object

@@ -237,23 +229,11 @@ def __init__(self, db_conn, hs):
         # to watch it
         self._txn_perf_counters = PerformanceCounters()

-        self._get_event_cache = Cache(
-            "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size
-        )
-
-        self._event_fetch_lock = threading.Condition()
-        self._event_fetch_list = []
-        self._event_fetch_ongoing = 0
-
-        self._pending_ds = []
-
         self.database_engine = hs.database_engine

         # A set of tables that are not safe to use native upserts in.
         self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())

-        self._account_validity = self.hs.config.account_validity
-
         # We add the user_directory_search table to the blacklist on SQLite
         # because the existing search table does not have an index, making it
         # unsafe to use native upserts.

@@ -272,14 +252,6 @@ def __init__(self, db_conn, hs):

         self.rand = random.SystemRandom()

-        if self._account_validity.enabled:
-            self._clock.call_later(
-                0.0,
-                run_as_background_process,
-                "account_validity_set_expiration_dates",
-                self._set_expiration_date_when_missing,
-            )
-
     @defer.inlineCallbacks
     def _check_safe_to_upsert(self):
         """

@@ -312,62 +284,6 @@ def _check_safe_to_upsert(self):
                 self._check_safe_to_upsert,
             )

-    @defer.inlineCallbacks
-    def _set_expiration_date_when_missing(self):
-        """
-        Retrieves the list of registered users that don't have an expiration date, and
-        adds an expiration date for each of them.
-        """
-
-        def select_users_with_no_expiration_date_txn(txn):
-            """Retrieves the list of registered users with no expiration date from the
-            database, filtering out deactivated users.
-            """
-            sql = (
-                "SELECT users.name FROM users"
-                " LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
-                " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
-            )
-            txn.execute(sql, [])
-
-            res = self.cursor_to_dict(txn)
-            if res:
-                for user in res:
-                    self.set_expiration_date_for_user_txn(
-                        txn, user["name"], use_delta=True
-                    )
-
-        yield self.runInteraction(
-            "get_users_with_no_expiration_date",
-            select_users_with_no_expiration_date_txn,
-        )
-
-    def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
-        """Sets an expiration date to the account with the given user ID.
-
-        Args:
-            user_id (str): User ID to set an expiration date for.
-            use_delta (bool): If set to False, the expiration date for the user will be
-                now + validity period. If set to True, this expiration date will be a
-                random value in the [now + period - d ; now + period] range, d being a
-                delta equal to 10% of the validity period.
-        """
-        now_ms = self._clock.time_msec()
-        expiration_ts = now_ms + self._account_validity.period
-
-        if use_delta:
-            expiration_ts = self.rand.randrange(
-                expiration_ts - self._account_validity.startup_job_max_delta,
-                expiration_ts,
-            )
-
-        self._simple_upsert_txn(
-            txn,
-            "account_validity",
-            keyvalues={"user_id": user_id},
-            values={"expiration_ts_ms": expiration_ts, "email_sent": False},
-        )
-
     def start_profiling(self):
         self._previous_loop_ts = monotonic_time()

@@ -1400,47 +1316,6 @@ def _get_cache_dict(

         return cache, min_val

-    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
-        """Invalidates the cache and adds it to the cache stream so slaves
-        will know to invalidate their caches.
-
-        This should only be used to invalidate caches where slaves won't
-        otherwise know from other replication streams that the cache should
-        be invalidated.
-        """
-        txn.call_after(cache_func.invalidate, keys)
-        self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
-
-    def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
-        """Special case invalidation of caches based on current state.
-
-        We special case this so that we can batch the cache invalidations into a
-        single replication poke.
-
-        Args:
-            txn
-            room_id (str): Room where state changed
-            members_changed (iterable[str]): The user_ids of members that have changed
-        """
-        txn.call_after(self._invalidate_state_caches, room_id, members_changed)
-
-        if members_changed:
-            # We need to be careful that the size of the `members_changed` list
-            # isn't so large that it causes problems sending over replication, so we
-            # send them in chunks.
-            # Max line length is 16K, and max user ID length is 255, so 50 should
-            # be safe.
-            for chunk in batch_iter(members_changed, 50):
-                keys = itertools.chain([room_id], chunk)
-                self._send_invalidation_to_replication(
-                    txn, _CURRENT_STATE_CACHE_NAME, keys
-                )
-        else:
-            # if no members changed, we still need to invalidate the other caches.
-            self._send_invalidation_to_replication(
-                txn, _CURRENT_STATE_CACHE_NAME, [room_id]
-            )
-
     def _invalidate_state_caches(self, room_id, members_changed):
         """Invalidates caches that are based on the current state, but does
         not stream invalidations down replication.

@@ -1474,63 +1349,6 @@ def _attempt_to_invalidate_cache(self, cache_name, key):
             # which is fine.
             pass

-    def _send_invalidation_to_replication(self, txn, cache_name, keys):
-        """Notifies replication that given cache has been invalidated.
-
-        Note that this does *not* invalidate the cache locally.
-
-        Args:
-            txn
-            cache_name (str)
-            keys (iterable[str])
-        """
-
-        if isinstance(self.database_engine, PostgresEngine):
-            # get_next() returns a context manager which is designed to wrap
-            # the transaction. However, we want to only get an ID when we want
-            # to use it, here, so we need to call __enter__ manually, and have
-            # __exit__ called after the transaction finishes.
-            ctx = self._cache_id_gen.get_next()
-            stream_id = ctx.__enter__()
-            txn.call_on_exception(ctx.__exit__, None, None, None)
-            txn.call_after(ctx.__exit__, None, None, None)
-            txn.call_after(self.hs.get_notifier().on_new_replication_data)
-
-            self._simple_insert_txn(
-                txn,
-                table="cache_invalidation_stream",
-                values={
-                    "stream_id": stream_id,
-                    "cache_func": cache_name,
-                    "keys": list(keys),
-                    "invalidation_ts": self.clock.time_msec(),
-                },
-            )
-
-    def get_all_updated_caches(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed([])
-
-        def get_all_updated_caches_txn(txn):
-            # We purposefully don't bound by the current token, as we want to
-            # send across cache invalidations as quickly as possible. Cache
-            # invalidations are idempotent, so duplicates are fine.
-            sql = (
-                "SELECT stream_id, cache_func, keys, invalidation_ts"
-                " FROM cache_invalidation_stream"
-                " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, limit))
-            return txn.fetchall()
-
-        return self.runInteraction("get_all_updated_caches", get_all_updated_caches_txn)
-
-    def get_cache_stream_token(self):
-        if self._cache_id_gen:
-            return self._cache_id_gen.get_current_token()
-        else:
-            return 0
-
     def _simple_select_list_paginate(
         self,
         table,
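None of the helpers deleted above are dropped: per the commit message ("Move cache invalidation to main data store"), they move to the new `synapse/storage/data_stores/main/cache.py`, which is not expanded in this view. A minimal sketch of its assumed shape — `CacheInvalidationStore` and `CURRENT_STATE_CACHE_NAME` are confirmed by the other diffs in this commit, while the exact file layout is inferred from the removed code:

    # synapse/storage/data_stores/main/cache.py (assumed layout)
    import itertools
    import logging

    from synapse.storage._base import SQLBaseStore
    from synapse.storage.engines import PostgresEngine
    from synapse.util import batch_iter

    logger = logging.getLogger(__name__)

    # This is a special cache name we use to batch multiple invalidations of caches
    # based on the current state when notifying workers over replication.
    CURRENT_STATE_CACHE_NAME = "cs_cache_fake"


    class CacheInvalidationStore(SQLBaseStore):
        def _invalidate_cache_and_stream(self, txn, cache_func, keys):
            # Invalidate the local cache when the txn commits, then poke workers.
            txn.call_after(cache_func.invalidate, keys)
            self._send_invalidation_to_replication(txn, cache_func.__name__, keys)

        # _invalidate_state_caches_and_stream, _send_invalidation_to_replication,
        # get_all_updated_caches and get_cache_stream_token move here unchanged.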

synapse/storage/data_stores/main/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,7 @@

 from .account_data import AccountDataStore
 from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
+from .cache import CacheInvalidationStore
 from .client_ips import ClientIpStore
 from .deviceinbox import DeviceInboxStore
 from .devices import DeviceStore

@@ -110,6 +111,7 @@ class DataStore(
     MonthlyActiveUsersStore,
     StatsStore,
     RelationsStore,
+    CacheInvalidationStore,
 ):
     def __init__(self, db_conn, hs):
         self.hs = hs
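`DataStore` is assembled from per-area store classes via multiple inheritance, so adding `CacheInvalidationStore` to the base list is all that is needed for the moved methods to remain reachable on the combined store. A toy illustration of the pattern (only the `SQLBaseStore`, `CacheInvalidationStore`, `StatsStore`, and `DataStore` names come from this commit; the class bodies are stand-ins):

    class SQLBaseStore(object):
        """Low-level DB helpers shared by every store."""

    class CacheInvalidationStore(SQLBaseStore):
        """Cache-invalidation helpers, newly split out of SQLBaseStore."""

    class StatsStore(SQLBaseStore):
        """Stands in for the many other per-area stores."""

    class DataStore(StatsStore, CacheInvalidationStore):
        """Callers still find the moved methods on the one concrete store."""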
