Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit e15b1fd

Browse files
committed
Merge pull request #6280 from matrix-org/erikj/receipts_async_await
* commit '770d1ef67': Newsfile Port receipt and read markers to async/wait
2 parents d57a35b + 770d1ef commit e15b1fd

File tree

7 files changed

+33
-53
lines changed

7 files changed

+33
-53
lines changed

changelog.d/6280.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Port receipt and read markers to async/wait.

synapse/federation/send_queue.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636

3737
from sortedcontainers import SortedDict
3838

39+
from twisted.internet import defer
40+
3941
from synapse.metrics import LaterGauge
4042
from synapse.storage.presence import UserPresenceState
4143
from synapse.util.metrics import Measure
@@ -212,7 +214,7 @@ def send_read_receipt(self, receipt):
212214
receipt (synapse.types.ReadReceipt):
213215
"""
214216
# nothing to do here: the replication listener will handle it.
215-
pass
217+
return defer.succeed(None)
216218

217219
def send_presence(self, states):
218220
"""As per FederationSender

synapse/handlers/read_marker.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
import logging
1717

18-
from twisted.internet import defer
19-
2018
from synapse.util.async_helpers import Linearizer
2119

2220
from ._base import BaseHandler
@@ -32,31 +30,30 @@ def __init__(self, hs):
3230
self.read_marker_linearizer = Linearizer(name="read_marker")
3331
self.notifier = hs.get_notifier()
3432

35-
@defer.inlineCallbacks
36-
def received_client_read_marker(self, room_id, user_id, event_id):
33+
async def received_client_read_marker(self, room_id, user_id, event_id):
3734
"""Updates the read marker for a given user in a given room if the event ID given
3835
is ahead in the stream relative to the current read marker.
3936
4037
This uses a notifier to indicate that account data should be sent down /sync if
4138
the read marker has changed.
4239
"""
4340

44-
with (yield self.read_marker_linearizer.queue((room_id, user_id))):
45-
existing_read_marker = yield self.store.get_account_data_for_room_and_type(
41+
with await self.read_marker_linearizer.queue((room_id, user_id)):
42+
existing_read_marker = await self.store.get_account_data_for_room_and_type(
4643
user_id, room_id, "m.fully_read"
4744
)
4845

4946
should_update = True
5047

5148
if existing_read_marker:
5249
# Only update if the new marker is ahead in the stream
53-
should_update = yield self.store.is_event_after(
50+
should_update = await self.store.is_event_after(
5451
event_id, existing_read_marker["event_id"]
5552
)
5653

5754
if should_update:
5855
content = {"event_id": event_id}
59-
max_id = yield self.store.add_account_data_to_room(
56+
max_id = await self.store.add_account_data_to_room(
6057
user_id, room_id, "m.fully_read", content
6158
)
6259
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])

synapse/handlers/receipts.py

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from synapse.handlers._base import BaseHandler
2020
from synapse.types import ReadReceipt, get_domain_from_id
21+
from synapse.util.async_helpers import maybe_awaitable
2122

2223
logger = logging.getLogger(__name__)
2324

@@ -36,8 +37,7 @@ def __init__(self, hs):
3637
self.clock = self.hs.get_clock()
3738
self.state = hs.get_state_handler()
3839

39-
@defer.inlineCallbacks
40-
def _received_remote_receipt(self, origin, content):
40+
async def _received_remote_receipt(self, origin, content):
4141
"""Called when we receive an EDU of type m.receipt from a remote HS.
4242
"""
4343
receipts = []
@@ -62,17 +62,16 @@ def _received_remote_receipt(self, origin, content):
6262
)
6363
)
6464

65-
yield self._handle_new_receipts(receipts)
65+
await self._handle_new_receipts(receipts)
6666

67-
@defer.inlineCallbacks
68-
def _handle_new_receipts(self, receipts):
67+
async def _handle_new_receipts(self, receipts):
6968
"""Takes a list of receipts, stores them and informs the notifier.
7069
"""
7170
min_batch_id = None
7271
max_batch_id = None
7372

