Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 0122d9a

Browse files
committed
STUFF for current token
1 parent 673bb5f commit 0122d9a

File tree

9 files changed

+63
-22
lines changed

9 files changed

+63
-22
lines changed

synapse/handlers/presence.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2178,7 +2178,7 @@ def send_presence_to_destinations(
21782178

21792179
self._notifier.notify_replication()
21802180

2181-
def get_current_token(self, instance_name: str) -> int:
2181+
def get_current_token(self, instance_name: str, minimum: bool = False) -> int:
21822182
"""Get the current position of the stream.
21832183
21842184
On workers this returns the last stream ID received from replication.

synapse/replication/http/_base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
248248

249249
data[_STREAM_POSITION_KEY] = {
250250
"streams": {
251-
stream.NAME: stream.current_token(local_instance_name)
251+
stream.NAME: stream.current_token(local_instance_name, True)
252252
for stream in streams
253253
},
254254
"instance_name": local_instance_name,
@@ -443,7 +443,7 @@ async def _check_auth_and_handle(
443443

444444
if self.WAIT_FOR_STREAMS:
445445
response[_STREAM_POSITION_KEY] = {
446-
stream.NAME: stream.current_token(self._instance_name)
446+
stream.NAME: stream.current_token(self._instance_name, True)
447447
for stream in self._streams
448448
}
449449

synapse/replication/tcp/client.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,13 @@ async def wait_for_stream_position(
344344

345345
# We measure here to get in flight counts and average waiting time.
346346
with Measure(self._clock, "repl.wait_for_stream_position"):
347-
logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
347+
logger.info(
348+
"Waiting for repl stream %r to reach %s (%s) (current: %s)",
349+
stream_name,
350+
position,
351+
instance_name,
352+
current_position,
353+
)
348354
await make_deferred_yieldable(deferred)
349355
logger.info(
350356
"Finished waiting for repl stream %r to reach %s", stream_name, position

synapse/replication/tcp/handler.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,13 @@ async def on_rdata(
540540
rows: a list of Stream.ROW_TYPE objects as returned by
541541
Stream.parse_row.
542542
"""
543-
logger.debug("Received rdata %s (%s) -> %s", stream_name, instance_name, token)
543+
logger.debug(
544+
"%s: Received rdata %s (%s) -> %s",
545+
self._instance_name,
546+
stream_name,
547+
instance_name,
548+
token,
549+
)
544550
await self._replication_data_handler.on_rdata(
545551
stream_name, instance_name, token, rows
546552
)

synapse/replication/tcp/resource.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ async def _run_notifier_loop(self) -> None:
160160

161161
for stream in all_streams:
162162
if stream.last_token == stream.current_token(
163-
self._instance_name
163+
self._instance_name,
164+
minimum=stream.NAME == EventsStream.NAME,
164165
):
165166
continue
166167

synapse/replication/tcp/streams/_base.py

+18-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828

2929
import attr
30+
from typing_extensions import Protocol
3031

3132
from synapse.api.constants import AccountDataTypes
3233
from synapse.replication.http.streams import ReplicationGetStreamUpdates
@@ -78,6 +79,11 @@
7879
UpdateFunction = Callable[[str, Token, Token, int], Awaitable[StreamUpdateResult]]
7980

8081

82+
class CurrentTokenFunction(Protocol):
83+
def __call__(self, instance_name: str, minimum: bool = False) -> Token:
84+
...
85+
86+
8187
class Stream:
8288
"""Base class for the streams.
8389
@@ -107,7 +113,7 @@ def parse_row(cls, row: StreamRow) -> Any:
107113
def __init__(
108114
self,
109115
local_instance_name: str,
110-
current_token_function: Callable[[str], Token],
116+
current_token_function: CurrentTokenFunction,
111117
update_function: UpdateFunction,
112118
):
113119
"""Instantiate a Stream
@@ -192,12 +198,16 @@ async def get_updates_since(
192198

193199
def current_token_without_instance(
194200
current_token: Callable[[], int]
195-
) -> Callable[[str], int]:
201+
) -> CurrentTokenFunction:
196202
"""Takes a current token callback function for a single writer stream
197203
that doesn't take an instance name parameter and wraps it in a function that
198204
does accept an instance name parameter but ignores it.
199205
"""
200-
return lambda instance_name: current_token()
206+
207+
def expanded_current_token(instance_name: str, minimum: bool = False) -> int:
208+
return current_token()
209+
210+
return expanded_current_token
201211

202212

203213
def make_http_update_function(hs: "HomeServer", stream_name: str) -> UpdateFunction:
@@ -246,10 +256,12 @@ def __init__(self, hs: "HomeServer"):
246256
self.store.get_all_new_backfill_event_rows,
247257
)
248258

