
Commit b530eaa

Allow running sendToDevice on workers (#9044)

1 parent 5e99a94

11 files changed (+231, -105 lines)

changelog.d/9044.feature

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Add experimental support for handling and persistence of to-device messages to happen on worker processes.

scripts/synapse_port_db

Lines changed: 27 additions & 0 deletions
@@ -629,6 +629,7 @@ class Porter(object):
         await self._setup_state_group_id_seq()
         await self._setup_user_id_seq()
         await self._setup_events_stream_seqs()
+        await self._setup_device_inbox_seq()
 
         # Step 3. Get tables.
         self.progress.set_state("Fetching tables")
@@ -911,6 +912,32 @@ class Porter(object):
             "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
         )
 
+    async def _setup_device_inbox_seq(self):
+        """Set the device inbox sequence to the correct value.
+        """
+        curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="device_inbox",
+            keyvalues={},
+            retcol="COALESCE(MAX(stream_id), 1)",
+            allow_none=True,
+        )
+
+        curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="device_federation_outbox",
+            keyvalues={},
+            retcol="COALESCE(MAX(stream_id), 1)",
+            allow_none=True,
+        )
+
+        next_id = max(curr_local_id, curr_federation_id) + 1
+
+        def r(txn):
+            txn.execute(
+                "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
+            )
+
+        return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
+
 
     ##############################################
     # The following is simply UI stuff
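
As a usage note, the interaction above restarts the new device_inbox_sequence on the Postgres side just past the highest stream_id found in device_inbox / device_federation_outbox. A minimal sketch for checking the result after a port, assuming psycopg2 and placeholder connection details (nothing below is part of this commit):

import psycopg2

# Placeholder connection settings; point these at the ported Synapse database.
conn = psycopg2.connect(dbname="synapse", user="synapse_user", host="localhost")
with conn, conn.cursor() as cur:
    # The sequence should sit just past the largest stream_id copied over.
    cur.execute("SELECT last_value FROM device_inbox_sequence")
    print("device_inbox_sequence last_value:", cur.fetchone()[0])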

synapse/app/generic_worker.py

Lines changed: 3 additions & 0 deletions
@@ -108,6 +108,7 @@
 )
 from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
 from synapse.rest.client.versions import VersionsRestServlet
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
@@ -520,6 +521,8 @@ def _listen_http(self, listener_config: ListenerConfig):
                     room.register_deprecated_servlets(self, resource)
                     InitialSyncRestServlet(self).register(resource)
 
+                    SendToDeviceRestServlet(self).register(resource)
+
                     user_directory.register_servlets(self, resource)
 
                     # If presence is disabled, use the stub servlet that does
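
With SendToDeviceRestServlet registered on generic workers, the /sendToDevice client-server endpoint can be served by a worker rather than the main process. A minimal sketch of exercising it over HTTP, where the homeserver URL, access token, user ID, device ID, and transaction ID are all placeholders:

import requests

HOMESERVER = "https://matrix.example.com"  # placeholder; route to the worker in practice
ACCESS_TOKEN = "syt_placeholder_token"     # placeholder access token

resp = requests.put(
    f"{HOMESERVER}/_matrix/client/r0/sendToDevice/org.example.ping/txn-1",
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
    json={
        "messages": {
            "@alice:example.com": {
                "ALICEDEVICE": {"body": "hello from a worker-served endpoint"}
            }
        }
    },
)
resp.raise_for_status()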

synapse/config/workers.py

Lines changed: 9 additions & 1 deletion
@@ -53,6 +53,9 @@ class WriterLocations:
         default=["master"], type=List[str], converter=_instance_to_list_converter
     )
     typing = attr.ib(default="master", type=str)
+    to_device = attr.ib(
+        default=["master"], type=List[str], converter=_instance_to_list_converter,
+    )
 
 
 class WorkerConfig(Config):
@@ -124,7 +127,7 @@ def read_config(self, config, **kwargs):
 
         # Check that the configured writers for events and typing also appears in
         # `instance_map`.
-        for stream in ("events", "typing"):
+        for stream in ("events", "typing", "to_device"):
             instances = _instance_to_list_converter(getattr(self.writers, stream))
             for instance in instances:
                 if instance != "master" and instance not in self.instance_map:
@@ -133,6 +136,11 @@ def read_config(self, config, **kwargs):
                         % (instance, stream)
                     )
 
+        if len(self.writers.to_device) != 1:
+            raise ConfigError(
+                "Must only specify one instance to handle `to_device` messages."
+            )
+
         self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
 
         # Whether this worker should run background tasks or not.
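
For context, the new to_device entry in WriterLocations means a deployment can hand the to-device stream to one worker. A minimal configuration sketch, assuming the stream_writers and instance_map options from the Synapse worker documentation (the worker name, host, and port are placeholders, not part of this diff):

instance_map:
  todevice_writer:
    host: localhost
    port: 8035

stream_writers:
  # The check above enforces exactly one instance for this stream.
  to_device: todevice_writer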

synapse/handlers/devicemessage.py

Lines changed: 23 additions & 8 deletions
@@ -45,11 +45,25 @@ def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.is_mine = hs.is_mine
-        self.federation = hs.get_federation_sender()
 
-        hs.get_federation_registry().register_edu_handler(
-            "m.direct_to_device", self.on_direct_to_device_edu
-        )
+        # We only need to poke the federation sender explicitly if its on the
+        # same instance. Other federation sender instances will get notified by
+        # `synapse.app.generic_worker.FederationSenderHandler` when it sees it
+        # in the to-device replication stream.
+        self.federation_sender = None
+        if hs.should_send_federation():
+            self.federation_sender = hs.get_federation_sender()
+
+        # If we can handle the to device EDUs we do so, otherwise we route them
+        # to the appropriate worker.
+        if hs.get_instance_name() in hs.config.worker.writers.to_device:
+            hs.get_federation_registry().register_edu_handler(
+                "m.direct_to_device", self.on_direct_to_device_edu
+            )
+        else:
+            hs.get_federation_registry().register_instances_for_edu(
+                "m.direct_to_device", hs.config.worker.writers.to_device,
+            )
 
         # The handler to call when we think a user's device list might be out of
         # sync. We do all device list resyncing on the master instance, so if
@@ -204,7 +218,8 @@ async def send_device_message(
         )
 
         log_kv({"remote_messages": remote_messages})
-        for destination in remote_messages.keys():
-            # Enqueue a new federation transaction to send the new
-            # device messages to each remote destination.
-            self.federation.send_device_messages(destination)
+        if self.federation_sender:
+            for destination in remote_messages.keys():
+                # Enqueue a new federation transaction to send the new
+                # device messages to each remote destination.
+                self.federation_sender.send_device_messages(destination)

synapse/replication/slave/storage/deviceinbox.py

Lines changed: 1 addition & 31 deletions
@@ -14,38 +14,8 @@
 # limitations under the License.
 
 from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.replication.tcp.streams import ToDeviceStream
-from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 
 class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-        self._device_inbox_id_gen = SlavedIdTracker(
-            db_conn, "device_inbox", "stream_id"
-        )
-        self._device_inbox_stream_cache = StreamChangeCache(
-            "DeviceInboxStreamChangeCache",
-            self._device_inbox_id_gen.get_current_token(),
-        )
-        self._device_federation_outbox_stream_cache = StreamChangeCache(
-            "DeviceFederationOutboxStreamChangeCache",
-            self._device_inbox_id_gen.get_current_token(),
-        )
-
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == ToDeviceStream.NAME:
-            self._device_inbox_id_gen.advance(instance_name, token)
-            for row in rows:
-                if row.entity.startswith("@"):
-                    self._device_inbox_stream_cache.entity_has_changed(
-                        row.entity, token
-                    )
-                else:
-                    self._device_federation_outbox_stream_cache.entity_has_changed(
-                        row.entity, token
-                    )
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+    pass

synapse/replication/tcp/handler.py

Lines changed: 9 additions & 0 deletions
@@ -56,6 +56,7 @@
     EventsStream,
     FederationStream,
     Stream,
+    ToDeviceStream,
     TypingStream,
 )
 
@@ -115,6 +116,14 @@ def __init__(self, hs):
 
                 continue
 
+            if isinstance(stream, ToDeviceStream):
+                # Only add ToDeviceStream as a source on instances in charge of
+                # sending to device messages.
+                if hs.get_instance_name() in hs.config.worker.writers.to_device:
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
             if isinstance(stream, TypingStream):
                 # Only add TypingStream as a source on the instance in charge of
                 # typing.

synapse/storage/databases/main/__init__.py

Lines changed: 0 additions & 33 deletions
@@ -127,9 +127,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
         self._presence_id_gen = StreamIdGenerator(
             db_conn, "presence_stream", "stream_id"
         )
-        self._device_inbox_id_gen = StreamIdGenerator(
-            db_conn, "device_inbox", "stream_id"
-        )
         self._public_room_id_gen = StreamIdGenerator(
             db_conn, "public_room_list_stream", "stream_id"
         )
@@ -189,36 +186,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             prefilled_cache=presence_cache_prefill,
         )
 
-        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
-        device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_inbox",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=max_device_inbox_id,
-            limit=1000,
-        )
-        self._device_inbox_stream_cache = StreamChangeCache(
-            "DeviceInboxStreamChangeCache",
-            min_device_inbox_id,
-            prefilled_cache=device_inbox_prefill,
-        )
-        # The federation outbox and the local device inbox uses the same
-        # stream_id generator.
-        device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_federation_outbox",
-            entity_column="destination",
-            stream_column="stream_id",
-            max_value=max_device_inbox_id,
-            limit=1000,
-        )
-        self._device_federation_outbox_stream_cache = StreamChangeCache(
-            "DeviceFederationOutboxStreamChangeCache",
-            min_device_outbox_id,
-            prefilled_cache=device_outbox_prefill,
-        )
-
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
             "DeviceListStreamChangeCache", device_list_max
