@@ -117,7 +117,6 @@ def __init__(self, hs):
117
117
self ._server_notices_sender = None
118
118
if self ._is_master :
119
119
self ._server_notices_sender = hs .get_server_notices_sender ()
120
- self ._notifier .add_remote_server_up_callback (self .send_remote_server_up )
121
120
122
121
def start_replication (self , hs ):
123
122
"""Helper method to start a replication connection to the remote server
@@ -163,7 +162,7 @@ def start_replication(self, hs):
163
162
port = hs .config .worker_replication_port
164
163
hs .get_reactor ().connectTCP (host , port , self ._factory )
165
164
166
- async def on_REPLICATE (self , cmd : ReplicateCommand ):
165
+ async def on_REPLICATE (self , conn : AbstractConnection , cmd : ReplicateCommand ):
167
166
# We only want to announce positions by the writer of the streams.
168
167
# Currently this is just the master process.
169
168
if not self ._is_master :
@@ -173,25 +172,31 @@ async def on_REPLICATE(self, cmd: ReplicateCommand):
173
172
current_token = stream .current_token ()
174
173
self .send_command (PositionCommand (stream_name , current_token ))
175
174
176
- async def on_USER_SYNC (self , cmd : UserSyncCommand ):
175
+ async def on_USER_SYNC (self , conn : AbstractConnection , cmd : UserSyncCommand ):
177
176
user_sync_counter .inc ()
178
177
179
178
if self ._is_master :
180
179
await self ._presence_handler .update_external_syncs_row (
181
180
cmd .instance_id , cmd .user_id , cmd .is_syncing , cmd .last_sync_ms
182
181
)
183
182
184
- async def on_CLEAR_USER_SYNC (self , cmd : ClearUserSyncsCommand ):
183
+ async def on_CLEAR_USER_SYNC (
184
+ self , conn : AbstractConnection , cmd : ClearUserSyncsCommand
185
+ ):
185
186
if self ._is_master :
186
187
await self ._presence_handler .update_external_syncs_clear (cmd .instance_id )
187
188
188
- async def on_FEDERATION_ACK (self , cmd : FederationAckCommand ):
189
+ async def on_FEDERATION_ACK (
190
+ self , conn : AbstractConnection , cmd : FederationAckCommand
191
+ ):
189
192
federation_ack_counter .inc ()
190
193
191
194
if self ._federation_sender :
192
195
self ._federation_sender .federation_ack (cmd .token )
193
196
194
- async def on_REMOVE_PUSHER (self , cmd : RemovePusherCommand ):
197
+ async def on_REMOVE_PUSHER (
198
+ self , conn : AbstractConnection , cmd : RemovePusherCommand
199
+ ):
195
200
remove_pusher_counter .inc ()
196
201
197
202
if self ._is_master :
@@ -201,7 +206,9 @@ async def on_REMOVE_PUSHER(self, cmd: RemovePusherCommand):
201
206
202
207
self ._notifier .on_new_replication_data ()
203
208
204
- async def on_INVALIDATE_CACHE (self , cmd : InvalidateCacheCommand ):
209
+ async def on_INVALIDATE_CACHE (
210
+ self , conn : AbstractConnection , cmd : InvalidateCacheCommand
211
+ ):
205
212
invalidate_cache_counter .inc ()
206
213
207
214
if self ._is_master :
@@ -211,7 +218,7 @@ async def on_INVALIDATE_CACHE(self, cmd: InvalidateCacheCommand):
211
218
cmd .cache_func , tuple (cmd .keys )
212
219
)
213
220
214
- async def on_USER_IP (self , cmd : UserIpCommand ):
221
+ async def on_USER_IP (self , conn : AbstractConnection , cmd : UserIpCommand ):
215
222
user_ip_cache_counter .inc ()
216
223
217
224
if self ._is_master :
@@ -227,7 +234,7 @@ async def on_USER_IP(self, cmd: UserIpCommand):
227
234
if self ._server_notices_sender :
228
235
await self ._server_notices_sender .on_user_ip (cmd .user_id )
229
236
230
- async def on_RDATA (self , cmd : RdataCommand ):
237
+ async def on_RDATA (self , conn : AbstractConnection , cmd : RdataCommand ):
231
238
stream_name = cmd .stream_name
232
239
inbound_rdata_count .labels (stream_name ).inc ()
233
240
@@ -278,7 +285,7 @@ async def on_rdata(self, stream_name: str, token: int, rows: list):
278
285
logger .debug ("Received rdata %s -> %s" , stream_name , token )
279
286
await self ._replication_data_handler .on_rdata (stream_name , token , rows )
280
287
281
- async def on_POSITION (self , cmd : PositionCommand ):
288
+ async def on_POSITION (self , conn : AbstractConnection , cmd : PositionCommand ):
282
289
stream = self ._streams .get (cmd .stream_name )
283
290
if not stream :
284
291
logger .error ("Got POSITION for unknown stream: %s" , cmd .stream_name )
@@ -332,12 +339,30 @@ async def on_POSITION(self, cmd: PositionCommand):
332
339
333
340
self ._streams_connected .add (cmd .stream_name )
334
341
335
- async def on_REMOTE_SERVER_UP (self , cmd : RemoteServerUpCommand ):
342
+ async def on_REMOTE_SERVER_UP (
343
+ self , conn : AbstractConnection , cmd : RemoteServerUpCommand
344
+ ):
336
345
""""Called when get a new REMOTE_SERVER_UP command."""
337
346
self ._replication_data_handler .on_remote_server_up (cmd .data )
338
347
339
- if self ._is_master :
340
- self ._notifier .notify_remote_server_up (cmd .data )
348
+ self ._notifier .notify_remote_server_up (cmd .data )
349
+
350
+ # We relay to all other connections to ensure every instance gets the
351
+ # notification.
352
+ #
353
+ # When configured to use redis we'll always only have one connection and
354
+ # so this is a no-op (all instances will have already received the same
355
+ # REMOTE_SERVER_UP command).
356
+ #
357
+ # For direct TCP connections this will relay to all other connections
358
+ # connected to us. When on master this will correctly fan out to all
359
+ # other direct TCP clients and on workers there'll only be the one
360
+ # connection to master.
361
+ #
362
+ # (The logic here should also be sound if we have a mix of Redis and
363
+ # direct TCP connections so long as there is only one traffic route
364
+ # between two instances, but that is not currently supported).
365
+ self .send_command (cmd , ignore_conn = conn )
341
366
342
367
def new_connection (self , connection : AbstractConnection ):
343
368
"""Called when we have a new connection.
@@ -382,11 +407,21 @@ def connected(self) -> bool:
382
407
"""
383
408
return bool (self ._connections )
384
409
385
- def send_command (self , cmd : Command ):
410
+ def send_command (
411
+ self , cmd : Command , ignore_conn : Optional [AbstractConnection ] = None
412
+ ):
386
413
"""Send a command to all connected connections.
414
+
415
+ Args:
416
+ cmd
417
+ ignore_conn: If set don't send command to the given connection.
418
+ Used when relaying commands from one connection to all others.
387
419
"""
388
420
if self ._connections :
389
421
for connection in self ._connections :
422
+ if connection == ignore_conn :
423
+ continue
424
+
390
425
try :
391
426
connection .send_command (cmd )
392
427
except Exception :
0 commit comments