Skip to content

Commit b905ae2

Browse files
authored
Fix regression when bounding future tokens (#17391)
Fix bug added in #17386, where we accidentally used `room_key` for the receipts stream. See first commit. Reviewable commit-by-commit
1 parent 1ce59d7 commit b905ae2

File tree

3 files changed

+54
-10
lines changed

3 files changed

+54
-10
lines changed

changelog.d/17391.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.

synapse/streams/events.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#
2020
#
2121

22+
import logging
2223
from typing import TYPE_CHECKING, Sequence, Tuple
2324

2425
import attr
@@ -41,6 +42,9 @@
4142
from synapse.server import HomeServer
4243

4344

45+
logger = logging.getLogger(__name__)
46+
47+
4448
@attr.s(frozen=True, slots=True, auto_attribs=True)
4549
class _EventSourcesInner:
4650
room: RoomEventSource
@@ -139,17 +143,31 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken:
139143
key
140144
].get_max_allocated_token()
141145

142-
token = token.copy_and_replace(
143-
key, token.room_key.bound_stream_token(max_token)
144-
)
146+
if max_token < token_value.get_max_stream_pos():
147+
logger.error(
148+
"Bounding token from the future '%s': token: %s, bound: %s",
149+
key,
150+
token_value,
151+
max_token,
152+
)
153+
token = token.copy_and_replace(
154+
key, token_value.bound_stream_token(max_token)
155+
)
145156
else:
146157
assert isinstance(current_value, int)
147158
if current_value < token_value:
148159
max_token = await stream_key_to_id_gen[
149160
key
150161
].get_max_allocated_token()
151162

152-
token = token.copy_and_replace(key, min(token_value, max_token))
163+
if max_token < token_value:
164+
logger.error(
165+
"Bounding token from the future '%s': token: %s, bound: %s",
166+
key,
167+
token_value,
168+
max_token,
169+
)
170+
token = token.copy_and_replace(key, max_token)
153171

154172
return token
155173

tests/handlers/test_sync.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,14 @@
3636
from synapse.rest import admin
3737
from synapse.rest.client import knock, login, room
3838
from synapse.server import HomeServer
39-
from synapse.types import JsonDict, StreamKeyType, UserID, create_requester
39+
from synapse.types import (
40+
JsonDict,
41+
MultiWriterStreamToken,
42+
RoomStreamToken,
43+
StreamKeyType,
44+
UserID,
45+
create_requester,
46+
)
4047
from synapse.util import Clock
4148

4249
import tests.unittest
@@ -999,7 +1006,13 @@ def test_wait_for_future_sync_token(self) -> None:
9991006

10001007
self.get_success(sync_d, by=1.0)
10011008

1002-
def test_wait_for_invalid_future_sync_token(self) -> None:
1009+
@parameterized.expand(
1010+
[(key,) for key in StreamKeyType.__members__.values()],
1011+
name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
1012+
)
1013+
def test_wait_for_invalid_future_sync_token(
1014+
self, stream_key: StreamKeyType
1015+
) -> None:
10031016
"""Like the previous test, except we give a token that has a stream
10041017
position ahead of what is in the DB, i.e. its invalid and we shouldn't
10051018
wait for the stream to advance (as it may never do so).
@@ -1010,11 +1023,23 @@ def test_wait_for_invalid_future_sync_token(self) -> None:
10101023
"""
10111024
user = self.register_user("alice", "password")
10121025

1013-
# Create a token and arbitrarily advance one of the streams.
1026+
# Create a token and advance one of the streams.
10141027
current_token = self.hs.get_event_sources().get_current_token()
1015-
since_token = current_token.copy_and_advance(
1016-
StreamKeyType.PRESENCE, current_token.presence_key + 1
1017-
)
1028+
token_value = current_token.get_field(stream_key)
1029+
1030+
# How we advance the streams depends on the type.
1031+
if isinstance(token_value, int):
1032+
since_token = current_token.copy_and_advance(stream_key, token_value + 1)
1033+
elif isinstance(token_value, MultiWriterStreamToken):
1034+
since_token = current_token.copy_and_advance(
1035+
stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
1036+
)
1037+
elif isinstance(token_value, RoomStreamToken):
1038+
since_token = current_token.copy_and_advance(
1039+
stream_key, RoomStreamToken(stream=token_value.stream + 1)
1040+
)
1041+
else:
1042+
raise Exception("Unreachable")
10181043

10191044
sync_d = defer.ensureDeferred(
10201045
self.sync_handler.wait_for_sync_for_user(

0 commit comments

Comments
 (0)