Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit e0fdbf6

Browse files
committed
Remove unused stream_positions() function
1 parent 4d4919c commit e0fdbf6

File tree

14 files changed

+2
-118
lines changed

14 files changed

+2
-118
lines changed

synapse/app/generic_worker.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,6 @@ def _reset(self):
394394
# map room IDs to sets of users currently typing
395395
self._room_typing = {}
396396

397-
def stream_positions(self):
398-
# We must update this typing token from the response of the previous
399-
# sync. In particular, the stream id may "reset" back to zero/a low
400-
# value which we *must* use for the next replication request.
401-
return {"typing": self._latest_room_serial}
402-
403397
def process_replication_rows(self, token, rows):
404398
if self._latest_room_serial > token:
405399
# The master has gone backwards. To prevent inconsistent data, just
@@ -637,13 +631,6 @@ async def on_rdata(self, stream_name, token, rows):
637631
)
638632
await self.process_and_notify(stream_name, token, rows)
639633

640-
def get_streams_to_replicate(self):
641-
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
642-
args.update(self.typing_handler.stream_positions())
643-
if self.send_handler:
644-
args.update(self.send_handler.stream_positions())
645-
return args
646-
647634
async def process_and_notify(self, stream_name, token, rows):
648635
try:
649636
if self.send_handler:
@@ -778,9 +765,6 @@ def on_start(self):
778765
def wake_destination(self, server: str):
779766
self.federation_sender.wake_destination(server)
780767

781-
def stream_positions(self):
782-
return {"federation": self.federation_position}
783-
784768
async def process_replication_rows(self, stream_name, token, rows):
785769
# The federation stream contains things that we want to send out, e.g.
786770
# presence, typing, etc.

synapse/replication/slave/storage/_base.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# limitations under the License.
1515

1616
import logging
17-
from typing import Dict, Optional
17+
from typing import Optional
1818

1919
import six
2020

@@ -49,19 +49,6 @@ def __init__(self, database: Database, db_conn, hs):
4949

5050
self.hs = hs
5151

52-
def stream_positions(self) -> Dict[str, int]:
53-
"""
54-
Get the current positions of all the streams this store wants to subscribe to
55-
56-
Returns:
57-
map from stream name to the most recent update we have for
58-
that stream (ie, the point we want to start replicating from)
59-
"""
60-
pos = {}
61-
if self._cache_id_gen:
62-
pos["caches"] = self._cache_id_gen.get_current_token()
63-
return pos
64-
6552
def get_cache_stream_token(self):
6653
if self._cache_id_gen:
6754
return self._cache_id_gen.get_current_token()

synapse/replication/slave/storage/account_data.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,6 @@ def __init__(self, database: Database, db_conn, hs):
3232
def get_max_account_data_stream_id(self):
3333
return self._account_data_id_gen.get_current_token()
3434

35-
def stream_positions(self):
36-
result = super(SlavedAccountDataStore, self).stream_positions()
37-
position = self._account_data_id_gen.get_current_token()
38-
result["user_account_data"] = position
39-
result["room_account_data"] = position
40-
result["tag_account_data"] = position
41-
return result
42-
4335
def process_replication_rows(self, stream_name, token, rows):
4436
if stream_name == "tag_account_data":
4537
self._account_data_id_gen.advance(token)

synapse/replication/slave/storage/deviceinbox.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ def __init__(self, database: Database, db_conn, hs):
4343
expiry_ms=30 * 60 * 1000,
4444
)
4545

46-
def stream_positions(self):
47-
result = super(SlavedDeviceInboxStore, self).stream_positions()
48-
result["to_device"] = self._device_inbox_id_gen.get_current_token()
49-
return result
50-
5146
def process_replication_rows(self, stream_name, token, rows):
5247
if stream_name == "to_device":
5348
self._device_inbox_id_gen.advance(token)

synapse/replication/slave/storage/devices.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,6 @@ def __init__(self, database: Database, db_conn, hs):
4848
"DeviceListFederationStreamChangeCache", device_list_max
4949
)
5050

51-
def stream_positions(self):
52-
result = super(SlavedDeviceStore, self).stream_positions()
53-
# The user signature stream uses the same stream ID generator as the
54-
# device list stream, so set them both to the device list ID
55-
# generator's current token.
56-
current_token = self._device_list_id_gen.get_current_token()
57-
result[DeviceListsStream.NAME] = current_token
58-
result[UserSignatureStream.NAME] = current_token
59-
return result
60-
6151
def process_replication_rows(self, stream_name, token, rows):
6252
if stream_name == DeviceListsStream.NAME:
6353
self._device_list_id_gen.advance(token)

synapse/replication/slave/storage/events.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,6 @@ def get_room_max_stream_ordering(self):
9393
def get_room_min_stream_ordering(self):
9494
return self._backfill_id_gen.get_current_token()
9595

96-
def stream_positions(self):
97-
result = super(SlavedEventStore, self).stream_positions()
98-
result["events"] = self._stream_id_gen.get_current_token()
99-
result["backfill"] = -self._backfill_id_gen.get_current_token()
100-
return result
101-
10296
def process_replication_rows(self, stream_name, token, rows):
10397
if stream_name == "events":
10498
self._stream_id_gen.advance(token)

synapse/replication/slave/storage/groups.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ def __init__(self, database: Database, db_conn, hs):
3737
def get_group_stream_token(self):
3838
return self._group_updates_id_gen.get_current_token()
3939

40-
def stream_positions(self):
41-
result = super(SlavedGroupServerStore, self).stream_positions()
42-
result["groups"] = self._group_updates_id_gen.get_current_token()
43-
return result
44-
4540
def process_replication_rows(self, stream_name, token, rows):
4641
if stream_name == "groups":
4742
self._group_updates_id_gen.advance(token)

synapse/replication/slave/storage/presence.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,6 @@ def __init__(self, database: Database, db_conn, hs):
4141
def get_current_presence_token(self):
4242
return self._presence_id_gen.get_current_token()
4343

44-
def stream_positions(self):
45-
result = super(SlavedPresenceStore, self).stream_positions()
46-
47-
if self.hs.config.use_presence:
48-
position = self._presence_id_gen.get_current_token()
49-
result["presence"] = position
50-
51-
return result
52-
5344
def process_replication_rows(self, stream_name, token, rows):
5445
if stream_name == "presence":
5546
self._presence_id_gen.advance(token)

synapse/replication/slave/storage/push_rule.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ def get_push_rules_stream_token(self):
3737
def get_max_push_rules_stream_id(self):
3838
return self._push_rules_stream_id_gen.get_current_token()
3939

40-
def stream_positions(self):
41-
result = super(SlavedPushRuleStore, self).stream_positions()
42-
result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
43-
return result
44-
4540
def process_replication_rows(self, stream_name, token, rows):
4641
if stream_name == "push_rules":
4742
self._push_rules_stream_id_gen.advance(token)

synapse/replication/slave/storage/pushers.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ def __init__(self, database: Database, db_conn, hs):
2828
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
2929
)
3030

31-
def stream_positions(self):
32-
result = super(SlavedPusherStore, self).stream_positions()
33-
result["pushers"] = self._pushers_id_gen.get_current_token()
34-
return result
35-
3631
def get_pushers_stream_token(self):
3732
return self._pushers_id_gen.get_current_token()
3833

0 commit comments

Comments
 (0)