4848 UserIpCommand ,
4949 UserSyncCommand ,
5050)
51- from synapse .replication .tcp .protocol import AbstractConnection
51+ from synapse .replication .tcp .protocol import IReplicationConnection
5252from synapse .replication .tcp .streams import (
5353 STREAMS_MAP ,
5454 AccountDataStream ,
8282
8383# the type of the entries in _command_queues_by_stream
8484_StreamCommandQueue = Deque [
85- Tuple [Union [RdataCommand , PositionCommand ], AbstractConnection ]
85+ Tuple [Union [RdataCommand , PositionCommand ], IReplicationConnection ]
8686]
8787
8888
@@ -174,7 +174,7 @@ def __init__(self, hs: "HomeServer"):
174174
175175 # The currently connected connections. (The list of places we need to send
176176 # outgoing replication commands to.)
177- self ._connections = [] # type: List[AbstractConnection ]
177+ self ._connections = [] # type: List[IReplicationConnection ]
178178
179179 LaterGauge (
180180 "synapse_replication_tcp_resource_total_connections" ,
@@ -197,7 +197,7 @@ def __init__(self, hs: "HomeServer"):
197197
198198 # For each connection, the incoming stream names that have received a POSITION
199199 # from that connection.
200- self ._streams_by_connection = {} # type: Dict[AbstractConnection , Set[str]]
200+ self ._streams_by_connection = {} # type: Dict[IReplicationConnection , Set[str]]
201201
202202 LaterGauge (
203203 "synapse_replication_tcp_command_queue" ,
@@ -220,7 +220,7 @@ def __init__(self, hs: "HomeServer"):
220220 self ._server_notices_sender = hs .get_server_notices_sender ()
221221
222222 def _add_command_to_stream_queue (
223- self , conn : AbstractConnection , cmd : Union [RdataCommand , PositionCommand ]
223+ self , conn : IReplicationConnection , cmd : Union [RdataCommand , PositionCommand ]
224224 ) -> None :
225225 """Queue the given received command for processing
226226
@@ -267,7 +267,7 @@ async def _unsafe_process_queue(self, stream_name: str):
267267 async def _process_command (
268268 self ,
269269 cmd : Union [PositionCommand , RdataCommand ],
270- conn : AbstractConnection ,
270+ conn : IReplicationConnection ,
271271 stream_name : str ,
272272 ) -> None :
273273 if isinstance (cmd , PositionCommand ):
@@ -321,10 +321,10 @@ def get_streams_to_replicate(self) -> List[Stream]:
321321 """Get a list of streams that this instances replicates."""
322322 return self ._streams_to_replicate
323323
324- def on_REPLICATE (self , conn : AbstractConnection , cmd : ReplicateCommand ):
324+ def on_REPLICATE (self , conn : IReplicationConnection , cmd : ReplicateCommand ):
325325 self .send_positions_to_connection (conn )
326326
327- def send_positions_to_connection (self , conn : AbstractConnection ):
327+ def send_positions_to_connection (self , conn : IReplicationConnection ):
328328 """Send current position of all streams this process is source of to
329329 the connection.
330330 """
@@ -347,7 +347,7 @@ def send_positions_to_connection(self, conn: AbstractConnection):
347347 )
348348
349349 def on_USER_SYNC (
350- self , conn : AbstractConnection , cmd : UserSyncCommand
350+ self , conn : IReplicationConnection , cmd : UserSyncCommand
351351 ) -> Optional [Awaitable [None ]]:
352352 user_sync_counter .inc ()
353353
@@ -359,21 +359,23 @@ def on_USER_SYNC(
359359 return None
360360
361361 def on_CLEAR_USER_SYNC (
362- self , conn : AbstractConnection , cmd : ClearUserSyncsCommand
362+ self , conn : IReplicationConnection , cmd : ClearUserSyncsCommand
363363 ) -> Optional [Awaitable [None ]]:
364364 if self ._is_master :
365365 return self ._presence_handler .update_external_syncs_clear (cmd .instance_id )
366366 else :
367367 return None
368368
369- def on_FEDERATION_ACK (self , conn : AbstractConnection , cmd : FederationAckCommand ):
369+ def on_FEDERATION_ACK (
370+ self , conn : IReplicationConnection , cmd : FederationAckCommand
371+ ):
370372 federation_ack_counter .inc ()
371373
372374 if self ._federation_sender :
373375 self ._federation_sender .federation_ack (cmd .instance_name , cmd .token )
374376
375377 def on_USER_IP (
376- self , conn : AbstractConnection , cmd : UserIpCommand
378+ self , conn : IReplicationConnection , cmd : UserIpCommand
377379 ) -> Optional [Awaitable [None ]]:
378380 user_ip_cache_counter .inc ()
379381
@@ -395,7 +397,7 @@ async def _handle_user_ip(self, cmd: UserIpCommand):
395397 assert self ._server_notices_sender is not None
396398 await self ._server_notices_sender .on_user_ip (cmd .user_id )
397399
398- def on_RDATA (self , conn : AbstractConnection , cmd : RdataCommand ):
400+ def on_RDATA (self , conn : IReplicationConnection , cmd : RdataCommand ):
399401 if cmd .instance_name == self ._instance_name :
400402 # Ignore RDATA that are just our own echoes
401403 return
@@ -412,7 +414,7 @@ def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
412414 self ._add_command_to_stream_queue (conn , cmd )
413415
414416 async def _process_rdata (
415- self , stream_name : str , conn : AbstractConnection , cmd : RdataCommand
417+ self , stream_name : str , conn : IReplicationConnection , cmd : RdataCommand
416418 ) -> None :
417419 """Process an RDATA command
418420
@@ -486,7 +488,7 @@ async def on_rdata(
486488 stream_name , instance_name , token , rows
487489 )
488490
489- def on_POSITION (self , conn : AbstractConnection , cmd : PositionCommand ):
491+ def on_POSITION (self , conn : IReplicationConnection , cmd : PositionCommand ):
490492 if cmd .instance_name == self ._instance_name :
491493 # Ignore POSITION that are just our own echoes
492494 return
@@ -496,7 +498,7 @@ def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
496498 self ._add_command_to_stream_queue (conn , cmd )
497499
498500 async def _process_position (
499- self , stream_name : str , conn : AbstractConnection , cmd : PositionCommand
501+ self , stream_name : str , conn : IReplicationConnection , cmd : PositionCommand
500502 ) -> None :
501503 """Process a POSITION command
502504
@@ -553,7 +555,9 @@ async def _process_position(
553555
554556 self ._streams_by_connection .setdefault (conn , set ()).add (stream_name )
555557
556- def on_REMOTE_SERVER_UP (self , conn : AbstractConnection , cmd : RemoteServerUpCommand ):
558+ def on_REMOTE_SERVER_UP (
559+ self , conn : IReplicationConnection , cmd : RemoteServerUpCommand
560+ ):
557561 """"Called when get a new REMOTE_SERVER_UP command."""
558562 self ._replication_data_handler .on_remote_server_up (cmd .data )
559563
@@ -576,7 +580,7 @@ def on_REMOTE_SERVER_UP(self, conn: AbstractConnection, cmd: RemoteServerUpComma
576580 # between two instances, but that is not currently supported).
577581 self .send_command (cmd , ignore_conn = conn )
578582
579- def new_connection (self , connection : AbstractConnection ):
583+ def new_connection (self , connection : IReplicationConnection ):
580584 """Called when we have a new connection."""
581585 self ._connections .append (connection )
582586
@@ -603,7 +607,7 @@ def new_connection(self, connection: AbstractConnection):
603607 UserSyncCommand (self ._instance_id , user_id , True , now )
604608 )
605609
606- def lost_connection (self , connection : AbstractConnection ):
610+ def lost_connection (self , connection : IReplicationConnection ):
607611 """Called when a connection is closed/lost."""
608612 # we no longer need _streams_by_connection for this connection.
609613 streams = self ._streams_by_connection .pop (connection , None )
@@ -624,7 +628,7 @@ def connected(self) -> bool:
624628 return bool (self ._connections )
625629
626630 def send_command (
627- self , cmd : Command , ignore_conn : Optional [AbstractConnection ] = None
631+ self , cmd : Command , ignore_conn : Optional [IReplicationConnection ] = None
628632 ):
629633 """Send a command to all connected connections.
630634
0 commit comments