Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 115f0eb

Browse files
David Robertson, Fizzadar, and erikjohnston
authored
Reintroduce #14376, with bugfix for monoliths (#14468)
* Add tests for StreamIdGenerator * Drive-by: annotate all defs * Revert "Revert "Remove slaved id tracker (#14376)" (#14463)" This reverts commit d63814f, which in turn reverted 36097e8. This restores the latter. * Fix StreamIdGenerator not handling unpersisted IDs Spotted by @erikjohnston. Closes #14456. * Changelog Co-authored-by: Nick Mills-Barrett <nick@fizzadar.com> Co-authored-by: Erik Johnston <erik@matrix.org>
1 parent c15e9a0 commit 115f0eb

File tree

14 files changed

+230
-186
lines changed

14 files changed

+230
-186
lines changed

changelog.d/14376.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).

changelog.d/14468.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).

mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ disallow_untyped_defs = True
117117
[mypy-tests.state.test_profile]
118118
disallow_untyped_defs = True
119119

120+
[mypy-tests.storage.test_id_generators]
121+
disallow_untyped_defs = True
122+
120123
[mypy-tests.storage.test_profile]
121124
disallow_untyped_defs = True
122125

synapse/replication/slave/__init__.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

synapse/replication/slave/storage/__init__.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

synapse/replication/slave/storage/_slaved_id_tracker.py

Lines changed: 0 additions & 50 deletions
This file was deleted.

synapse/storage/databases/main/account_data.py

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
)
2828

