Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 316590d

Browse files
authored
Fix bug in wait_for_stream_position (#14856)
We were incorrectly checking if the *local* token had been advanced, rather than the token for the remote instance. In practice, I don't think this has caused any bugs due to where we use `wait_for_stream_position`, as critically we don't use it on instances that also write to the given streams (and so the local token will lag behind all remote tokens).
1 parent 2b084c5 commit 316590d

File tree

3 files changed

+80
-1
lines changed

3 files changed

+80
-1
lines changed

changelog.d/14856.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix `wait_for_stream_position` to correctly wait for the right instance to advance its token.

synapse/replication/tcp/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ async def wait_for_stream_position(
325325
# anyway in that case we don't need to wait.
326326
return
327327

328-
current_position = self._streams[stream_name].current_token(self._instance_name)
328+
current_position = self._streams[stream_name].current_token(instance_name)
329329
if position <= current_position:
330330
# We're already past the position
331331
return

tests/replication/tcp/test_handler.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from twisted.internet import defer
16+
17+
from synapse.replication.tcp.commands import PositionCommand, RdataCommand
18+
1519
from tests.replication._base import BaseMultiWorkerStreamTestCase
1620

1721

@@ -71,3 +75,77 @@ def test_non_background_worker_not_subscribed_to_user_ip(self) -> None:
7175
self.assertEqual(
7276
len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1
7377
)
78+
79+
def test_wait_for_stream_position(self) -> None:
80+
"""Check that wait for stream position correctly waits for an update from the
81+
correct instance.
82+
"""
83+
store = self.hs.get_datastores().main
84+
cmd_handler = self.hs.get_replication_command_handler()
85+
data_handler = self.hs.get_replication_data_handler()
86+
87+
worker1 = self.make_worker_hs(
88+
"synapse.app.generic_worker",
89+
extra_config={
90+
"worker_name": "worker1",
91+
"run_background_tasks_on": "worker1",
92+
"redis": {"enabled": True},
93+
},
94+
)
95+
96+
cache_id_gen = worker1.get_datastores().main._cache_id_gen
97+
assert cache_id_gen is not None
98+
99+
self.replicate()
100+
101+
# First, make sure the master knows that `worker1` exists.
102+
initial_token = cache_id_gen.get_current_token()
103+
cmd_handler.send_command(
104+
PositionCommand("caches", "worker1", initial_token, initial_token)
105+
)
106+
self.replicate()
107+
108+
# Next send out a normal RDATA, and check that waiting for that stream
109+
# ID returns immediately.
110+
ctx = cache_id_gen.get_next()
111+
next_token = self.get_success(ctx.__aenter__())
112+
self.get_success(ctx.__aexit__(None, None, None))
113+
114+
cmd_handler.send_command(
115+
RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
116+
)
117+
self.replicate()
118+
119+
self.get_success(
120+
data_handler.wait_for_stream_position("worker1", "caches", next_token)
121+
)
122+
123+
# `wait_for_stream_position` should only return once master receives an
124+
# RDATA from the worker
125+
ctx = cache_id_gen.get_next()
126+
next_token = self.get_success(ctx.__aenter__())
127+
self.get_success(ctx.__aexit__(None, None, None))
128+
129+
d = defer.ensureDeferred(
130+
data_handler.wait_for_stream_position("worker1", "caches", next_token)
131+
)
132+
self.assertFalse(d.called)
133+
134+
# ... updating the cache ID gen on the master still shouldn't cause the
135+
# deferred to wake up.
136+
ctx = store._cache_id_gen.get_next()
137+
self.get_success(ctx.__aenter__())
138+
self.get_success(ctx.__aexit__(None, None, None))
139+
140+
d = defer.ensureDeferred(
141+
data_handler.wait_for_stream_position("worker1", "caches", next_token)
142+
)
143+
self.assertFalse(d.called)
144+
145+
# ... but receiving the RDATA should
146+
cmd_handler.send_command(
147+
RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
148+
)
149+
self.replicate()
150+
151+
self.assertTrue(d.called)

0 commit comments

Comments
 (0)