This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
1 change: 1 addition & 0 deletions changelog.d/15929.feature
@@ -0,0 +1 @@
Implement [MSC3814](https://github.com/matrix-org/matrix-spec-proposals/pull/3814) (dehydrated devices v2, also known as "shrivelled sessions"), and move [MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697) behind a config flag. Contributed by Nico from Famedly and H-Shay.
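For context, a minimal sketch of how a deployment might toggle these flags in homeserver.yaml. The `experimental_features` section and both option names come from the config diff below; the values shown are illustrative only:

```yaml
experimental_features:
  # MSC2697 is enabled by default, so it must be switched off explicitly...
  msc2697_enabled: false
  # ...before enabling MSC3814 (dehydrated devices v2). Enabling both at once
  # is rejected with a ConfigError at startup.
  msc3814_enabled: true
```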
21 changes: 21 additions & 0 deletions synapse/config/experimental.py
@@ -247,6 +247,27 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# MSC3026 (busy presence state)
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)

# MSC2697 (device dehydration)
# Enabled by default since this option was added after adding the feature.
# It is not recommended that MSC2697 and MSC3814 both be enabled at
# once.
self.msc2697_enabled: bool = experimental.get("msc2697_enabled", True)

# MSC3814 (dehydrated devices with SSSS)
# This is an alternative method to achieve the same goals as MSC2697.
# It is not recommended that MSC2697 and MSC3814 both be enabled at
# once.
self.msc3814_enabled: bool = experimental.get("msc3814_enabled", False)

if self.msc2697_enabled and self.msc3814_enabled:
raise ConfigError(
"MSC2697 and MSC3814 should not both be enabled.",
(
"experimental_features",
"msc3814_enabled",
),
)

# MSC3244 (room version capabilities)
self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)

4 changes: 3 additions & 1 deletion synapse/handlers/device.py
@@ -653,6 +653,7 @@ async def notify_user_signature_update(
async def store_dehydrated_device(
self,
user_id: str,
device_id: Optional[str],
device_data: JsonDict,
initial_device_display_name: Optional[str] = None,
) -> str:
@@ -661,14 +662,15 @@ async def store_dehydrated_device(

Args:
user_id: the user that we are storing the device for
device_id: device id supplied by client
device_data: the dehydrated device information
initial_device_display_name: The display name to use for the device
Returns:
device id of the dehydrated device
"""
device_id = await self.check_device_registered(
user_id,
None,
device_id,
initial_device_display_name,
)
old_device_id = await self.store.store_dehydrated_device(
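A minimal usage sketch (not part of the diff) of the updated handler method. The device ID string and the `device_data` payload below are made-up placeholders, since the PR treats `device_data` as an opaque JsonDict:

```python
from synapse.types import JsonDict


async def dehydrate_device_example(device_handler, user_id: str) -> str:
    """Illustrative only: store a dehydrated device with a client-chosen ID."""
    device_data: JsonDict = {"algorithm": "org.example.dehydration"}  # assumed payload shape
    return await device_handler.store_dehydrated_device(
        user_id=user_id,
        device_id="CLIENTCHOSENID",  # MSC3814: supplied by the client; the MSC2697 flow passed None here
        device_data=device_data,
        initial_device_display_name="Dehydrated device",
    )
```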
108 changes: 106 additions & 2 deletions synapse/handlers/devicemessage.py
@@ -13,10 +13,11 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Dict
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, Optional

from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.errors import Codes, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
@@ -48,6 +49,9 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self.is_mine = hs.is_mine
if hs.config.experimental.msc3814_enabled:
self.event_sources = hs.get_event_sources()
self.device_handler = hs.get_device_handler()

# We only need to poke the federation sender explicitly if it's on the
# same instance. Other federation sender instances will get notified by
@@ -303,3 +307,103 @@ async def send_device_message(
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)

async def get_events_for_dehydrated_device(
self,
requester: Requester,
device_id: str,
since_token: Optional[str],
limit: int,
) -> JsonDict:
"""Fetches up to `limit` events sent to `device_id` starting from `since_token`
and returns the new since token. If there are no more messages, returns an empty
array.

Args:
requester: the user requesting the messages
device_id: ID of the dehydrated device
since_token: stream id to start from when fetching messages
Review comment (Contributor): Is this a /sync next_batch stream token or a stream id? (I'd expect stream ids to be integers)

limit: the number of messages to fetch
Returns:
A dict containing the to-device messages, as well as a token that the client
can provide in the next call to fetch the next batch of messages
"""

user_id = requester.user.to_string()

# only allow fetching messages for the dehydrated device id currently associated
# with the user
dehydrated_device = await self.device_handler.get_dehydrated_device(user_id)
if dehydrated_device is None:
raise SynapseError(
HTTPStatus.FORBIDDEN,
"No dehydrated device exists",
Codes.FORBIDDEN,
)

dehydrated_device_id, _ = dehydrated_device
if device_id != dehydrated_device_id:
raise SynapseError(
HTTPStatus.FORBIDDEN,
"You may only fetch messages for your dehydrated device",
Codes.FORBIDDEN,
)

since_stream_id = 0
if since_token:
if not since_token.startswith("d"):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"from parameter %r has an invalid format" % (since_token,),
errcode=Codes.INVALID_PARAM,
)

try:
since_stream_id = int(since_token[1:])
except Exception:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"from parameter %r has an invalid format" % (since_token,),
errcode=Codes.INVALID_PARAM,
)

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)

Review comment: As mentioned already, I think we should not delete any delivered to-device messages for dehydrated devices.

We can do this in a later PR, but I think that this will be crucial to ease the resumption of rehydration and ensure that room keys don't get lost because a device aborted the rehydration step.

Contributor Author: I can open a follow-up PR to stop deleting the delivered to-device messages and address the TODOs.

Contributor Author: The follow-up PR is here: #16010
deleted = await self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
logger.debug(
"Deleted %d to-device messages up to %d for user_id %s device_id %s",
deleted,
since_stream_id,
user_id,
device_id,
)

to_token = self.event_sources.get_current_token().to_device_key

messages, stream_id = await self.store.get_messages_for_device(
user_id, device_id, since_stream_id, to_token, limit
)

for message in messages:
# Remove the message id before sending to client
message_id = message.pop("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)

logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d) for "
"dehydrated device %s, user_id %s",
len(messages),
since_stream_id,
stream_id,
to_token,
device_id,
user_id,
)

return {
"events": messages,
"next_batch": f"d{stream_id}",
}
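A small paging sketch (not part of the diff) over the new handler method, assuming the returned `next_batch` value is passed straight back as `since_token` on the next call. Everything except the handler method itself and its return keys is made up for illustration:

```python
from typing import List, Optional

from synapse.types import JsonDict, Requester


async def drain_dehydrated_device_messages(
    handler, requester: Requester, device_id: str, limit: int = 100
) -> List[JsonDict]:
    """Illustrative only: fetch all pending to-device messages in batches."""
    collected: List[JsonDict] = []
    since_token: Optional[str] = None
    while True:
        result = await handler.get_events_for_dehydrated_device(
            requester=requester,
            device_id=device_id,
            since_token=since_token,
            limit=limit,
        )
        events = result["events"]
        if not events:
            break  # per the docstring, an empty list means nothing more to fetch
        collected.extend(events)
        # The token is opaque to the caller; server-side it is formatted as "d<stream_id>".
        since_token = result["next_batch"]
    return collected
```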