-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Support MSC3814: Dehydrated Devices #15929
Changes from all commits
6baeda9
ace4f49
49f892d
6c183c5
6d0ce6f
b95364e
395e039
b20c4c7
a950d5e
fe9be3c
ccd6c12
664ad97
a52a25a
0f49f81
f7e0933
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Implement [MSC3814](https://github.com/matrix-org/matrix-spec-proposals/pull/3814), dehydrated devices v2/shrivelled sessions and move [MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697) behind a config flag. Contributed by Nico from Famedly and H-Shay. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,10 +13,11 @@ | |
# limitations under the License. | ||
|
||
import logging | ||
from typing import TYPE_CHECKING, Any, Dict | ||
from http import HTTPStatus | ||
from typing import TYPE_CHECKING, Any, Dict, Optional | ||
|
||
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes | ||
from synapse.api.errors import SynapseError | ||
from synapse.api.errors import Codes, SynapseError | ||
from synapse.api.ratelimiting import Ratelimiter | ||
from synapse.logging.context import run_in_background | ||
from synapse.logging.opentracing import ( | ||
|
@@ -48,6 +49,9 @@ def __init__(self, hs: "HomeServer"): | |
self.store = hs.get_datastores().main | ||
self.notifier = hs.get_notifier() | ||
self.is_mine = hs.is_mine | ||
if hs.config.experimental.msc3814_enabled: | ||
self.event_sources = hs.get_event_sources() | ||
self.device_handler = hs.get_device_handler() | ||
|
||
# We only need to poke the federation sender explicitly if its on the | ||
# same instance. Other federation sender instances will get notified by | ||
|
@@ -303,3 +307,103 @@ async def send_device_message( | |
# Enqueue a new federation transaction to send the new | ||
# device messages to each remote destination. | ||
self.federation_sender.send_device_messages(destination) | ||
|
||
async def get_events_for_dehydrated_device( | ||
self, | ||
requester: Requester, | ||
device_id: str, | ||
since_token: Optional[str], | ||
limit: int, | ||
) -> JsonDict: | ||
"""Fetches up to `limit` events sent to `device_id` starting from `since_token` | ||
and returns the new since token. If there are no more messages, returns an empty | ||
array. | ||
|
||
Args: | ||
requester: the user requesting the messages | ||
device_id: ID of the dehydrated device | ||
since_token: stream id to start from when fetching messages | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a /sync next_batch stream token or a stream id? (I'd expect stream ids to be integers) |
||
limit: the number of messages to fetch | ||
Returns: | ||
A dict containing the to-device messages, as well as a token that the client | ||
can provide in the next call to fetch the next batch of messages | ||
""" | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
user_id = requester.user.to_string() | ||
|
||
# only allow fetching messages for the dehydrated device id currently associated | ||
# with the user | ||
dehydrated_device = await self.device_handler.get_dehydrated_device(user_id) | ||
if dehydrated_device is None: | ||
raise SynapseError( | ||
HTTPStatus.FORBIDDEN, | ||
"No dehydrated device exists", | ||
Codes.FORBIDDEN, | ||
) | ||
|
||
dehydrated_device_id, _ = dehydrated_device | ||
if device_id != dehydrated_device_id: | ||
raise SynapseError( | ||
HTTPStatus.FORBIDDEN, | ||
"You may only fetch messages for your dehydrated device", | ||
Codes.FORBIDDEN, | ||
) | ||
|
||
since_stream_id = 0 | ||
if since_token: | ||
if not since_token.startswith("d"): | ||
raise SynapseError( | ||
HTTPStatus.BAD_REQUEST, | ||
"from parameter %r has an invalid format" % (since_token,), | ||
errcode=Codes.INVALID_PARAM, | ||
) | ||
|
||
try: | ||
since_stream_id = int(since_token[1:]) | ||
except Exception: | ||
raise SynapseError( | ||
HTTPStatus.BAD_REQUEST, | ||
"from parameter %r has an invalid format" % (since_token,), | ||
errcode=Codes.INVALID_PARAM, | ||
) | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# if we have a since token, delete any to-device messages before that token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned already, I think we should not delete any delivered to-device messages for dehydrated devices. We can do this in a later PR, but I think that this will be crucial to ease the resumption of rehydration and ensure that room keys don't get lost because a device aborted the rehydration step. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can open a follow-up PR to stop deleting the delivered to-device messages and address the TODOs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The follow-up PR is here: #16010 |
||
# (since we now know that the device has received them) | ||
deleted = await self.store.delete_messages_for_device( | ||
user_id, device_id, since_stream_id | ||
) | ||
logger.debug( | ||
"Deleted %d to-device messages up to %d for user_id %s device_id %s", | ||
deleted, | ||
since_stream_id, | ||
user_id, | ||
device_id, | ||
) | ||
|
||
to_token = self.event_sources.get_current_token().to_device_key | ||
|
||
messages, stream_id = await self.store.get_messages_for_device( | ||
user_id, device_id, since_stream_id, to_token, limit | ||
) | ||
|
||
for message in messages: | ||
# Remove the message id before sending to client | ||
message_id = message.pop("message_id", None) | ||
if message_id: | ||
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id) | ||
|
||
logger.debug( | ||
"Returning %d to-device messages between %d and %d (current token: %d) for " | ||
"dehydrated device %s, user_id %s", | ||
len(messages), | ||
since_stream_id, | ||
stream_id, | ||
to_token, | ||
device_id, | ||
user_id, | ||
) | ||
|
||
return { | ||
"events": messages, | ||
"next_batch": f"d{stream_id}", | ||
} |
Uh oh!
There was an error while loading. Please reload this page.