1616"""
1717
1818import logging
19- from typing import Dict , List , Optional
2019
21- from twisted .internet import defer
2220from twisted .internet .protocol import ReconnectingClientFactory
2321
24- from synapse .replication .slave .storage ._base import BaseSlavedStore
25- from synapse .replication .tcp .protocol import (
26- AbstractReplicationClientHandler ,
27- ClientReplicationStreamProtocol ,
28- )
29-
30- from .commands import (
31- Command ,
32- FederationAckCommand ,
33- InvalidateCacheCommand ,
34- RemoteServerUpCommand ,
35- RemovePusherCommand ,
36- UserIpCommand ,
37- UserSyncCommand ,
38- )
22+ from synapse .replication .tcp .protocol import ClientReplicationStreamProtocol
3923
4024logger = logging .getLogger (__name__ )
4125
@@ -51,9 +35,9 @@ class ReplicationClientFactory(ReconnectingClientFactory):
5135 initialDelay = 0.1
5236 maxDelay = 1 # Try at least once every N seconds
5337
54- def __init__ (self , hs , client_name , handler : AbstractReplicationClientHandler ):
38+ def __init__ (self , hs , client_name ):
5539 self .client_name = client_name
56- self .handler = handler
40+ self .handler = hs . get_tcp_replication ()
5741 self .server_name = hs .config .server_name
5842 self .hs = hs
5943 self ._clock = hs .get_clock () # As self.clock is defined in super class
@@ -76,172 +60,3 @@ def clientConnectionLost(self, connector, reason):
7660 def clientConnectionFailed (self , connector , reason ):
7761 logger .error ("Failed to connect to replication: %r" , reason )
7862 ReconnectingClientFactory .clientConnectionFailed (self , connector , reason )
79-
80-
81- class ReplicationClientHandler (AbstractReplicationClientHandler ):
82- """A base handler that can be passed to the ReplicationClientFactory.
83-
84- By default proxies incoming replication data to the SlaveStore.
85- """
86-
87- def __init__ (self , store : BaseSlavedStore ):
88- self .store = store
89-
90- # The current connection. None if we are currently (re)connecting
91- self .connection = None
92-
93- # Any pending commands to be sent once a new connection has been
94- # established
95- self .pending_commands = [] # type: List[Command]
96-
97- # Map from string -> deferred, to wake up when receiveing a SYNC with
98- # the given string.
99- # Used for tests.
100- self .awaiting_syncs = {} # type: Dict[str, defer.Deferred]
101-
102- # The factory used to create connections.
103- self .factory = None # type: Optional[ReplicationClientFactory]
104-
105- def start_replication (self , hs ):
106- """Helper method to start a replication connection to the remote server
107- using TCP.
108- """
109- client_name = hs .config .worker_name
110- self .factory = ReplicationClientFactory (hs , client_name , self )
111- host = hs .config .worker_replication_host
112- port = hs .config .worker_replication_port
113- hs .get_reactor ().connectTCP (host , port , self .factory )
114-
115- async def on_rdata (self , stream_name , token , rows ):
116- """Called to handle a batch of replication data with a given stream token.
117-
118- By default this just pokes the slave store. Can be overridden in subclasses to
119- handle more.
120-
121- Args:
122- stream_name (str): name of the replication stream for this batch of rows
123- token (int): stream token for this batch of rows
124- rows (list): a list of Stream.ROW_TYPE objects as returned by
125- Stream.parse_row.
126- """
127- logger .debug ("Received rdata %s -> %s" , stream_name , token )
128- self .store .process_replication_rows (stream_name , token , rows )
129-
130- async def on_position (self , stream_name , token ):
131- """Called when we get new position data. By default this just pokes
132- the slave store.
133-
134- Can be overriden in subclasses to handle more.
135- """
136- self .store .process_replication_rows (stream_name , token , [])
137-
138- def on_sync (self , data ):
139- """When we received a SYNC we wake up any deferreds that were waiting
140- for the sync with the given data.
141-
142- Used by tests.
143- """
144- d = self .awaiting_syncs .pop (data , None )
145- if d :
146- d .callback (data )
147-
148- def on_remote_server_up (self , server : str ):
149- """Called when get a new REMOTE_SERVER_UP command."""
150-
151- def get_streams_to_replicate (self ) -> Dict [str , int ]:
152- """Called when a new connection has been established and we need to
153- subscribe to streams.
154-
155- Returns:
156- map from stream name to the most recent update we have for
157- that stream (ie, the point we want to start replicating from)
158- """
159- args = self .store .stream_positions ()
160- user_account_data = args .pop ("user_account_data" , None )
161- room_account_data = args .pop ("room_account_data" , None )
162- if user_account_data :
163- args ["account_data" ] = user_account_data
164- elif room_account_data :
165- args ["account_data" ] = room_account_data
166-
167- return args
168-
169- def get_currently_syncing_users (self ):
170- """Get the list of currently syncing users (if any). This is called
171- when a connection has been established and we need to send the
172- currently syncing users. (Overriden by the synchrotron's only)
173- """
174- return []
175-
176- def send_command (self , cmd ):
177- """Send a command to master (when we get establish a connection if we
178- don't have one already.)
179- """
180- if self .connection :
181- self .connection .send_command (cmd )
182- else :
183- logger .warning ("Queuing command as not connected: %r" , cmd .NAME )
184- self .pending_commands .append (cmd )
185-
186- def send_federation_ack (self , token ):
187- """Ack data for the federation stream. This allows the master to drop
188- data stored purely in memory.
189- """
190- self .send_command (FederationAckCommand (token ))
191-
192- def send_user_sync (self , instance_id , user_id , is_syncing , last_sync_ms ):
193- """Poke the master that a user has started/stopped syncing.
194- """
195- self .send_command (
196- UserSyncCommand (instance_id , user_id , is_syncing , last_sync_ms )
197- )
198-
199- def send_remove_pusher (self , app_id , push_key , user_id ):
200- """Poke the master to remove a pusher for a user
201- """
202- cmd = RemovePusherCommand (app_id , push_key , user_id )
203- self .send_command (cmd )
204-
205- def send_invalidate_cache (self , cache_func , keys ):
206- """Poke the master to invalidate a cache.
207- """
208- cmd = InvalidateCacheCommand (cache_func .__name__ , keys )
209- self .send_command (cmd )
210-
211- def send_user_ip (self , user_id , access_token , ip , user_agent , device_id , last_seen ):
212- """Tell the master that the user made a request.
213- """
214- cmd = UserIpCommand (user_id , access_token , ip , user_agent , device_id , last_seen )
215- self .send_command (cmd )
216-
217- def send_remote_server_up (self , server : str ):
218- self .send_command (RemoteServerUpCommand (server ))
219-
220- def await_sync (self , data ):
221- """Returns a deferred that is resolved when we receive a SYNC command
222- with given data.
223-
224- [Not currently] used by tests.
225- """
226- return self .awaiting_syncs .setdefault (data , defer .Deferred ())
227-
228- def update_connection (self , connection ):
229- """Called when a connection has been established (or lost with None).
230- """
231- self .connection = connection
232- if connection :
233- for cmd in self .pending_commands :
234- connection .send_command (cmd )
235- self .pending_commands = []
236-
237- def finished_connecting (self ):
238- """Called when we have successfully subscribed and caught up to all
239- streams we're interested in.
240- """
241- logger .info ("Finished connecting to server" )
242-
243- # We don't reset the delay any earlier as otherwise if there is a
244- # problem during start up we'll end up tight looping connecting to the
245- # server.
246- if self .factory :
247- self .factory .resetDelay ()
0 commit comments