@@ -94,7 +94,7 @@ def start(self) -> None:
94
94
return
95
95
run_as_background_process ("start_pushers" , self ._start_pushers )
96
96
97
- async def add_pusher (
97
+ async def add_or_update_pusher (
98
98
self ,
99
99
user_id : str ,
100
100
access_token : Optional [int ],
@@ -106,6 +106,7 @@ async def add_pusher(
106
106
lang : Optional [str ],
107
107
data : JsonDict ,
108
108
profile_tag : str = "" ,
109
+ enabled : bool = True ,
109
110
) -> Optional [Pusher ]:
110
111
"""Creates a new pusher and adds it to the pool
111
112
@@ -147,9 +148,20 @@ async def add_pusher(
147
148
last_stream_ordering = last_stream_ordering ,
148
149
last_success = None ,
149
150
failing_since = None ,
151
+ enabled = enabled ,
150
152
)
151
153
)
152
154
155
+ # Before we actually persist the pusher, we check if the user already has one
156
+ # for this app ID and pushkey. If so, we want to keep the access token in place,
157
+ # since this could be one device modifying (e.g. enabling/disabling) another
158
+ # device's pusher.
159
+ existing_config = await self ._get_pusher_config_for_user_by_app_id_and_pushkey (
160
+ user_id , app_id , pushkey
161
+ )
162
+ if existing_config :
163
+ access_token = existing_config .access_token
164
+
153
165
await self .store .add_pusher (
154
166
user_id = user_id ,
155
167
access_token = access_token ,
@@ -163,8 +175,9 @@ async def add_pusher(
163
175
data = data ,
164
176
last_stream_ordering = last_stream_ordering ,
165
177
profile_tag = profile_tag ,
178
+ enabled = enabled ,
166
179
)
167
- pusher = await self .start_pusher_by_id (app_id , pushkey , user_id )
180
+ pusher = await self .process_pusher_change_by_id (app_id , pushkey , user_id )
168
181
169
182
return pusher
170
183
@@ -276,10 +289,25 @@ async def on_new_receipts(
276
289
except Exception :
277
290
logger .exception ("Exception in pusher on_new_receipts" )
278
291
279
- async def start_pusher_by_id (
292
+ async def _get_pusher_config_for_user_by_app_id_and_pushkey (
293
+ self , user_id : str , app_id : str , pushkey : str
294
+ ) -> Optional [PusherConfig ]:
295
+ resultlist = await self .store .get_pushers_by_app_id_and_pushkey (app_id , pushkey )
296
+
297
+ pusher_config = None
298
+ for r in resultlist :
299
+ if r .user_name == user_id :
300
+ pusher_config = r
301
+
302
+ return pusher_config
303
+
304
+ async def process_pusher_change_by_id (
280
305
self , app_id : str , pushkey : str , user_id : str
281
306
) -> Optional [Pusher ]:
282
- """Look up the details for the given pusher, and start it
307
+ """Look up the details for the given pusher, and either start it if its
308
+ "enabled" flag is True, or try to stop it otherwise.
309
+
310
+ If the pusher is new and its "enabled" flag is False, the stop is a noop.
283
311
284
312
Returns:
285
313
The pusher started, if any
@@ -290,12 +318,13 @@ async def start_pusher_by_id(
290
318
if not self ._pusher_shard_config .should_handle (self ._instance_name , user_id ):
291
319
return None
292
320
293
- resultlist = await self .store .get_pushers_by_app_id_and_pushkey (app_id , pushkey )
321
+ pusher_config = await self ._get_pusher_config_for_user_by_app_id_and_pushkey (
322
+ user_id , app_id , pushkey
323
+ )
294
324
295
- pusher_config = None
296
- for r in resultlist :
297
- if r .user_name == user_id :
298
- pusher_config = r
325
+ if pusher_config and not pusher_config .enabled :
326
+ self .maybe_stop_pusher (app_id , pushkey , user_id )
327
+ return None
299
328
300
329
pusher = None
301
330
if pusher_config :
@@ -305,7 +334,7 @@ async def start_pusher_by_id(
305
334
306
335
async def _start_pushers (self ) -> None :
307
336
"""Start all the pushers"""
308
- pushers = await self .store .get_all_pushers ()
337
+ pushers = await self .store .get_enabled_pushers ()
309
338
310
339
# Stagger starting up the pushers so we don't completely drown the
311
340
# process on start up.
@@ -363,6 +392,8 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
363
392
364
393
synapse_pushers .labels (type (pusher ).__name__ , pusher .app_id ).inc ()
365
394
395
+ logger .info ("Starting pusher %s / %s" , pusher .user_id , appid_pushkey )
396
+
366
397
# Check if there *may* be push to process. We do this as this check is a
367
398
# lot cheaper to do than actually fetching the exact rows we need to
368
399
# push.
@@ -382,16 +413,7 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
382
413
return pusher
383
414
384
415
async def remove_pusher (self , app_id : str , pushkey : str , user_id : str ) -> None :
385
- appid_pushkey = "%s:%s" % (app_id , pushkey )
386
-
387
- byuser = self .pushers .get (user_id , {})
388
-
389
- if appid_pushkey in byuser :
390
- logger .info ("Stopping pusher %s / %s" , user_id , appid_pushkey )
391
- pusher = byuser .pop (appid_pushkey )
392
- pusher .on_stop ()
393
-
394
- synapse_pushers .labels (type (pusher ).__name__ , pusher .app_id ).dec ()
416
+ self .maybe_stop_pusher (app_id , pushkey , user_id )
395
417
396
418
# We can only delete pushers on master.
397
419
if self ._remove_pusher_client :
@@ -402,3 +424,22 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
402
424
await self .store .delete_pusher_by_app_id_pushkey_user_id (
403
425
app_id , pushkey , user_id
404
426
)
427
+
428
+ def maybe_stop_pusher (self , app_id : str , pushkey : str , user_id : str ) -> None :
429
+ """Stops a pusher with the given app ID and push key if one is running.
430
+
431
+ Args:
432
+ app_id: the pusher's app ID.
433
+ pushkey: the pusher's push key.
434
+ user_id: the user the pusher belongs to. Only used for logging.
435
+ """
436
+ appid_pushkey = "%s:%s" % (app_id , pushkey )
437
+
438
+ byuser = self .pushers .get (user_id , {})
439
+
440
+ if appid_pushkey in byuser :
441
+ logger .info ("Stopping pusher %s / %s" , user_id , appid_pushkey )
442
+ pusher = byuser .pop (appid_pushkey )
443
+ pusher .on_stop ()
444
+
445
+ synapse_pushers .labels (type (pusher ).__name__ , pusher .app_id ).dec ()
0 commit comments