2929
from synapse.api.constants import AccountDataTypes
30-
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
3130
from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
3231
from synapse.storage._base import db_to_json
3332
from synapse.storage.database import (
@@ -68,12 +67,11 @@ def __init__(
6867
# to write account data. A value of `True` implies that `_account_data_id_gen`
6968
# is an `AbstractStreamIdGenerator` and not just a tracker.
7069
self._account_data_id_gen: AbstractStreamIdTracker
70+
self._can_write_to_account_data = (
71+
self._instance_name in hs.config.worker.writers.account_data
72+
)
7173

7274
if isinstance(database.engine, PostgresEngine):
73-
self._can_write_to_account_data = (
74-
self._instance_name in hs.config.worker.writers.account_data
75-
)
76-
7775
self._account_data_id_gen = MultiWriterIdGenerator(
7876
db_conn=db_conn,
7977
db=database,
@@ -95,21 +93,13 @@ def __init__(
9593
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
9694
# updated over replication. (Multiple writers are not supported for
9795
# SQLite).
98-
if self._instance_name in hs.config.worker.writers.account_data:
99-
self._can_write_to_account_data = True
100-
self._account_data_id_gen = StreamIdGenerator(
101-
db_conn,
102-
"room_account_data",
103-
"stream_id",
104-
extra_tables=[("room_tags_revisions", "stream_id")],
105-
)
106-
else:
107-
self._account_data_id_gen = SlavedIdTracker(
108-
db_conn,
109-
"room_account_data",
110-
"stream_id",
111-
extra_tables=[("room_tags_revisions", "stream_id")],
112-
)
96+
self._account_data_id_gen = StreamIdGenerator(
97+
db_conn,
98+
"room_account_data",
99+
"stream_id",
100+
extra_tables=[("room_tags_revisions", "stream_id")],
101+
is_writer=self._instance_name in hs.config.worker.writers.account_data,
102+
)
113103

114104
account_max = self.get_max_account_data_stream_id()
115105
self._account_data_stream_cache = StreamChangeCache(

synapse/storage/databases/main/devices.py

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
whitelisted_homeserver,
3939
)
4040
from synapse.metrics.background_process_metrics import wrap_as_background_process
41-
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
4241
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
4342
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
4443
from synapse.storage.database import (
@@ -86,28 +85,19 @@ def __init__(
8685
):
8786
super().__init__(database, db_conn, hs)
8887

89-
if hs.config.worker.worker_app is None:
90-
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
91-
db_conn,
92-
"device_lists_stream",
93-
"stream_id",
94-
extra_tables=[
95-
("user_signature_stream", "stream_id"),
96-
("device_lists_outbound_pokes", "stream_id"),
97-
("device_lists_changes_in_room", "stream_id"),
98-
],
99-
)
100-
else:
101-
self._device_list_id_gen = SlavedIdTracker(
102-
db_conn,
103-
"device_lists_stream",
104-
"stream_id",
105-
extra_tables=[
106-
("user_signature_stream", "stream_id"),
107-
("device_lists_outbound_pokes", "stream_id"),
108-
("device_lists_changes_in_room", "stream_id"),
109-
],
110-
)
88+
# In the worker store this is an ID tracker which we overwrite in the non-worker
89+
# class below that is used on the main process.
90+
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
91+
db_conn,
92+
"device_lists_stream",
93+
"stream_id",
94+
extra_tables=[
95+
("user_signature_stream", "stream_id"),
96+
("device_lists_outbound_pokes", "stream_id"),
97+
("device_lists_changes_in_room", "stream_id"),
98+
],
99+
is_writer=hs.config.worker.worker_app is None,
100+
)
111101

112102
# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
113103
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).

synapse/storage/databases/main/events_worker.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
run_as_background_process,
6060
wrap_as_background_process,
6161
)
62-
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
6362
from synapse.replication.tcp.streams import BackfillStream
6463
from synapse.replication.tcp.streams.events import EventsStream
6564
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -213,26 +212,20 @@ def __init__(
213212
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
214213
# updated over replication. (Multiple writers are not supported for
215214
# SQLite).
216-
if hs.get_instance_name() in hs.config.worker.writers.events:
217-
self._stream_id_gen = StreamIdGenerator(
218-
db_conn,
219-
"events",
220-
"stream_ordering",
221-
)
222-
self._backfill_id_gen = StreamIdGenerator(
223-
db_conn,
224-
"events",
225-
"stream_ordering",
226-
step=-1,
227-
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
228-
)
229-
else:
230-
self._stream_id_gen = SlavedIdTracker(
231-
db_conn, "events", "stream_ordering"
232-
)
233-
self._backfill_id_gen = SlavedIdTracker(
234-
db_conn, "events", "stream_ordering", step=-1
235-
)
215+
self._stream_id_gen = StreamIdGenerator(
216+
db_conn,
217+
"events",
218+
"stream_ordering",
219+
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
220+
)
221+
self._backfill_id_gen = StreamIdGenerator(
222+
db_conn,
223+
"events",
224+
"stream_ordering",
225+
step=-1,
226+
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
227+
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
228+
)
236229

237230
events_max = self._stream_id_gen.get_current_token()
238231
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(

synapse/storage/databases/main/push_rule.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
from synapse.api.errors import StoreError
3232
from synapse.config.homeserver import ExperimentalConfig
33-
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
3433
from synapse.replication.tcp.streams import PushRulesStream
3534
from synapse.storage._base import SQLBaseStore
3635
from synapse.storage.database import (
@@ -111,14 +110,14 @@ def __init__(
111110
):
112111
super().__init__(database, db_conn, hs)
113112

114-
if hs.config.worker.worker_app is None:
115-
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
116-
db_conn, "push_rules_stream", "stream_id"
117-
)
118-
else:
119-
self._push_rules_stream_id_gen = SlavedIdTracker(
120-
db_conn, "push_rules_stream", "stream_id"
121-
)
113+
# In the worker store this is an ID tracker which we overwrite in the non-worker
114+
# class below that is used on the main process.
115+
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
116+
db_conn,
117+
"push_rules_stream",
118+
"stream_id",
119+
is_writer=hs.config.worker.worker_app is None,
120+
)
122121

123122
push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
124123
db_conn,

0 commit comments

Comments (0)