Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6559.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port `synapse.handlers.admin` and `synapse.handlers.deactivate_account` to async/await.
6 changes: 4 additions & 2 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ def export_data_command(hs, args):
user_id = args.user_id
directory = args.output_directory

res = yield hs.get_handlers().admin_handler.export_user_data(
user_id, FileExfiltrationWriter(user_id, directory=directory)
res = yield defer.ensureDeferred(
hs.get_handlers().admin_handler.export_user_data(
user_id, FileExfiltrationWriter(user_id, directory=directory)
)
)
print(res)

Expand Down
41 changes: 17 additions & 24 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.constants import Membership
from synapse.types import RoomStreamToken
from synapse.visibility import filter_events_for_client
Expand All @@ -33,11 +31,10 @@ def __init__(self, hs):
self.storage = hs.get_storage()
self.state_store = self.storage.state

@defer.inlineCallbacks
def get_whois(self, user):
async def get_whois(self, user):
connections = []

sessions = yield self.store.get_user_ip_and_agents(user)
sessions = await self.store.get_user_ip_and_agents(user)
for session in sessions:
connections.append(
{
Expand All @@ -54,20 +51,18 @@ def get_whois(self, user):

return ret

@defer.inlineCallbacks
def get_users(self):
async def get_users(self):
"""Function to retrieve a list of users in users table.

Args:
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = yield self.store.get_users()
ret = await self.store.get_users()

return ret

@defer.inlineCallbacks
def get_users_paginate(self, start, limit, name, guests, deactivated):
async def get_users_paginate(self, start, limit, name, guests, deactivated):
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users.

Expand All @@ -80,14 +75,13 @@ def get_users_paginate(self, start, limit, name, guests, deactivated):
Returns:
defer.Deferred: resolves to json list[dict[str, Any]]
"""
ret = yield self.store.get_users_paginate(
ret = await self.store.get_users_paginate(
start, limit, name, guests, deactivated
)

return ret

@defer.inlineCallbacks
def search_users(self, term):
async def search_users(self, term):
"""Function to search users list for one or more users with
the matched term.

Expand All @@ -96,7 +90,7 @@ def search_users(self, term):
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = yield self.store.search_users(term)
ret = await self.store.search_users(term)

return ret

Expand All @@ -119,8 +113,7 @@ def set_user_server_admin(self, user, admin):
"""
return self.store.set_server_admin(user, admin)

@defer.inlineCallbacks
def export_user_data(self, user_id, writer):
async def export_user_data(self, user_id, writer):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one looks like it's got a yield in admin_cmd.py

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good point. Apparently even searching for function references correctly was too much for me on Tuesday

"""Write all data we have on the user to the given writer.

Args:
Expand All @@ -132,7 +125,7 @@ def export_user_data(self, user_id, writer):
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = yield self.store.get_rooms_for_user_where_membership_is(
rooms = await self.store.get_rooms_for_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
Expand All @@ -145,7 +138,7 @@ def export_user_data(self, user_id, writer):
# We only try and fetch events for rooms the user has been in. If
# they've been e.g. invited to a room without joining then we handle
# those seperately.
rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id)
rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)

for index, room in enumerate(rooms):
room_id = room.room_id
Expand All @@ -154,7 +147,7 @@ def export_user_data(self, user_id, writer):
"[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
)

forgotten = yield self.store.did_forget(user_id, room_id)
forgotten = await self.store.did_forget(user_id, room_id)
if forgotten:
logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
continue
Expand All @@ -166,7 +159,7 @@ def export_user_data(self, user_id, writer):

if room.membership == Membership.INVITE:
event_id = room.event_id
invite = yield self.store.get_event(event_id, allow_none=True)
invite = await self.store.get_event(event_id, allow_none=True)
if invite:
invited_state = invite.unsigned["invite_room_state"]
writer.write_invite(room_id, invite, invited_state)
Expand All @@ -177,7 +170,7 @@ def export_user_data(self, user_id, writer):
# were joined. We estimate that point by looking at the
# stream_ordering of the last membership if it wasn't a join.
if room.membership == Membership.JOIN:
stream_ordering = yield self.store.get_room_max_stream_ordering()
stream_ordering = self.store.get_room_max_stream_ordering()
else:
stream_ordering = room.stream_ordering

Expand All @@ -203,15 +196,15 @@ def export_user_data(self, user_id, writer):
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = yield self.store.paginate_room_events(
events, _ = await self.store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction="f"
)
if not events:
break

from_key = events[-1].internal_metadata.after

events = yield filter_events_for_client(self.storage, user_id, events)
events = await filter_events_for_client(self.storage, user_id, events)

writer.write_events(room_id, events)

Expand Down Expand Up @@ -247,7 +240,7 @@ def export_user_data(self, user_id, writer):
for event_id in extremities:
if not event_to_unseen_prevs[event_id]:
continue
state = yield self.state_store.get_state_for_event(event_id)
state = await self.state_store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state)

return writer.finished()
Expand Down
54 changes: 24 additions & 30 deletions synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# limitations under the License.
import logging

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
Expand Down Expand Up @@ -46,8 +44,7 @@ def __init__(self, hs):

self._account_validity_enabled = hs.config.account_validity.enabled

@defer.inlineCallbacks
def deactivate_account(self, user_id, erase_data, id_server=None):
async def deactivate_account(self, user_id, erase_data, id_server=None):
"""Deactivate a user's account

Args:
Expand All @@ -74,11 +71,11 @@ def deactivate_account(self, user_id, erase_data, id_server=None):
identity_server_supports_unbinding = True

# Retrieve the 3PIDs this user has bound to an identity server
threepids = yield self.store.user_get_bound_threepids(user_id)
threepids = await self.store.user_get_bound_threepids(user_id)

for threepid in threepids:
try:
result = yield self._identity_handler.try_unbind_threepid(
result = await self._identity_handler.try_unbind_threepid(
user_id,
{
"medium": threepid["medium"],
Expand All @@ -91,64 +88,63 @@ def deactivate_account(self, user_id, erase_data, id_server=None):
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
raise SynapseError(400, "Failed to remove threepid from ID server")
yield self.store.user_delete_threepid(
await self.store.user_delete_threepid(
user_id, threepid["medium"], threepid["address"]
)

# Remove all 3PIDs this user has bound to the homeserver
yield self.store.user_delete_threepids(user_id)
await self.store.user_delete_threepids(user_id)

# delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
await self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
await self._auth_handler.delete_access_tokens_for_user(user_id)

yield self.store.user_set_password_hash(user_id, None)
await self.store.user_set_password_hash(user_id, None)

# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
yield self.store.add_user_pending_deactivation(user_id)
await self.store.add_user_pending_deactivation(user_id)

# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
await self.user_directory_handler.handle_user_deactivated(user_id)

# Mark the user as erased, if they asked for that
if erase_data:
logger.info("Marking %s as erased", user_id)
yield self.store.mark_user_erased(user_id)
await self.store.mark_user_erased(user_id)

# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()

# Reject all pending invites for the user, so that the user doesn't show up in the
# "invited" section of rooms' members list.
yield self._reject_pending_invites_for_user(user_id)
await self._reject_pending_invites_for_user(user_id)

# Remove all information on the user from the account_validity table.
if self._account_validity_enabled:
yield self.store.delete_account_validity_for_user(user_id)
await self.store.delete_account_validity_for_user(user_id)

# Mark the user as deactivated.
yield self.store.set_user_deactivated_status(user_id, True)
await self.store.set_user_deactivated_status(user_id, True)

return identity_server_supports_unbinding

@defer.inlineCallbacks
def _reject_pending_invites_for_user(self, user_id):
async def _reject_pending_invites_for_user(self, user_id):
"""Reject pending invites addressed to a given user ID.

Args:
user_id (str): The user ID to reject pending invites for.
"""
user = UserID.from_string(user_id)
pending_invites = yield self.store.get_invited_rooms_for_user(user_id)
pending_invites = await self.store.get_invited_rooms_for_user(user_id)

for room in pending_invites:
try:
yield self._room_member_handler.update_membership(
await self._room_member_handler.update_membership(
create_requester(user),
user,
room.room_id,
Expand Down Expand Up @@ -180,8 +176,7 @@ def _start_user_parting(self):
if not self._user_parter_running:
run_as_background_process("user_parter_loop", self._user_parter_loop)

@defer.inlineCallbacks
def _user_parter_loop(self):
async def _user_parter_loop(self):
"""Loop that parts deactivated users from rooms

Returns:
Expand All @@ -191,31 +186,30 @@ def _user_parter_loop(self):
logger.info("Starting user parter")
try:
while True:
user_id = yield self.store.get_user_pending_deactivation()
user_id = await self.store.get_user_pending_deactivation()
if user_id is None:
break
logger.info("User parter parting %r", user_id)
yield self._part_user(user_id)
yield self.store.del_user_pending_deactivation(user_id)
await self._part_user(user_id)
await self.store.del_user_pending_deactivation(user_id)
logger.info("User parter finished parting %r", user_id)
logger.info("User parter finished: stopping")
finally:
self._user_parter_running = False

@defer.inlineCallbacks
def _part_user(self, user_id):
async def _part_user(self, user_id):
"""Causes the given user_id to leave all the rooms they're joined to

Returns:
None
"""
user = UserID.from_string(user_id)

rooms_for_user = yield self.store.get_rooms_for_user(user_id)
rooms_for_user = await self.store.get_rooms_for_user(user_id)
for room_id in rooms_for_user:
logger.info("User parter parting %r from %r", user_id, room_id)
try:
yield self._room_member_handler.update_membership(
await self._room_member_handler.update_membership(
create_requester(user),
user,
room_id,
Expand Down