Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 7d2532b

Browse files
authored
Discard RDATA from already seen positions. (#7648)
1 parent bd6dc17 commit 7d2532b

File tree

6 files changed

+175
-27
lines changed

6 files changed

+175
-27
lines changed

changelog.d/7648.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
In worker mode, ensure that replicated data has not already been received.

synapse/app/generic_worker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,11 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
738738
except Exception:
739739
logger.exception("Error processing replication")
740740

741+
async def on_position(self, stream_name: str, instance_name: str, token: int):
742+
await super().on_position(stream_name, instance_name, token)
743+
# Also call on_rdata to ensure that stream positions are properly reset.
744+
await self.on_rdata(stream_name, instance_name, token, [])
745+
741746
def stop_pusher(self, user_id, app_id, pushkey):
742747
if not self.notify_pushers:
743748
return

synapse/replication/tcp/commands.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def get_logcontext_id(self):
149149

150150

151151
class PositionCommand(Command):
152-
"""Sent by the server to tell the client the stream postition without
152+
"""Sent by the server to tell the client the stream position without
153153
needing to send an RDATA.
154154
155155
Format::
@@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand):
188188

189189

190190
class PingCommand(_SimpleCommand):
191-
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
191+
"""Sent by either side as a keep alive. The data is arbitrary (often timestamp)
192192
"""
193193

194194
NAME = "PING"

synapse/replication/tcp/handler.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ def __init__(self, hs):
112112
"replication_position", clock=self._clock
113113
)
114114

115-
# Map of stream to batched updates. See RdataCommand for info on how
116-
# batching works.
115+
# Map of stream name to batched updates. See RdataCommand for info on
116+
# how batching works.
117117
self._pending_batches = {} # type: Dict[str, List[Any]]
118118

119119
# The factory used to create connections.
@@ -123,7 +123,8 @@ def __init__(self, hs):
123123
# outgoing replication commands to.)
124124
self._connections = [] # type: List[AbstractConnection]
125125

126-
# For each connection, the incoming streams that are coming from that connection
126+
# For each connection, the incoming stream names that are coming from
127+
# that connection.
127128
self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]]
128129

