|
42 | 42 | from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore |
43 | 43 | from synapse.storage.engines._base import IsolationLevel |
44 | 44 | from synapse.storage.types import Connection |
45 | | -from synapse.storage.util.id_generators import ( |
46 | | - AbstractStreamIdGenerator, |
47 | | - MultiWriterIdGenerator, |
48 | | -) |
| 45 | +from synapse.storage.util.id_generators import MultiWriterIdGenerator |
49 | 46 | from synapse.util.caches.descriptors import cached, cachedList |
50 | 47 | from synapse.util.caches.stream_change_cache import StreamChangeCache |
51 | 48 | from synapse.util.iterutils import batch_iter |
@@ -83,7 +80,7 @@ def __init__( |
83 | 80 | super().__init__(database, db_conn, hs) |
84 | 81 |
|
85 | 82 | self._instance_name = hs.get_instance_name() |
86 | | - self._presence_id_gen: AbstractStreamIdGenerator |
| 83 | + self._presence_id_gen: MultiWriterIdGenerator |
87 | 84 |
|
88 | 85 | self._can_persist_presence = ( |
89 | 86 | self._instance_name in hs.config.worker.writers.presence |
@@ -455,6 +452,9 @@ async def get_presence_for_all_users( |
455 | 452 | def get_current_presence_token(self) -> int: |
456 | 453 | return self._presence_id_gen.get_current_token() |
457 | 454 |
|
| 455 | + def get_presence_stream_id_gen(self) -> MultiWriterIdGenerator: |
| 456 | + return self._presence_id_gen |
| 457 | + |
458 | 458 | def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]: |
459 | 459 | """Fetch non-offline presence from the database so that we can register |
460 | 460 | the appropriate time outs. |
|
0 commit comments