|
55 | 55 | from synapse.storage.engines import PostgresEngine
|
56 | 56 | from synapse.storage.types import Cursor
|
57 | 57 | from synapse.storage.util.sequence import (
|
58 |
| - PostgresSequenceGenerator, |
59 | 58 | build_sequence_generator,
|
60 | 59 | )
|
61 | 60 |
|
@@ -280,7 +279,7 @@ def __init__(
|
280 | 279 | self._max_position_of_local_instance = self._max_seen_allocated_stream_id
|
281 | 280 |
|
282 | 281 | # This goes and fills out the above state from the database.
|
283 |
| - self._load_current_ids(db_conn, tables) |
| 282 | + self._load_current_ids(db_conn, tables, sequence_name) |
284 | 283 |
|
285 | 284 | self._sequence_gen = build_sequence_generator(
|
286 | 285 | db_conn=db_conn,
|
@@ -330,6 +329,7 @@ def _load_current_ids(
|
330 | 329 | self,
|
331 | 330 | db_conn: LoggingDatabaseConnection,
|
332 | 331 | tables: List[Tuple[str, str, str]],
|
| 332 | + sequence_name: str, |
333 | 333 | ) -> None:
|
334 | 334 | cur = db_conn.cursor(txn_name="_load_current_ids")
|
335 | 335 |
|
@@ -369,9 +369,12 @@ def _load_current_ids(
|
369 | 369 | # date. If we're using Postgres for the sequences, we can just use the current
|
370 | 370 | # sequence value as our own position.
|
371 | 371 | if self._instance_name in self._writers:
|
372 |
| - if isinstance(self._sequence_gen, PostgresSequenceGenerator): |
| 372 | + if isinstance(self._db.engine, PostgresEngine): |
| 373 | + cur.execute(f"SELECT last_value FROM {sequence_name}") |
| 374 | + row = cur.fetchone() |
| 375 | + assert row is not None |
373 | 376 | self._current_positions[self._instance_name] = (
|
374 |
| - self._sequence_gen.current_sequence_value(cur) |
| 377 | + row[0] * self._return_factor |
375 | 378 | )
|
376 | 379 |
|
377 | 380 | # We set the `_persisted_upto_position` to be the minimum of all current
|
|
0 commit comments