Skip to content

Sliding Sync: Notify and sync when one-time keys or fallback keys are claimed/uploaded #17820

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17820.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix sync not being notified when device one-time keys or fallback keys are uploaded/claimed.
8 changes: 7 additions & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, hs: "HomeServer"):
self.is_mine = hs.is_mine
self.clock = hs.get_clock()
self._worker_lock_handler = hs.get_worker_locks_handler()
self._notifier = hs.get_notifier()

federation_registry = hs.get_federation_registry()

Expand Down Expand Up @@ -615,7 +616,7 @@ async def claim_local_one_time_keys(
3. Attempt to fetch fallback keys from the database.

Args:
local_query: An iterable of tuples of (user ID, device ID, algorithm).
local_query: An iterable of tuples of (user ID, device ID, algorithm, number of keys to claim).
always_include_fallback_keys: True to always include fallback keys.

Returns:
Expand All @@ -629,6 +630,7 @@ async def claim_local_one_time_keys(
]

otk_results, not_found = await self.store.claim_e2e_one_time_keys(local_query)
self._notifier.notify_one_time_keys_changed(otk_results.keys())

# If the application services have not provided any keys via the C-S
# API, query it directly for one-time keys.
Expand All @@ -639,6 +641,7 @@ async def claim_local_one_time_keys(
appservice_results,
not_found,
) = await self._appservice_handler.claim_e2e_one_time_keys(not_found)
self._notifier.notify_one_time_keys_changed(appservice_results.keys())
else:
appservice_results = {}

Expand Down Expand Up @@ -693,6 +696,7 @@ async def claim_local_one_time_keys(
# For each user that does not have a one-time keys available, see if
# there is a fallback key.
fallback_results = await self.store.claim_e2e_fallback_keys(fallback_query)
self._notifier.notify_one_time_keys_changed(fallback_results.keys())

# Return the results in order, each item from the input query should
# only appear once in the combined list.
Expand Down Expand Up @@ -833,6 +837,7 @@ async def upload_keys_for_user(
await self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
self._notifier.notify_one_time_keys_changed([user_id])
else:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
Expand All @@ -849,6 +854,7 @@ async def upload_keys_for_user(
}
)
await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
self._notifier.notify_one_time_keys_changed([user_id])
elif fallback_keys:
log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
else:
Expand Down
39 changes: 39 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ def on_new_replication_data(self) -> None:
without waking up any of the normal user event streams"""
self.notify_replication()

# FIXME: Should this be renamed to `wait_for_activity`? This listens for new events
# and when one-time keys are claimed which doesn't correspond to an event.
async def wait_for_events(
self,
user_id: str,
Expand Down Expand Up @@ -900,6 +902,43 @@ def notify_lock_released(
for cb in self._lock_released_callback:
cb(instance_name, lock_name, lock_key)

def notify_one_time_keys_changed(
self,
users: Union[StrCollection, Collection[UserID]],
) -> None:
"""
Used by handlers to inform the notifier that a one-time key has been
claimed or uploaded
"""
# Bail early if there is nothing to do
if not users:
return

time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is None:
continue

try:
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")

# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)

# FIXME: How can we poke the replication so that other workers also see the
# one-time key change
Comment on lines +939 to +940
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose the way to actually fix this is to introduce a new stream like StreamKeyType.E2EE_KEYS/E2eeKeysStream for changes to one-time or fallback keys changing?

Just want to sanity check before I go that route.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



@attr.s(auto_attribs=True)
class ReplicationNotifier:
Expand Down
Loading
Loading