Skip to content

Commit 0017bac

Browse files
Adds support for preferring local replication
1 parent 44a2f18 commit 0017bac

File tree

5 files changed

+119
-19
lines changed

5 files changed

+119
-19
lines changed

fractal_database_matrix/controllers/replicate.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def to(
2121
registration_token: str,
2222
confirm: bool = False,
2323
set_as_origin: bool = False,
24+
local_url: Optional[str] = None,
2425
**kwargs,
2526
):
2627
"""
@@ -30,6 +31,7 @@ def to(
3031
registration_token: Registration token for the homeserver. Necessary for registering your devices with this homeserver.
3132
confirm: Consent to replicating your data to the provided homeserver.
3233
set_as_origin: Set the homeserver as your current database's origin.
34+
local_url: Local URL for the homeserver (prefer using this url for faster replication).
3335
"""
3436
if not confirm:
3537
res = input(
@@ -67,7 +69,7 @@ def to(
6769
try:
6870
homeserver = MatrixHomeserver.objects.get(url=homeserver_url)
6971
if not homeserver.replication_enabled:
70-
homeserver.update(replication_enabled=True)
72+
homeserver.update(replication_enabled=True, local_url=local_url)
7173
except MatrixHomeserver.DoesNotExist:
7274
with transaction.atomic():
7375
# create the homeserver
@@ -78,6 +80,7 @@ def to(
7880
registration_token=registration_token,
7981
parent_db=current_database,
8082
replication_enabled=True,
83+
local_url=local_url,
8184
)
8285
current_device.add_membership(homeserver)
8386
else:
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import fractal_database.fields
2+
from django.db import migrations
3+
4+
5+
class Migration(migrations.Migration):
6+
7+
dependencies = [
8+
('fractal_database_matrix', '0001_initial'),
9+
]
10+
11+
operations = [
12+
migrations.AddField(
13+
model_name='matrixhomeserver',
14+
name='local_url',
15+
field=fractal_database.fields.LocalURLField(blank=True, null=True),
16+
),
17+
]

fractal_database_matrix/models.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
from asgiref.sync import sync_to_async
1111
from django.conf import settings
1212
from django.db import models, transaction
13+
from docker.errors import NotFound
14+
from docker.models.networks import Network
1315
from fractal.cli.controllers.auth import AuthenticatedController
16+
from fractal_database.fields import LocalURLField
1417
from fractal_database.models import (
1518
BaseModel,
1619
Database,
@@ -46,6 +49,7 @@
4649
class MatrixHomeserver(Service):
4750
SYNAPSE_COMPOSE_FILE_PATH = f"{fractal_database_matrix.__path__[0]}/synapse"
4851
SYNAPSE_LOCAL = "http://localhost:8008"
52+
LOCAL_MATRIX_NETWORK = "fractal-matrix-network"
4953

5054
credentials: models.QuerySet["MatrixCredentials"]
5155
gateways: models.QuerySet["Gateway"]
@@ -54,6 +58,7 @@ class MatrixHomeserver(Service):
5458
priority = models.PositiveIntegerField(default=0, blank=True, null=True)
5559
registration_token = models.CharField(max_length=255, blank=True, null=True)
5660
replication_enabled = models.BooleanField(default=False)
61+
local_url = LocalURLField()
5762

5863
def __str__(self) -> str:
5964
return f"{self.url} (MatrixHomeserver)"
@@ -153,10 +158,32 @@ def create(cls, device: Optional["Device"] = None, **ckwargs) -> "MatrixHomeserv
153158
def _build_images(self) -> None:
154159
docker_compose("build", _cwd=self.SYNAPSE_COMPOSE_FILE_PATH)
155160

161+
@classmethod
162+
def get_docker_network(cls) -> Network:
163+
client = docker.from_env()
164+
return client.networks.get(cls.LOCAL_MATRIX_NETWORK)
165+
166+
@classmethod
167+
def create_docker_network(cls) -> None:
168+
"""
169+
Ensures that the external docker network that is specified
170+
in the homeserver compose file exists.
171+
"""
172+
client = docker.from_env()
173+
try:
174+
cls.get_docker_network()
175+
except NotFound:
176+
logger.info("Creating local matrix network %s" % cls.LOCAL_MATRIX_NETWORK)
177+
client.networks.create(cls.LOCAL_MATRIX_NETWORK)
178+
156179
def _render_compose_file(self) -> str:
157180
""" """
158181
self._build_images()
159182

183+
if self.local_url:
184+
# ensure that external matrix network is created
185+
self.create_docker_network()
186+
160187
# FIXME: handle multiple links
161188
if not hasattr(self.config, "links") or not hasattr(self, "gateways"):
162189
raise Exception("No links or gateways found for MatrixHomeserver %s" % self)

fractal_database_matrix/operations.py

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ async def put_state(
4444

4545
access_token, homeserver_url, _ = creds
4646

47+
if homeserver_url != channel.homeserver.url:
48+
raise Exception(
49+
f"You are logged into the wrong homeserver ({homeserver_url}). You must be logged into the homeserver {channel.homeserver.url}"
50+
)
51+
else:
52+
# credentials will work with the local URL as well
53+
# prefer the local URL if it exists
54+
homeserver_url = channel.homeserver.local_url or homeserver_url
55+
4756
async with MatrixClient(homeserver_url, access_token, max_timeouts=15) as client:
4857
res = await client.room_put_state(
4958
room_id,
@@ -81,6 +90,13 @@ async def create_room(
8190

8291
access_token, homeserver_url, _ = creds
8392

93+
if homeserver_url != channel.homeserver.url:
94+
raise Exception("You must be logged into the correct homeserver")
95+
else:
96+
# credentials will work with the local URL as well
97+
# prefer the local URL if it exists
98+
homeserver_url = channel.homeserver.local_url or homeserver_url
99+
84100
# verify that matrix IDs passed in invite are all lowercase
85101
if invite:
86102
if not any([matrix_id.split("@")[1].islower() for matrix_id in invite]):
@@ -117,6 +133,13 @@ async def add_subspace(
117133

118134
access_token, homeserver_url, _ = creds
119135

136+
if homeserver_url != channel.homeserver.url:
137+
raise Exception("You must be logged into the correct homeserver")
138+
else:
139+
# credentials will work with the local URL as well
140+
# prefer the local URL if it exists
141+
homeserver_url = channel.homeserver.local_url or homeserver_url
142+
120143
async with MatrixClient(homeserver_url, access_token, max_timeouts=15) as client:
121144
res = await client.room_put_state(
122145
parent_room_id,
@@ -133,7 +156,10 @@ async def add_subspace(
133156
)
134157

135158
async def accept_invite_as_user(
136-
self, room_id: str, homeserver_url: str, matrix_creds: Optional[tuple[str, str]] = None
159+
self,
160+
room_id: str,
161+
channel: "MatrixReplicationChannel",
162+
matrix_creds: Optional[tuple[str, str]] = None,
137163
):
138164
"""
139165
Args:
@@ -150,16 +176,27 @@ async def accept_invite_as_user(
150176
else:
151177
user_matrix_id, access_token = matrix_creds
152178

179+
if homeserver_url != channel.homeserver.url:
180+
raise Exception(
181+
f"You are currently logged into {homeserver_url} not {channel.homeserver.url}"
182+
)
183+
else:
184+
# credentials will work with the local URL as well
185+
# prefer the local URL if it exists
186+
homeserver_url = channel.homeserver.local_url or homeserver_url
187+
153188
async with MatrixClient(
154189
homeserver_url=homeserver_url, access_token=access_token, max_timeouts=15
155190
) as client:
156191
logger.info("Accepting invite for %s as %s" % (room_id, user_matrix_id))
157192
await client.join_room(room_id)
158193

159194
async def accept_invite_as_device(
160-
self, device_creds: "MatrixCredentials", room_id: str, homeserver_url: str
195+
self, device_creds: "MatrixCredentials", room_id: str, channel: "MatrixReplicationChannel"
161196
):
162197
device_matrix_id = device_creds.matrix_id
198+
homeserver_url = channel.homeserver.local_url or channel.homeserver.url
199+
163200
# accept invite on behalf of device
164201
async with MatrixClient(
165202
homeserver_url=homeserver_url,
@@ -169,7 +206,9 @@ async def accept_invite_as_device(
169206
logger.info("Accepting invite for %s as %s" % (room_id, device_matrix_id))
170207
await client.join_room(room_id)
171208

172-
async def invite_user(self, matrix_id: str, room_id: str) -> None:
209+
async def invite_user(
210+
self, matrix_id: str, channel: "MatrixReplicationChannel", room_id: str
211+
) -> None:
173212
# FIXME: Once user has accounts on many homeservers, we need to strip the
174213
# host off of the room id and try to find credentials that match that host
175214
creds = AuthenticatedController.get_creds()
@@ -178,6 +217,15 @@ async def invite_user(self, matrix_id: str, room_id: str) -> None:
178217
else:
179218
raise Exception("You must be logged in to Matrix to invite a device")
180219

220+
if homeserver_url != channel.homeserver.url:
221+
raise Exception(
222+
f"You are currently logged into {homeserver_url} not {channel.homeserver.url}"
223+
)
224+
else:
225+
# credentials will work with the local URL as well
226+
# prefer the local URL if it exists
227+
homeserver_url = channel.homeserver.local_url or homeserver_url
228+
181229
async with MatrixClient(
182230
homeserver_url=homeserver_url,
183231
access_token=access_token,
@@ -216,7 +264,7 @@ async def register_device_account(
216264

217265
async def set_display_name(
218266
self,
219-
homeserver_url: str,
267+
channel: "MatrixReplicationChannel",
220268
creds: "MatrixCredentials",
221269
display_name: str,
222270
owner_matrix_id: Optional[str] = None,
@@ -226,6 +274,8 @@ async def set_display_name(
226274
owner_username = owner_matrix_id.split("@")[1].split(":")[0]
227275
display_name = f"{owner_username}'s {display_name}"
228276

277+
homeserver_url = channel.homeserver.local_url or channel.homeserver.url
278+
229279
async with MatrixClient(
230280
homeserver_url=homeserver_url,
231281
access_token=creds.access_token,
@@ -293,7 +343,7 @@ async def run(self, operation: "DurableOperation") -> dict[str, str]:
293343
async for account in membership.device.matrixcredentials_set.filter(
294344
homeserver=channel.homeserver
295345
):
296-
await self.accept_invite_as_device(account, room_id, channel.homeserver.url)
346+
await self.accept_invite_as_device(account, room_id, channel)
297347

298348
logger.info("Successfully created Matrix Room for %s" % name)
299349
return {metadata_label: room_id}
@@ -545,7 +595,7 @@ async def run(self, operation: "DurableOperation") -> None:
545595
raise Exception(f"Failed to find room id in channel metadata for {metadata_label}")
546596

547597
try:
548-
await self.invite_user(device_creds.matrix_id, room_id)
598+
await self.invite_user(device_creds.matrix_id, channel, room_id)
549599
except Exception as e:
550600
# if the device is already in the room, no need to accept the invite
551601
if "is already in the room" in str(e):
@@ -594,7 +644,7 @@ async def run(self, operation: "DurableOperation") -> None:
594644
% (metadata_label, room_id, device_creds.matrix_id)
595645
)
596646
# accept invite on behalf of device
597-
await self.accept_invite_as_device(device_creds, room_id, channel.homeserver.url)
647+
await self.accept_invite_as_device(device_creds, room_id, channel)
598648
logger.info("Device has successfully joined space %s for channel %s" % (room_id, channel))
599649

600650
return None
@@ -698,9 +748,7 @@ async def run(self, operation: "DurableOperation") -> None:
698748
raise Exception(f"Failed to find device credentials for {membership.device}")
699749

700750
# accept invite on behalf of device
701-
await self.accept_invite_as_device(
702-
device_creds, channel.device_space, channel.homeserver.url
703-
)
751+
await self.accept_invite_as_device(device_creds, channel.device_space, channel)
704752
logger.info(
705753
"Device has successfully joined the devices subspace for channel %s" % channel
706754
)
@@ -753,7 +801,7 @@ async def run(self, operation: "DurableOperation") -> None:
753801
raise Exception(f"Failed to find device credentials for {membership.device}")
754802

755803
try:
756-
await self.invite_user(device_creds.matrix_id, channel.device_space)
804+
await self.invite_user(device_creds.matrix_id, channel, channel.device_space)
757805
except Exception as e:
758806
# if the device is already in the room, no need to accept the invite
759807
if "is already in the room" in str(e):
@@ -1115,7 +1163,7 @@ async def run(self, operation: DurableOperation) -> None:
11151163
)
11161164

11171165
await self.set_display_name(
1118-
channel.homeserver.url, device_creds, display_name, owner_matrix_id=owner_matrix_id
1166+
channel, device_creds, display_name, owner_matrix_id=owner_matrix_id
11191167
)
11201168

11211169

@@ -1213,7 +1261,7 @@ async def run(self, operation: "DurableOperation") -> None:
12131261
% (room_id_label, user_matrix_id, channel)
12141262
)
12151263
try:
1216-
await self.accept_invite_as_user(room_id, homeserver_url=channel.homeserver.url)
1264+
await self.accept_invite_as_user(room_id, channel=channel)
12171265
except Exception as e:
12181266
# if the user is already in the room, no need to accept the invite
12191267
if "is already in the room" in str(e):
@@ -1297,10 +1345,10 @@ async def run(self, operation: "DurableOperation") -> None:
12971345
) # type: ignore
12981346

12991347
# fetch channel in order to get room_id for the group to invite user to
1300-
channel: (
1301-
"MatrixReplicationChannel"
1302-
) = await operation.channel_type.model_class().objects.aget(
1303-
pk=operation.channel_id
1348+
channel: "MatrixReplicationChannel" = (
1349+
await operation.channel_type.model_class()
1350+
.objects.select_related("homeserver")
1351+
.aget(pk=operation.channel_id)
13041352
) # type: ignore
13051353

13061354
user_matrix_id = membership.user.matrix_id
@@ -1317,7 +1365,7 @@ async def run(self, operation: "DurableOperation") -> None:
13171365
% (room_id_label, user_matrix_id, channel)
13181366
)
13191367
try:
1320-
await self.invite_user(user_matrix_id, room_id)
1368+
await self.invite_user(user_matrix_id, channel, room_id)
13211369
except Exception as e:
13221370
# if the user is already in the room, no need to accept the invite
13231371
if "is already in the room" in str(e):

fractal_database_matrix/synapse/docker-compose.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,8 @@ services:
4343

4444
volumes:
4545
synapse-db:
46+
47+
networks:
48+
default:
49+
name: fractal-matrix-network
50+
external: true

0 commit comments

Comments
 (0)