Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit e85c635

Browse files
committed
Merge commit '54f8d73c0' into anoa/dinsic_release_1_21_x
* commit '54f8d73c0': Convert additional databases to async/await (#8199)
2 parents 059e0fd + 54f8d73 commit e85c635

File tree

7 files changed

+147
-137
lines changed

7 files changed

+147
-137
lines changed

changelog.d/8199.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Convert various parts of the codebase to async/await.

synapse/storage/databases/main/__init__.py

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import calendar
1919
import logging
2020
import time
21-
from typing import Any, Dict, List, Optional
21+
from typing import Any, Dict, List, Optional, Tuple
2222

2323
from synapse.api.constants import PresenceState
2424
from synapse.config.homeserver import HomeServerConfig
@@ -294,24 +294,24 @@ def _get_active_presence(self, db_conn):
294294

295295
return [UserPresenceState(**row) for row in rows]
296296

297-
def count_daily_users(self):
297+
async def count_daily_users(self) -> int:
298298
"""
299299
Counts the number of users who used this homeserver in the last 24 hours.
300300
"""
301301
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
302-
return self.db_pool.runInteraction(
302+
return await self.db_pool.runInteraction(
303303
"count_daily_users", self._count_users, yesterday
304304
)
305305

306-
def count_monthly_users(self):
306+
async def count_monthly_users(self) -> int:
307307
"""
308308
Counts the number of users who used this homeserver in the last 30 days.
309309
Note this method is intended for phonehome metrics only and is different
310310
from the mau figure in synapse.storage.monthly_active_users which,
311311
amongst other things, includes a 3 day grace period before a user counts.
312312
"""
313313
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
314-
return self.db_pool.runInteraction(
314+
return await self.db_pool.runInteraction(
315315
"count_monthly_users", self._count_users, thirty_days_ago
316316
)
317317

@@ -330,15 +330,15 @@ def _count_users(self, txn, time_from):
330330
(count,) = txn.fetchone()
331331
return count
332332

333-
def count_r30_users(self):
333+
async def count_r30_users(self) -> Dict[str, int]:
334334
"""
335335
Counts the number of 30 day retained users, defined as:-
336336
* Users who have created their accounts more than 30 days ago
337337
* Where last seen at most 30 days ago
338338
* Where account creation and last_seen are > 30 days apart
339339
340-
Returns counts globaly for a given user as well as breaking
341-
by platform
340+
Returns:
341+
A mapping of counts globally as well as broken out by platform.
342342
"""
343343

344344
def _count_r30_users(txn):
@@ -411,7 +411,7 @@ def _count_r30_users(txn):
411411

412412
return results
413413

414-
return self.db_pool.runInteraction("count_r30_users", _count_r30_users)
414+
return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
415415

416416
def _get_start_of_day(self):
417417
"""
@@ -421,7 +421,7 @@ def _get_start_of_day(self):
421421
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
422422
return today_start * 1000
423423

424-
def generate_user_daily_visits(self):
424+
async def generate_user_daily_visits(self) -> None:
425425
"""
426426
Generates daily visit data for use in cohort/ retention analysis
427427
"""
@@ -476,7 +476,7 @@ def _generate_user_daily_visits(txn):
476476
# frequently
477477
self._last_user_visit_update = now
478478

479-
return self.db_pool.runInteraction(
479+
await self.db_pool.runInteraction(
480480
"generate_user_daily_visits", _generate_user_daily_visits
481481
)
482482

@@ -500,22 +500,28 @@ async def get_users(self) -> List[Dict[str, Any]]:
500500
desc="get_users",
501501
)
502502

503-
def get_users_paginate(
504-
self, start, limit, user_id=None, name=None, guests=True, deactivated=False
505-
):
503+
async def get_users_paginate(
504+
self,
505+
start: int,
506+
limit: int,
507+
user_id: Optional[str] = None,
508+
name: Optional[str] = None,
509+
guests: bool = True,
510+
deactivated: bool = False,
511+
) -> Tuple[List[Dict[str, Any]], int]:
506512
"""Function to retrieve a paginated list of users from
507513
users list. This will return a json list of users and the
508514
total number of users matching the filter criteria.
509515
510516
Args:
511-
start (int): start number to begin the query from
512-
limit (int): number of rows to retrieve
513-
user_id (string): search for user_id. ignored if name is not None
514-
name (string): search for local part of user_id or display name
515-
guests (bool): whether to in include guest users
516-
deactivated (bool): whether to include deactivated users
517+
start: start number to begin the query from
518+
limit: number of rows to retrieve
519+
user_id: search for user_id. ignored if name is not None
520+
name: search for local part of user_id or display name
521+
guests: whether to in include guest users
522+
deactivated: whether to include deactivated users
517523
Returns:
518-
defer.Deferred: resolves to list[dict[str, Any]], int
524+
A tuple of a list of mappings from user to information and a count of total users.
519525
"""
520526

521527
def get_users_paginate_txn(txn):
@@ -558,7 +564,7 @@ def get_users_paginate_txn(txn):
558564
users = self.db_pool.cursor_to_dict(txn)
559565
return users, count
560566

561-
return self.db_pool.runInteraction(
567+
return await self.db_pool.runInteraction(
562568
"get_users_paginate_txn", get_users_paginate_txn
563569
)
564570

synapse/storage/databases/main/devices.py

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,9 @@ async def _get_device_update_edus_by_remote(
313313

314314
return results
315315

316-
def _get_last_device_update_for_remote_user(
316+
async def _get_last_device_update_for_remote_user(
317317
self, destination: str, user_id: str, from_stream_id: int
318-
):
318+
) -> int:
319319
def f(txn):
320320
prev_sent_id_sql = """
321321
SELECT coalesce(max(stream_id), 0) as stream_id
@@ -326,12 +326,16 @@ def f(txn):
326326
rows = txn.fetchall()
327327
return rows[0][0]
328328

329-
return self.db_pool.runInteraction("get_last_device_update_for_remote_user", f)
329+
return await self.db_pool.runInteraction(
330+
"get_last_device_update_for_remote_user", f
331+
)
330332

331-
def mark_as_sent_devices_by_remote(self, destination: str, stream_id: int):
333+
async def mark_as_sent_devices_by_remote(
334+
self, destination: str, stream_id: int
335+
) -> None:
332336
"""Mark that updates have successfully been sent to the destination.
333337
"""
334-
return self.db_pool.runInteraction(
338+
await self.db_pool.runInteraction(
335339
"mark_as_sent_devices_by_remote",
336340
self._mark_as_sent_devices_by_remote_txn,
337341
destination,
@@ -684,7 +688,7 @@ async def mark_remote_user_device_cache_as_stale(self, user_id: str) -> None:
684688
desc="make_remote_user_device_cache_as_stale",
685689
)
686690

687-
def mark_remote_user_device_list_as_unsubscribed(self, user_id: str):
691+
async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
688692
"""Mark that we no longer track device lists for remote user.
689693
"""
690694

@@ -698,7 +702,7 @@ def _mark_remote_user_device_list_as_unsubscribed_txn(txn):
698702
txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
699703
)
700704

701-
return self.db_pool.runInteraction(
705+
await self.db_pool.runInteraction(
702706
"mark_remote_user_device_list_as_unsubscribed",
703707
_mark_remote_user_device_list_as_unsubscribed_txn,
704708
)
@@ -959,9 +963,9 @@ async def update_device(
959963
desc="update_device",
960964
)
961965

962-
def update_remote_device_list_cache_entry(
966+
async def update_remote_device_list_cache_entry(
963967
self, user_id: str, device_id: str, content: JsonDict, stream_id: int
964-
):
968+
) -> None:
965969
"""Updates a single device in the cache of a remote user's devicelist.
966970
967971
Note: assumes that we are the only thread that can be updating this user's
@@ -972,11 +976,8 @@ def update_remote_device_list_cache_entry(
972976
device_id: ID of decivice being updated
973977
content: new data on this device
974978
stream_id: the version of the device list
975-
976-
Returns:
977-
Deferred[None]
978979
"""
979-
return self.db_pool.runInteraction(
980+
await self.db_pool.runInteraction(
980981
"update_remote_device_list_cache_entry",
981982
self._update_remote_device_list_cache_entry_txn,
982983
user_id,
@@ -1028,9 +1029,9 @@ def _update_remote_device_list_cache_entry_txn(
10281029
lock=False,
10291030
)
10301031

1031-
def update_remote_device_list_cache(
1032+
async def update_remote_device_list_cache(
10321033
self, user_id: str, devices: List[dict], stream_id: int
1033-
):
1034+
) -> None:
10341035
"""Replace the entire cache of the remote user's devices.
10351036
10361037
Note: assumes that we are the only thread that can be updating this user's
@@ -1040,11 +1041,8 @@ def update_remote_device_list_cache(
10401041
user_id: User to update device list for
10411042
devices: list of device objects supplied over federation
10421043
stream_id: the version of the device list
1043-
1044-
Returns:
1045-
Deferred[None]
10461044
"""
1047-
return self.db_pool.runInteraction(
1045+
await self.db_pool.runInteraction(
10481046
"update_remote_device_list_cache",
10491047
self._update_remote_device_list_cache_txn,
10501048
user_id,
@@ -1054,7 +1052,7 @@ def update_remote_device_list_cache(
10541052

10551053
def _update_remote_device_list_cache_txn(
10561054
self, txn: LoggingTransaction, user_id: str, devices: List[dict], stream_id: int
1057-
):
1055+
) -> None:
10581056
self.db_pool.simple_delete_txn(
10591057
txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id}
10601058
)

synapse/storage/databases/main/events_worker.py

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -823,20 +823,24 @@ def _fetch_event_rows(self, txn, event_ids):
823823

824824
return event_dict
825825

826-
def _maybe_redact_event_row(self, original_ev, redactions, event_map):
826+
def _maybe_redact_event_row(
827+
self,
828+
original_ev: EventBase,
829+
redactions: Iterable[str],
830+
event_map: Dict[str, EventBase],
831+
) -> Optional[EventBase]:
827832
"""Given an event object and a list of possible redacting event ids,
828833
determine whether to honour any of those redactions and if so return a redacted
829834
event.
830835
831836
Args:
832-
original_ev (EventBase):
833-
redactions (iterable[str]): list of event ids of potential redaction events
834-
event_map (dict[str, EventBase]): other events which have been fetched, in
835-
which we can look up the redaaction events. Map from event id to event.
837+
original_ev: The original event.
838+
redactions: list of event ids of potential redaction events
839+
event_map: other events which have been fetched, in which we can
840+
look up the redaaction events. Map from event id to event.
836841
837842
Returns:
838-
Deferred[EventBase|None]: if the event should be redacted, a pruned
839-
event object. Otherwise, None.
843+
If the event should be redacted, a pruned event object. Otherwise, None.
840844
"""
841845
if original_ev.type == "m.room.create":
842846
# we choose to ignore redactions of m.room.create events.
@@ -946,17 +950,17 @@ def _get_current_state_event_counts_txn(self, txn, room_id):
946950
row = txn.fetchone()
947951
return row[0] if row else 0
948952

949-
def get_current_state_event_counts(self, room_id):
953+
async def get_current_state_event_counts(self, room_id: str) -> int:
950954
"""
951955
Gets the current number of state events in a room.
952956
953957
Args:
954-
room_id (str)
958+
room_id: The room ID to query.
955959
956960
Returns:
957-
Deferred[int]
961+
The current number of state events.
958962
"""
959-
return self.db_pool.runInteraction(
963+
return await self.db_pool.runInteraction(
960964
"get_current_state_event_counts",
961965
self._get_current_state_event_counts_txn,
962966
room_id,
@@ -991,15 +995,17 @@ def get_current_events_token(self):
991995
"""The current maximum token that events have reached"""
992996
return self._stream_id_gen.get_current_token()
993997

994-
def get_all_new_forward_event_rows(self, last_id, current_id, limit):
998+
async def get_all_new_forward_event_rows(
999+
self, last_id: int, current_id: int, limit: int
1000+
) -> List[Tuple]:
9951001
"""Returns new events, for the Events replication stream
9961002
9971003
Args:
9981004
last_id: the last stream_id from the previous batch.
9991005
current_id: the maximum stream_id to return up to
10001006
limit: the maximum number of rows to return
10011007
1002-
Returns: Deferred[List[Tuple]]
1008+
Returns:
10031009
a list of events stream rows. Each tuple consists of a stream id as
10041010
the first element, followed by fields suitable for casting into an
10051011
EventsStreamRow.
@@ -1020,18 +1026,20 @@ def get_all_new_forward_event_rows(txn):
10201026
txn.execute(sql, (last_id, current_id, limit))
10211027
return txn.fetchall()
10221028

1023-
return self.db_pool.runInteraction(
1029+
return await self.db_pool.runInteraction(
10241030
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
10251031
)
10261032

1027-
def get_ex_outlier_stream_rows(self, last_id, current_id):
1033+
async def get_ex_outlier_stream_rows(
1034+
self, last_id: int, current_id: int
1035+
) -> List[Tuple]:
10281036
"""Returns de-outliered events, for the Events replication stream
10291037
10301038
Args:
10311039
last_id: the last stream_id from the previous batch.
10321040
current_id: the maximum stream_id to return up to
10331041
1034-
Returns: Deferred[List[Tuple]]
1042+
Returns:
10351043
a list of events stream rows. Each tuple consists of a stream id as
10361044
the first element, followed by fields suitable for casting into an
10371045
EventsStreamRow.
@@ -1054,7 +1062,7 @@ def get_ex_outlier_stream_rows_txn(txn):
10541062
txn.execute(sql, (last_id, current_id))
10551063
return txn.fetchall()
10561064

1057-
return self.db_pool.runInteraction(
1065+
return await self.db_pool.runInteraction(
10581066
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
10591067
)
10601068

@@ -1226,11 +1234,11 @@ async def get_event_ordering(self, event_id):
12261234

12271235
return (int(res["topological_ordering"]), int(res["stream_ordering"]))
12281236

1229-
def get_next_event_to_expire(self):
1237+
async def get_next_event_to_expire(self) -> Optional[Tuple[str, int]]:
12301238
"""Retrieve the entry with the lowest expiry timestamp in the event_expiry
12311239
table, or None if there's no more event to expire.
12321240
1233-
Returns: Deferred[Optional[Tuple[str, int]]]
1241+
Returns:
12341242
A tuple containing the event ID as its first element and an expiry timestamp
12351243
as its second one, if there's at least one row in the event_expiry table.
12361244
None otherwise.
@@ -1246,6 +1254,6 @@ def get_next_event_to_expire_txn(txn):
12461254

12471255
return txn.fetchone()
12481256

1249-
return self.db_pool.runInteraction(
1257+
return await self.db_pool.runInteraction(
12501258
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
12511259
)

0 commit comments

Comments
 (0)