129130
LaterGauge(
@@ -310,7 +311,28 @@ async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
310311
# Check if this is the last of a batch of updates
311312
rows = self._pending_batches.pop(stream_name, [])
312313
rows.append(row)
313-
await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
314+
315+
stream = self._streams.get(stream_name)
316+
if not stream:
317+
logger.error("Got RDATA for unknown stream: %s", stream_name)
318+
return
319+
320+
# Find where we previously streamed up to.
321+
current_token = stream.current_token(cmd.instance_name)
322+
323+
# Discard this data if this token is earlier than the current
324+
# position. Note that streams can be reset (in which case you
325+
# expect an earlier token), but that must be preceded by a
326+
# POSITION command.
327+
if cmd.token <= current_token:
328+
logger.debug(
329+
"Discarding RDATA from stream %s at position %s before previous position %s",
330+
stream_name,
331+
cmd.token,
332+
current_token,
333+
)
334+
else:
335+
await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
314336

315337
async def on_rdata(
316338
self, stream_name: str, instance_name: str, token: int, rows: list

tests/replication/tcp/streams/test_events.py

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from synapse.api.constants import EventTypes, Membership
1919
from synapse.events import EventBase
20+
from synapse.replication.tcp.commands import RdataCommand
2021
from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
2122
from synapse.replication.tcp.streams.events import (
2223
EventsStreamCurrentStateRow,
@@ -66,11 +67,6 @@ def test_update_function_event_row_limit(self):
6667
# also one state event
6768
state_event = self._inject_state_event()
6869

69-
# tell the notifier to catch up to avoid duplicate rows.
70-
# workaround for https://github.com/matrix-org/synapse/issues/7360
71-
# FIXME remove this when the above is fixed
72-
self.replicate()
73-
7470
# check we're testing what we think we are: no rows should yet have been
7571
# received
7672
self.assertEqual([], self.test_handler.received_rdata_rows)
@@ -174,11 +170,6 @@ def test_update_function_huge_state_change(self):
174170
# one more bit of state that doesn't get rolled back
175171
state2 = self._inject_state_event()
176172

177-
# tell the notifier to catch up to avoid duplicate rows.
178-
# workaround for https://github.com/matrix-org/synapse/issues/7360
179-
# FIXME remove this when the above is fixed
180-
self.replicate()
181-
182173
# check we're testing what we think we are: no rows should yet have been
183174
# received
184175
self.assertEqual([], self.test_handler.received_rdata_rows)
@@ -327,11 +318,6 @@ def test_update_function_state_row_limit(self):
327318
prev_events = [e.event_id]
328319
pl_events.append(e)
329320

330-
# tell the notifier to catch up to avoid duplicate rows.
331-
# workaround for https://github.com/matrix-org/synapse/issues/7360
332-
# FIXME remove this when the above is fixed
333-
self.replicate()
334-
335321
# check we're testing what we think we are: no rows should yet have been
336322
# received
337323
self.assertEqual([], self.test_handler.received_rdata_rows)
@@ -378,6 +364,64 @@ def test_update_function_state_row_limit(self):
378364

379365
self.assertEqual([], received_rows)
380366

367+
def test_backwards_stream_id(self):
368+
"""
369+
Test that RDATA with a token that comes before the current position is discarded.
370+
"""
371+
# disconnect, so that we can stack up some changes
372+
self.disconnect()
373+
374+
# Generate an event. We inject it using inject_event so that it is
375+
# not sent out over replication until we call self.replicate().
376+
event = self._inject_test_event()
377+
378+
# check we're testing what we think we are: no rows should yet have been
379+
# received
380+
self.assertEqual([], self.test_handler.received_rdata_rows)
381+
382+
# now reconnect to pull the updates
383+
self.reconnect()
384+
self.replicate()
385+
386+
# We should have received the expected single row (as well as various
387+
# cache invalidation updates which we ignore).
388+
received_rows = [
389+
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
390+
]
391+
392+
# There should be a single received row.
393+
self.assertEqual(len(received_rows), 1)
394+
395+
stream_name, token, row = received_rows[0]
396+
self.assertEqual("events", stream_name)
397+
self.assertIsInstance(row, EventsStreamRow)
398+
self.assertEqual(row.type, "ev")
399+
self.assertIsInstance(row.data, EventsStreamEventRow)
400+
self.assertEqual(row.data.event_id, event.event_id)
401+
402+
# Reset the data.
403+
self.test_handler.received_rdata_rows = []
404+
405+
# Save the current token for later.
406+
worker_events_stream = self.worker_hs.get_replication_streams()["events"]
407+
prev_token = worker_events_stream.current_token("master")
408+
409+
# Manually send an old RDATA command, which should get dropped. This
410+
# re-uses the row from above, but with an earlier stream token.
411+
self.hs.get_tcp_replication().send_command(
412+
RdataCommand("events", "master", 1, row)
413+
)
414+
415+
# No updates have been received (because it was discarded as old).
416+
received_rows = [
417+
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
418+
]
419+
self.assertEqual(len(received_rows), 0)
420+
421+
# Ensure the stream has not gone backwards.
422+
current_token = worker_events_stream.current_token("master")
423+
self.assertGreaterEqual(current_token, prev_token)
424+
381425
event_count = 0
382426

383427
def _inject_test_event(

tests/replication/tcp/streams/test_typing.py

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@
1616

1717
from synapse.handlers.typing import RoomMember
1818
from synapse.replication.tcp.streams import TypingStream
19+
from synapse.util.caches.stream_change_cache import StreamChangeCache
1920

2021
from tests.replication._base import BaseStreamTestCase
2122

2223
USER_ID = "@feeling:blue"
24+
USER_ID_2 = "@da-ba-dee:blue"
25+
26+
ROOM_ID = "!bar:blue"
27+
ROOM_ID_2 = "!foo:blue"
2328

2429

2530
class TypingStreamTestCase(BaseStreamTestCase):
@@ -29,11 +34,9 @@ def _build_replication_data_handler(self):
2934
def test_typing(self):
3035
typing = self.hs.get_typing_handler()
3136

32-
room_id = "!bar:blue"
33-
3437
self.reconnect()
3538

36-
typing._push_update(member=RoomMember(room_id, USER_ID), typing=True)
39+
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
3740

3841
self.reactor.advance(0)
3942

@@ -46,15 +49,15 @@ def test_typing(self):
4649
self.assertEqual(stream_name, "typing")
4750
self.assertEqual(1, len(rdata_rows))
4851
row = rdata_rows[0] # type: TypingStream.TypingStreamRow
49-
self.assertEqual(room_id, row.room_id)
52+
self.assertEqual(ROOM_ID, row.room_id)
5053
self.assertEqual([USER_ID], row.user_ids)
5154

5255
# Now let's disconnect and insert some data.
5356
self.disconnect()
5457

5558
self.test_handler.on_rdata.reset_mock()
5659

57-
typing._push_update(member=RoomMember(room_id, USER_ID), typing=False)
60+
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=False)
5861

5962
self.test_handler.on_rdata.assert_not_called()
6063

@@ -73,5 +76,78 @@ def test_typing(self):
7376
self.assertEqual(stream_name, "typing")
7477
self.assertEqual(1, len(rdata_rows))
7578
row = rdata_rows[0]
76-
self.assertEqual(room_id, row.room_id)
79+
self.assertEqual(ROOM_ID, row.room_id)
80+
self.assertEqual([], row.user_ids)
81+
82+
def test_reset(self):
83+
"""
84+
Test what happens when a typing stream resets.
85+
86+
This is emulated by jumping the stream ahead, then reconnecting (which
87+
sends the proper position and RDATA).
88+
"""
89+
typing = self.hs.get_typing_handler()
90+
91+
self.reconnect()
92+
93+
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
94+
95+
self.reactor.advance(0)
96+
97+
# We should now see an attempt to connect to the master
98+
request = self.handle_http_replication_attempt()
99+
self.assert_request_is_get_repl_stream_updates(request, "typing")
100+
101+
self.test_handler.on_rdata.assert_called_once()
102+
stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
103+
self.assertEqual(stream_name, "typing")
104+
self.assertEqual(1, len(rdata_rows))
105+
row = rdata_rows[0] # type: TypingStream.TypingStreamRow
106+
self.assertEqual(ROOM_ID, row.room_id)
107+
self.assertEqual([USER_ID], row.user_ids)
108+
109+
# Push the stream forward a bunch so it can be reset.
110+
for i in range(100):
111+
typing._push_update(
112+
member=RoomMember(ROOM_ID, "@test%s:blue" % i), typing=True
113+
)
114+
self.reactor.advance(0)
115+
116+
# Disconnect.
117+
self.disconnect()
118+
119+
# Reset the typing handler
120+
self.hs.get_replication_streams()["typing"].last_token = 0
121+
self.hs.get_tcp_replication()._streams["typing"].last_token = 0
122+
typing._latest_room_serial = 0
123+
typing._typing_stream_change_cache = StreamChangeCache(
124+
"TypingStreamChangeCache", typing._latest_room_serial
125+
)
126+
typing._reset()
127+
128+
# Reconnect.
129+
self.reconnect()
130+
self.pump(0.1)
131+
132+
# We should now see an attempt to connect to the master
133+
request = self.handle_http_replication_attempt()
134+
self.assert_request_is_get_repl_stream_updates(request, "typing")
135+
136+
# Reset the test code.
137+
self.test_handler.on_rdata.reset_mock()
138+
self.test_handler.on_rdata.assert_not_called()
139+
140+
# Push additional data.
141+
typing._push_update(member=RoomMember(ROOM_ID_2, USER_ID_2), typing=False)
142+
self.reactor.advance(0)
143+
144+
self.test_handler.on_rdata.assert_called_once()
145+
stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
146+
self.assertEqual(stream_name, "typing")
147+
self.assertEqual(1, len(rdata_rows))
148+
row = rdata_rows[0]
149+
self.assertEqual(ROOM_ID_2, row.room_id)
77150
self.assertEqual([], row.user_ids)
151+
152+
# The token should have been reset.
153+
self.assertEqual(token, 1)

0 commit comments

Comments
 (0)