7473
for receipt in receipts:
75-
res = yield self.store.insert_receipt(
74+
res = await self.store.insert_receipt(
7675
receipt.room_id,
7776
receipt.receipt_type,
7877
receipt.user_id,
@@ -99,14 +98,15 @@ def _handle_new_receipts(self, receipts):
9998

10099
self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
101100
# Note that the min here shouldn't be relied upon to be accurate.
102-
yield self.hs.get_pusherpool().on_new_receipts(
103-
min_batch_id, max_batch_id, affected_room_ids
101+
await maybe_awaitable(
102+
self.hs.get_pusherpool().on_new_receipts(
103+
min_batch_id, max_batch_id, affected_room_ids
104+
)
104105
)
105106

106107
return True
107108

108-
@defer.inlineCallbacks
109-
def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
109+
async def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
110110
"""Called when a client tells us a local user has read up to the given
111111
event_id in the room.
112112
"""
@@ -118,24 +118,11 @@ def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
118118
data={"ts": int(self.clock.time_msec())},
119119
)
120120

121-
is_new = yield self._handle_new_receipts([receipt])
121+
is_new = await self._handle_new_receipts([receipt])
122122
if not is_new:
123123
return
124124

125-
yield self.federation.send_read_receipt(receipt)
126-
127-
@defer.inlineCallbacks
128-
def get_receipts_for_room(self, room_id, to_key):
129-
"""Gets all receipts for a room, upto the given key.
130-
"""
131-
result = yield self.store.get_linearized_receipts_for_room(
132-
room_id, to_key=to_key
133-
)
134-
135-
if not result:
136-
return []
137-
138-
return result
125+
await self.federation.send_read_receipt(receipt)
139126

140127

141128
class ReceiptEventSource(object):

synapse/rest/client/v2_alpha/read_marker.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
import logging
1717

18-
from twisted.internet import defer
19-
2018
from synapse.http.servlet import RestServlet, parse_json_object_from_request
2119

2220
from ._base import client_patterns
@@ -34,17 +32,16 @@ def __init__(self, hs):
3432
self.read_marker_handler = hs.get_read_marker_handler()
3533
self.presence_handler = hs.get_presence_handler()
3634

37-
@defer.inlineCallbacks
38-
def on_POST(self, request, room_id):
39-
requester = yield self.auth.get_user_by_req(request)
35+
async def on_POST(self, request, room_id):
36+
requester = await self.auth.get_user_by_req(request)
4037

41-
yield self.presence_handler.bump_presence_active_time(requester.user)
38+
await self.presence_handler.bump_presence_active_time(requester.user)
4239

4340
body = parse_json_object_from_request(request)
4441

4542
read_event_id = body.get("m.read", None)
4643
if read_event_id:
47-
yield self.receipts_handler.received_client_receipt(
44+
await self.receipts_handler.received_client_receipt(
4845
room_id,
4946
"m.read",
5047
user_id=requester.user.to_string(),
@@ -53,7 +50,7 @@ def on_POST(self, request, room_id):
5350

5451
read_marker_event_id = body.get("m.fully_read", None)
5552
if read_marker_event_id:
56-
yield self.read_marker_handler.received_client_read_marker(
53+
await self.read_marker_handler.received_client_read_marker(
5754
room_id,
5855
user_id=requester.user.to_string(),
5956
event_id=read_marker_event_id,

synapse/rest/client/v2_alpha/receipts.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
import logging
1717

18-
from twisted.internet import defer
19-
2018
from synapse.api.errors import SynapseError
2119
from synapse.http.servlet import RestServlet
2220

@@ -39,16 +37,15 @@ def __init__(self, hs):
3937
self.receipts_handler = hs.get_receipts_handler()
4038
self.presence_handler = hs.get_presence_handler()
4139

42-
@defer.inlineCallbacks
43-
def on_POST(self, request, room_id, receipt_type, event_id):
44-
requester = yield self.auth.get_user_by_req(request)
40+
async def on_POST(self, request, room_id, receipt_type, event_id):
41+
requester = await self.auth.get_user_by_req(request)
4542

4643
if receipt_type != "m.read":
4744
raise SynapseError(400, "Receipt type must be 'm.read'")
4845

49-
yield self.presence_handler.bump_presence_active_time(requester.user)
46+
await self.presence_handler.bump_presence_active_time(requester.user)
5047

51-
yield self.receipts_handler.received_client_receipt(
48+
await self.receipts_handler.received_client_receipt(
5249
room_id, receipt_type, user_id=requester.user.to_string(), event_id=event_id
5350
)
5451

synapse/storage/data_stores/main/events.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,12 +2442,11 @@ def _purge_room_txn(self, txn, room_id):
24422442

24432443
logger.info("[purge] done")
24442444

2445-
@defer.inlineCallbacks
2446-
def is_event_after(self, event_id1, event_id2):
2445+
async def is_event_after(self, event_id1, event_id2):
24472446
"""Returns True if event_id1 is after event_id2 in the stream
24482447
"""
2449-
to_1, so_1 = yield self._get_event_ordering(event_id1)
2450-
to_2, so_2 = yield self._get_event_ordering(event_id2)
2448+
to_1, so_1 = await self._get_event_ordering(event_id1)
2449+
to_2, so_2 = await self._get_event_ordering(event_id2)
24512450
return (to_1, so_1) > (to_2, so_2)
24522451

24532452
@cachedInlineCallbacks(max_entries=5000)

0 commit comments

Comments
 (0)