Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9969359

Browse files
committed
Move background update handling out of store
1 parent d4415eb commit 9969359

27 files changed

+280
-198
lines changed

synapse/app/homeserver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ def start():
436436
_base.start(hs, config.listeners)
437437

438438
hs.get_pusherpool().start()
439-
hs.get_datastore().start_doing_background_updates()
439+
hs.get_datastore().db.updates.start_doing_background_updates()
440440
except Exception:
441441
# Print the exception and bail out.
442442
print("Error during startup:", file=sys.stderr)

synapse/rest/media/v1/preview_url_resource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ def _expire_url_cache_data(self):
402402

403403
logger.info("Running url preview cache expiry")
404404

405-
if not (yield self.store.has_completed_background_updates()):
405+
if not (yield self.store.db.updates.has_completed_background_updates()):
406406
logger.info("Still running DB updates; skipping expiry")
407407
return
408408

synapse/storage/background_updates.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from synapse.metrics.background_process_metrics import run_as_background_process
2323

2424
from . import engines
25-
from ._base import SQLBaseStore
2625

2726
logger = logging.getLogger(__name__)
2827

@@ -74,7 +73,7 @@ def total_items_per_ms(self):
7473
return float(self.total_item_count) / float(self.total_duration_ms)
7574

7675

77-
class BackgroundUpdateStore(SQLBaseStore):
76+
class BackgroundUpdater(object):
7877
""" Background updates are updates to the database that run in the
7978
background. Each update processes a batch of data at once. We attempt to
8079
limit the impact of each update by monitoring how long each batch takes to
@@ -86,8 +85,10 @@ class BackgroundUpdateStore(SQLBaseStore):
8685
BACKGROUND_UPDATE_INTERVAL_MS = 1000
8786
BACKGROUND_UPDATE_DURATION_MS = 100
8887

89-
def __init__(self, db_conn, hs):
90-
super(BackgroundUpdateStore, self).__init__(db_conn, hs)
88+
def __init__(self, hs, database):
89+
self._clock = hs.get_clock()
90+
self.db = database
91+
9192
self._background_update_performance = {}
9293
self._background_update_queue = []
9394
self._background_update_handlers = {}
@@ -101,9 +102,7 @@ def run_background_updates(self, sleep=True):
101102
logger.info("Starting background schema updates")
102103
while True:
103104
if sleep:
104-
yield self.hs.get_clock().sleep(
105-
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0
106-
)
105+
yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
107106

108107
try:
109108
result = yield self.do_next_background_update(
@@ -380,7 +379,7 @@ def create_index_sqlite(conn):
380379
logger.debug("[SQL] %s", sql)
381380
c.execute(sql)
382381

383-
if isinstance(self.database_engine, engines.PostgresEngine):
382+
if isinstance(self.db.database_engine, engines.PostgresEngine):
384383
runner = create_index_psql
385384
elif psql_only:
386385
runner = None

synapse/storage/data_stores/main/client_ips.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from twisted.internet import defer
2121

2222
from synapse.metrics.background_process_metrics import wrap_as_background_process
23-
from synapse.storage import background_updates
23+
from synapse.storage._base import SQLBaseStore
2424
from synapse.util.caches import CACHE_SIZE_FACTOR
2525
from synapse.util.caches.descriptors import Cache
2626

@@ -32,41 +32,41 @@
3232
LAST_SEEN_GRANULARITY = 120 * 1000
3333

3434

35-
class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
35+
class ClientIpBackgroundUpdateStore(SQLBaseStore):
3636
def __init__(self, db_conn, hs):
3737
super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)
3838

39-
self.register_background_index_update(
39+
self.db.updates.register_background_index_update(
4040
"user_ips_device_index",
4141
index_name="user_ips_device_id",
4242
table="user_ips",
4343
columns=["user_id", "device_id", "last_seen"],
4444
)
4545

46-
self.register_background_index_update(
46+
self.db.updates.register_background_index_update(
4747
"user_ips_last_seen_index",
4848
index_name="user_ips_last_seen",
4949
table="user_ips",
5050
columns=["user_id", "last_seen"],
5151
)
5252

53-
self.register_background_index_update(
53+
self.db.updates.register_background_index_update(
5454
"user_ips_last_seen_only_index",
5555
index_name="user_ips_last_seen_only",
5656
table="user_ips",
5757
columns=["last_seen"],
5858
)
5959

60-
self.register_background_update_handler(
60+
self.db.updates.register_background_update_handler(
6161
"user_ips_analyze", self._analyze_user_ip
6262
)
6363

64-
self.register_background_update_handler(
64+
self.db.updates.register_background_update_handler(
6565
"user_ips_remove_dupes", self._remove_user_ip_dupes
6666
)
6767

6868
# Register a unique index
69-
self.register_background_index_update(
69+
self.db.updates.register_background_index_update(
7070
"user_ips_device_unique_index",
7171
index_name="user_ips_user_token_ip_unique_index",
7272
table="user_ips",
@@ -75,12 +75,12 @@ def __init__(self, db_conn, hs):
7575
)
7676

7777
# Drop the old non-unique index
78-
self.register_background_update_handler(
78+
self.db.updates.register_background_update_handler(
7979
"user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
8080
)
8181

8282
# Update the last seen info in devices.
83-
self.register_background_update_handler(
83+
self.db.updates.register_background_update_handler(
8484
"devices_last_seen", self._devices_last_seen_update
8585
)
8686

@@ -92,7 +92,7 @@ def f(conn):
9292
txn.close()
9393

9494
yield self.db.runWithConnection(f)
95-
yield self._end_background_update("user_ips_drop_nonunique_index")
95+
yield self.db.updates._end_background_update("user_ips_drop_nonunique_index")
9696
return 1
9797

9898
@defer.inlineCallbacks
@@ -108,7 +108,7 @@ def user_ips_analyze(txn):
108108

109109
yield self.db.runInteraction("user_ips_analyze", user_ips_analyze)
110110

111-
yield self._end_background_update("user_ips_analyze")
111+
yield self.db.updates._end_background_update("user_ips_analyze")
112112

113113
return 1
114114

@@ -271,14 +271,14 @@ def remove(txn):
271271
(user_id, access_token, ip, device_id, user_agent, last_seen),
272272
)
273273

274-
self._background_update_progress_txn(
274+
self.db.updates._background_update_progress_txn(
275275
txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
276276
)
277277

278278
yield self.db.runInteraction("user_ips_dups_remove", remove)
279279

280280
if last:
281-
yield self._end_background_update("user_ips_remove_dupes")
281+
yield self.db.updates._end_background_update("user_ips_remove_dupes")
282282

283283
return batch_size
284284

@@ -344,7 +344,7 @@ def _devices_last_seen_update_txn(txn):
344344
txn.execute_batch(sql, rows)
345345

346346
_, _, _, user_id, device_id = rows[-1]
347-
self._background_update_progress_txn(
347+
self.db.updates._background_update_progress_txn(
348348
txn,
349349
"devices_last_seen",
350350
{"last_user_id": user_id, "last_device_id": device_id},
@@ -357,7 +357,7 @@ def _devices_last_seen_update_txn(txn):
357357
)
358358

359359
if not updated:
360-
yield self._end_background_update("devices_last_seen")
360+
yield self.db.updates._end_background_update("devices_last_seen")
361361

362362
return updated
363363

@@ -546,7 +546,9 @@ async def _prune_old_user_ips(self):
546546
# Nothing to do
547547
return
548548

549-
if not await self.has_completed_background_update("devices_last_seen"):
549+
if not await self.db.updates.has_completed_background_update(
550+
"devices_last_seen"
551+
):
550552
# Only start pruning if we have finished populating the devices
551553
# last seen info.
552554
return

synapse/storage/data_stores/main/deviceinbox.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
from synapse.logging.opentracing import log_kv, set_tag, trace
2323
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
24-
from synapse.storage.background_updates import BackgroundUpdateStore
2524
from synapse.util.caches.expiringcache import ExpiringCache
2625

2726
logger = logging.getLogger(__name__)
@@ -208,20 +207,20 @@ def delete_messages_for_remote_destination_txn(txn):
208207
)
209208

210209

211-
class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
210+
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
212211
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
213212

214213
def __init__(self, db_conn, hs):
215214
super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)
216215

217-
self.register_background_index_update(
216+
self.db.updates.register_background_index_update(
218217
"device_inbox_stream_index",
219218
index_name="device_inbox_stream_id_user_id",
220219
table="device_inbox",
221220
columns=["stream_id", "user_id"],
222221
)
223222

224-
self.register_background_update_handler(
223+
self.db.updates.register_background_update_handler(
225224
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
226225
)
227226

@@ -234,7 +233,7 @@ def reindex_txn(conn):
234233

235234
yield self.db.runWithConnection(reindex_txn)
236235

237-
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
236+
yield self.db.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
238237

239238
return 1
240239

synapse/storage/data_stores/main/devices.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
)
3232
from synapse.metrics.background_process_metrics import run_as_background_process
3333
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
34-
from synapse.storage.background_updates import BackgroundUpdateStore
3534
from synapse.types import get_verify_key_from_cross_signing_key
3635
from synapse.util import batch_iter
3736
from synapse.util.caches.descriptors import (
@@ -642,19 +641,19 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids):
642641
return results
643642

644643

645-
class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
644+
class DeviceBackgroundUpdateStore(SQLBaseStore):
646645
def __init__(self, db_conn, hs):
647646
super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)
648647

649-
self.register_background_index_update(
648+
self.db.updates.register_background_index_update(
650649
"device_lists_stream_idx",
651650
index_name="device_lists_stream_user_id",
652651
table="device_lists_stream",
653652
columns=["user_id", "device_id"],
654653
)
655654

656655
# create a unique index on device_lists_remote_cache
657-
self.register_background_index_update(
656+
self.db.updates.register_background_index_update(
658657
"device_lists_remote_cache_unique_idx",
659658
index_name="device_lists_remote_cache_unique_id",
660659
table="device_lists_remote_cache",
@@ -663,7 +662,7 @@ def __init__(self, db_conn, hs):
663662
)
664663

665664
# And one on device_lists_remote_extremeties
666-
self.register_background_index_update(
665+
self.db.updates.register_background_index_update(
667666
"device_lists_remote_extremeties_unique_idx",
668667
index_name="device_lists_remote_extremeties_unique_idx",
669668
table="device_lists_remote_extremeties",
@@ -672,7 +671,7 @@ def __init__(self, db_conn, hs):
672671
)
673672

674673
# once they complete, we can remove the old non-unique indexes.
675-
self.register_background_update_handler(
674+
self.db.updates.register_background_update_handler(
676675
DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
677676
self._drop_device_list_streams_non_unique_indexes,
678677
)
@@ -686,7 +685,9 @@ def f(conn):
686685
txn.close()
687686

688687
yield self.db.runWithConnection(f)
689-
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
688+
yield self.db.updates._end_background_update(
689+
DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES
690+
)
690691
return 1
691692

692693

synapse/storage/data_stores/main/event_federation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ class EventFederationStore(EventFederationWorkerStore):
494494
def __init__(self, db_conn, hs):
495495
super(EventFederationStore, self).__init__(db_conn, hs)
496496

497-
self.register_background_update_handler(
497+
self.db.updates.register_background_update_handler(
498498
self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
499499
)
500500

@@ -654,7 +654,7 @@ def delete_event_auth(txn):
654654
"max_stream_id_exclusive": min_stream_id,
655655
}
656656

657-
self._background_update_progress_txn(
657+
self.db.updates._background_update_progress_txn(
658658
txn, self.EVENT_AUTH_STATE_ONLY, new_progress
659659
)
660660

@@ -665,6 +665,6 @@ def delete_event_auth(txn):
665665
)
666666

667667
if not result:
668-
yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)
668+
yield self.db.updates._end_background_update(self.EVENT_AUTH_STATE_ONLY)
669669

670670
return batch_size

synapse/storage/data_stores/main/event_push_actions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,14 +614,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
614614
def __init__(self, db_conn, hs):
615615
super(EventPushActionsStore, self).__init__(db_conn, hs)
616616

617-
self.register_background_index_update(
617+
self.db.updates.register_background_index_update(
618618
self.EPA_HIGHLIGHT_INDEX,
619619
index_name="event_push_actions_u_highlight",
620620
table="event_push_actions",
621621
columns=["user_id", "stream_ordering"],
622622
)
623623

624-
self.register_background_index_update(
624+
self.db.updates.register_background_index_update(
625625
"event_push_actions_highlights_index",
626626
index_name="event_push_actions_highlights_index",
627627
table="event_push_actions",

synapse/storage/data_stores/main/events.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
from synapse.metrics import BucketCollector
3939
from synapse.metrics.background_process_metrics import run_as_background_process
4040
from synapse.storage._base import make_in_list_sql_clause
41-
from synapse.storage.background_updates import BackgroundUpdateStore
4241
from synapse.storage.data_stores.main.event_federation import EventFederationStore
4342
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
4443
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
@@ -94,10 +93,7 @@ def f(self, *args, **kwargs):
9493
# inherits from EventFederationStore so that we can call _update_backward_extremities
9594
# and _handle_mult_prev_events (though arguably those could both be moved in here)
9695
class EventsStore(
97-
StateGroupWorkerStore,
98-
EventFederationStore,
99-
EventsWorkerStore,
100-
BackgroundUpdateStore,
96+
StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
10197
):
10298
def __init__(self, db_conn, hs):
10399
super(EventsStore, self).__init__(db_conn, hs)

0 commit comments

Comments
 (0)