1717from typing import TYPE_CHECKING , Collection , Optional , Set , Tuple , Type
1818from weakref import WeakValueDictionary
1919
20- from twisted .internet .interfaces import IReactorCore
20+ from twisted .internet .task import LoopingCall
2121
2222from synapse .metrics .background_process_metrics import wrap_as_background_process
2323from synapse .storage ._base import SQLBaseStore
2626 LoggingDatabaseConnection ,
2727 LoggingTransaction ,
2828)
29+ from synapse .types import ISynapseReactor
2930from synapse .util import Clock
3031from synapse .util .stringutils import random_string
3132
@@ -358,7 +359,7 @@ class Lock:
358359
359360 def __init__ (
360361 self ,
361- reactor : IReactorCore ,
362+ reactor : ISynapseReactor ,
362363 clock : Clock ,
363364 store : LockStore ,
364365 read_write : bool ,
@@ -377,19 +378,25 @@ def __init__(
377378
378379 self ._table = "worker_read_write_locks" if read_write else "worker_locks"
379380
380- self ._looping_call = clock .looping_call (
381+ # We might be called from a non-main thread, so we defer setting up the
382+ # looping call.
383+ self ._looping_call : Optional [LoopingCall ] = None
384+ reactor .callFromThread (self ._setup_looping_call )
385+
386+ self ._dropped = False
387+
388+ def _setup_looping_call (self ) -> None :
389+ self ._looping_call = self ._clock .looping_call (
381390 self ._renew ,
382391 _RENEWAL_INTERVAL_MS ,
383- store ,
384- clock ,
385- read_write ,
386- lock_name ,
387- lock_key ,
388- token ,
392+ self . _store ,
393+ self . _clock ,
394+ self . _read_write ,
395+ self . _lock_name ,
396+ self . _lock_key ,
397+ self . _token ,
389398 )
390399
391- self ._dropped = False
392-
393400 @staticmethod
394401 @wrap_as_background_process ("Lock._renew" )
395402 async def _renew (
@@ -459,7 +466,7 @@ async def release(self) -> None:
459466 if self ._dropped :
460467 return
461468
462- if self ._looping_call .running :
469+ if self ._looping_call and self . _looping_call .running :
463470 self ._looping_call .stop ()
464471
465472 await self ._store .db_pool .simple_delete (
@@ -486,8 +493,9 @@ def __del__(self) -> None:
486493 # We should not be dropped without the lock being released (unless
487494 # we're shutting down), but if we are then let's at least stop
488495 # renewing the lock.
489- if self ._looping_call .running :
490- self ._looping_call .stop ()
496+ if self ._looping_call and self ._looping_call .running :
497+ # We might be called from a non-main thread.
498+ self ._reactor .callFromThread (self ._looping_call .stop )
491499
492500 if self ._reactor .running :
493501 logger .error (
0 commit comments