Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit a086b46

Browse files
committed
Remove unused stream_positions() function
1 parent f8614f2 commit a086b46

File tree

17 files changed

+1
-124
lines changed

17 files changed

+1
-124
lines changed

synapse/app/admin_cmd.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,6 @@ class AdminCmdReplicationHandler(ReplicationClientHandler):
8787
async def on_rdata(self, stream_name, token, rows):
8888
pass
8989

90-
def get_streams_to_replicate(self):
91-
return {}
92-
9390

9491
@defer.inlineCallbacks
9592
def export_data_command(hs, args):

synapse/app/generic_worker.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -589,13 +589,6 @@ async def on_rdata(self, stream_name, token, rows):
589589
)
590590
run_in_background(self.process_and_notify, stream_name, token, rows)
591591

592-
def get_streams_to_replicate(self):
593-
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
594-
args.update(self.typing_handler.stream_positions())
595-
if self.send_handler:
596-
args.update(self.send_handler.stream_positions())
597-
return args
598-
599592
async def process_and_notify(self, stream_name, token, rows):
600593
try:
601594
if self.send_handler:

synapse/federation/sender/__init__.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -525,9 +525,6 @@ async def get_replication_rows(
525525
# to a worker.
526526
return []
527527

528-
def stream_positions(self):
529-
return {"federation": self.federation_position}
530-
531528
def process_replication_rows(self, stream_name, token, rows):
532529
# The federation stream contains things that we want to send out, e.g.
533530
# presence, typing, etc.

synapse/handlers/typing.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,6 @@ def _reset(self):
302302
# map room IDs to sets of users currently typing
303303
self._room_typing = {}
304304

305-
def stream_positions(self):
306-
# We must update this typing token from the response of the previous
307-
# sync. In particular, the stream id may "reset" back to zero/a low
308-
# value which we *must* use for the next replication request.
309-
return {"typing": self._latest_room_serial}
310-
311305
def process_replication_rows(self, token, rows):
312306
if self._latest_room_serial > token:
313307
# The master has gone backwards. To prevent inconsistent data, just

synapse/replication/slave/storage/_base.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# limitations under the License.
1515

1616
import logging
17-
from typing import Dict, Optional
17+
from typing import Optional
1818

1919
import six
2020

@@ -49,19 +49,6 @@ def __init__(self, database: Database, db_conn, hs):
4949

5050
self.hs = hs
5151

52-
def stream_positions(self) -> Dict[str, int]:
53-
"""
54-
Get the current positions of all the streams this store wants to subscribe to
55-
56-
Returns:
57-
map from stream name to the most recent update we have for
58-
that stream (ie, the point we want to start replicating from)
59-
"""
60-
pos = {}
61-
if self._cache_id_gen:
62-
pos["caches"] = self._cache_id_gen.get_current_token()
63-
return pos
64-
6552
def get_cache_stream_token(self):
6653
if self._cache_id_gen:
6754
return self._cache_id_gen.get_current_token()

synapse/replication/slave/storage/account_data.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,6 @@ def __init__(self, database: Database, db_conn, hs):
3232
def get_max_account_data_stream_id(self):
3333
return self._account_data_id_gen.get_current_token()
3434

35-
def stream_positions(self):
36-
result = super(SlavedAccountDataStore, self).stream_positions()
37-
position = self._account_data_id_gen.get_current_token()
38-
result["user_account_data"] = position
39-
result["room_account_data"] = position
40-
result["tag_account_data"] = position
41-
return result
42-
4335
def process_replication_rows(self, stream_name, token, rows):
4436
if stream_name == "tag_account_data":
4537
self._account_data_id_gen.advance(token)

synapse/replication/slave/storage/deviceinbox.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ def __init__(self, database: Database, db_conn, hs):
4343
expiry_ms=30 * 60 * 1000,
4444
)
4545

46-
def stream_positions(self):
47-
result = super(SlavedDeviceInboxStore, self).stream_positions()
48-
result["to_device"] = self._device_inbox_id_gen.get_current_token()
49-
return result
50-
5146
def process_replication_rows(self, stream_name, token, rows):
5247
if stream_name == "to_device":
5348
self._device_inbox_id_gen.advance(token)

synapse/replication/slave/storage/devices.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,6 @@ def __init__(self, database: Database, db_conn, hs):
4848
"DeviceListFederationStreamChangeCache", device_list_max
4949
)
5050

51-
def stream_positions(self):
52-
result = super(SlavedDeviceStore, self).stream_positions()
53-
# The user signature stream uses the same stream ID generator as the
54-
# device list stream, so set them both to the device list ID
55-
# generator's current token.
56-
current_token = self._device_list_id_gen.get_current_token()
57-
result[DeviceListsStream.NAME] = current_token
58-
result[UserSignatureStream.NAME] = current_token
59-
return result
60-
6151
def process_replication_rows(self, stream_name, token, rows):
6252
if stream_name == DeviceListsStream.NAME:
6353
self._device_list_id_gen.advance(token)

synapse/replication/slave/storage/events.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,6 @@ def get_room_max_stream_ordering(self):
9393
def get_room_min_stream_ordering(self):
9494
return self._backfill_id_gen.get_current_token()
9595

96-
def stream_positions(self):
97-
result = super(SlavedEventStore, self).stream_positions()
98-
result["events"] = self._stream_id_gen.get_current_token()
99-
result["backfill"] = -self._backfill_id_gen.get_current_token()
100-
return result
101-
10296
def process_replication_rows(self, stream_name, token, rows):
10397
if stream_name == "events":
10498
self._stream_id_gen.advance(token)

synapse/replication/slave/storage/groups.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ def __init__(self, database: Database, db_conn, hs):
3737
def get_group_stream_token(self):
3838
return self._group_updates_id_gen.get_current_token()
3939

40-
def stream_positions(self):
41-
result = super(SlavedGroupServerStore, self).stream_positions()
42-
result["groups"] = self._group_updates_id_gen.get_current_token()
43-
return result
44-
4540
def process_replication_rows(self, stream_name, token, rows):
4641
if stream_name == "groups":
4742
self._group_updates_id_gen.advance(token)

0 commit comments

Comments
 (0)