Skip to content

Commit

Permalink
Add assertion to MultiWriterTokens
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jul 12, 2024
1 parent bf09110 commit 697e3b5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
13 changes: 13 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
represented by a default `stream` attribute and a map of instance name to
stream position of any writers that are ahead of the default stream
position.
The values in `instance_map` must be greater than the `stream` attribute.
"""

stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
Expand All @@ -472,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
kw_only=True,
)

def __attrs_post_init__(self):
# Enforce that all instances have a value greater than the min stream
# position.
for v in self.instance_map.values():
if v < self.stream:
raise ValueError(
"'instance_map' includes a stream position before the main 'stream' attribute"
)

@classmethod
@abc.abstractmethod
async def parse(cls, store: "DataStore", string: str) -> "Self":
Expand Down Expand Up @@ -641,6 +652,8 @@ def __attrs_post_init__(self) -> None:
"Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
)

super().__attrs_post_init__()

@classmethod
async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken":
try:
Expand Down
17 changes: 5 additions & 12 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,9 @@ def test_instance_map(self) -> None:
parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
self.assertEqual(parsed_token, token)

@skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres")
def test_instance_map_behind(self) -> None:
"""Test for stream token with instance map, where instance map entries
are from before stream token."""
store = self.hs.get_datastores().main
def test_instance_map_assertion(self) -> None:
"""Test that we assert values in the instance map are greater than the
min stream position"""

token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4}))

string_token = self.get_success(token.to_string(store))
self.assertEqual(string_token, "s5")

parsed_token = self.get_success(RoomStreamToken.parse(store, string_token))
self.assertEqual(parsed_token, RoomStreamToken(stream=5))
with self.assertRaises(ValueError):
RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4}))

0 comments on commit 697e3b5

Please sign in to comment.