249-
def _current_token(self, instance_name: str) -> int:
259+
def _current_token(self, instance_name: str, minimum: bool = False) -> int:
250260
# The backfill stream over replication operates on *positive* numbers,
251261
# which means we need to negate it.
252-
return -self.store._backfill_id_gen.get_current_token_for_writer(instance_name)
262+
return -self.store._backfill_id_gen.get_current_token_for_writer(
263+
instance_name, minimum
264+
)
253265

254266

255267
class PresenceStream(Stream):
@@ -395,7 +407,7 @@ def __init__(self, hs: "HomeServer"):
395407
self.store.get_all_push_rule_updates,
396408
)
397409

398-
def _current_token(self, instance_name: str) -> int:
410+
def _current_token(self, instance_name: str, minimum: bool = False) -> int:
399411
push_rules_token = self.store.get_max_push_rules_stream_id()
400412
return push_rules_token
401413

synapse/replication/tcp/streams/federation.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def __init__(self, hs: "HomeServer"):
6868
super().__init__(hs.get_instance_name(), current_token, update_function)
6969

7070
@staticmethod
71-
def _stub_current_token(instance_name: str) -> int:
71+
def _stub_current_token(instance_name: str, minimum: bool = False) -> int:
7272
# dummy current-token method for use on workers
7373
return 0
7474

synapse/storage/databases/main/cache.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def process_replication_position(
184184
) -> None:
185185
if stream_name == CachesStream.NAME:
186186
if self._cache_id_gen:
187+
logger.info("Advancing cache for %s to %s", instance_name, token)
187188
self._cache_id_gen.advance(instance_name, token)
188189
super().process_replication_position(stream_name, instance_name, token)
189190

@@ -402,8 +403,12 @@ def _send_invalidation_to_replication(
402403
},
403404
)
404405

405-
def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
406+
def get_cache_stream_token_for_writer(
407+
self, instance_name: str, minimum: bool = False
408+
) -> int:
406409
if self._cache_id_gen:
407-
return self._cache_id_gen.get_current_token_for_writer(instance_name)
410+
return self._cache_id_gen.get_current_token_for_writer(
411+
instance_name, minimum
412+
)
408413
else:
409414
return 0

synapse/storage/util/id_generators.py

+18-7
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,9 @@ def get_current_token(self) -> int:
119119
raise NotImplementedError()
120120

121121
@abc.abstractmethod
122-
def get_current_token_for_writer(self, instance_name: str) -> int:
122+
def get_current_token_for_writer(
123+
self, instance_name: str, minimum: bool = False
124+
) -> int:
123125
"""Returns the position of the given writer.
124126
125127
For streams with single writers this is equivalent to `get_current_token`.
@@ -262,7 +264,9 @@ def get_current_token(self) -> int:
262264

263265
return self._current
264266

265-
def get_current_token_for_writer(self, instance_name: str) -> int:
267+
def get_current_token_for_writer(
268+
self, instance_name: str, minimum: bool = False
269+
) -> int:
266270
return self.get_current_token()
267271

268272

@@ -378,6 +382,8 @@ def __init__(
378382
self._current_positions.values(), default=1
379383
)
380384

385+
self._last_persisted_position = self._persisted_upto_position
386+
381387
def _load_current_ids(
382388
self,
383389
db_conn: LoggingDatabaseConnection,
@@ -627,24 +633,29 @@ def _mark_id_as_finished(self, next_id: int) -> None:
627633
if new_cur:
628634
curr = self._current_positions.get(self._instance_name, 0)
629635
self._current_positions[self._instance_name] = max(curr, new_cur)
636+
self._last_persisted_position = max(curr, new_cur)
630637

631638
self._add_persisted_position(next_id)
632639

633640
def get_current_token(self) -> int:
634641
return self.get_persisted_upto_position()
635642

636-
def get_current_token_for_writer(self, instance_name: str) -> int:
643+
def get_current_token_for_writer(
644+
self, instance_name: str, minimum: bool = False
645+
) -> int:
637646
# If we don't have an entry for the given instance name, we assume it's a
638647
# new writer.
639648
#
640649
# For new writers we assume their initial position to be the current
641650
# persisted up to position. This stops Synapse from doing a full table
642651
# scan when a new writer announces itself over replication.
643652
with self._lock:
644-
return self._return_factor * max(
645-
self._current_positions.get(instance_name, 0),
646-
self._persisted_upto_position,
647-
)
653+
if minimum and instance_name == self._instance_name:
654+
return self._last_persisted_position
655+
else:
656+
return self._return_factor * self._current_positions.get(
657+
instance_name, self._persisted_upto_position
658+
)
648659

649660
def get_positions(self) -> Dict[str, int]:
650661
"""Get a copy of the current positon map.

0 commit comments

Comments
 (0)