3434from synapse .logging .opentracing import log_kv , trace
3535from synapse .storage .databases .main .e2e_room_keys import RoomKey
3636from synapse .types import JsonDict
37- from synapse .util .async_helpers import Linearizer
37+ from synapse .util .async_helpers import ReadWriteLock
3838
3939if TYPE_CHECKING :
4040 from synapse .server import HomeServer
@@ -58,7 +58,7 @@ def __init__(self, hs: "HomeServer"):
5858 # clients belonging to a user will receive and try to upload a new session at
5959 # roughly the same time. Also used to lock out uploads when the key is being
6060 # changed.
61- self ._upload_linearizer = Linearizer ( "upload_room_keys_lock" )
61+ self ._upload_lock = ReadWriteLock ( )
6262
6363 @trace
6464 async def get_room_keys (
@@ -89,7 +89,7 @@ async def get_room_keys(
8989
9090 # we deliberately take the lock to get keys so that changing the version
9191 # works atomically
92- async with self ._upload_linearizer . queue (user_id ):
92+ async with self ._upload_lock . read (user_id ):
9393 # make sure the backup version exists
9494 try :
9595 await self .store .get_e2e_room_keys_version_info (user_id , version )
@@ -132,7 +132,7 @@ async def delete_room_keys(
132132 """
133133
134134 # lock for consistency with uploading
135- async with self ._upload_linearizer . queue (user_id ):
135+ async with self ._upload_lock . write (user_id ):
136136 # make sure the backup version exists
137137 try :
138138 version_info = await self .store .get_e2e_room_keys_version_info (
@@ -193,7 +193,7 @@ async def upload_room_keys(
193193 # TODO: Validate the JSON to make sure it has the right keys.
194194
195195 # XXX: perhaps we should use a finer grained lock here?
196- async with self ._upload_linearizer . queue (user_id ):
196+ async with self ._upload_lock . write (user_id ):
197197 # Check that the version we're trying to upload is the current version
198198 try :
199199 version_info = await self .store .get_e2e_room_keys_version_info (user_id )
@@ -355,7 +355,7 @@ async def create_version(self, user_id: str, version_info: JsonDict) -> str:
355355 # TODO: Validate the JSON to make sure it has the right keys.
356356
357357 # lock everyone out until we've switched version
358- async with self ._upload_linearizer . queue (user_id ):
358+ async with self ._upload_lock . write (user_id ):
359359 new_version = await self .store .create_e2e_room_keys_version (
360360 user_id , version_info
361361 )
@@ -382,7 +382,7 @@ async def get_version_info(
382382 }
383383 """
384384
385- async with self ._upload_linearizer . queue (user_id ):
385+ async with self ._upload_lock . read (user_id ):
386386 try :
387387 res = await self .store .get_e2e_room_keys_version_info (user_id , version )
388388 except StoreError as e :
@@ -407,7 +407,7 @@ async def delete_version(self, user_id: str, version: Optional[str] = None) -> N
407407 NotFoundError: if this backup version doesn't exist
408408 """
409409
410- async with self ._upload_linearizer . queue (user_id ):
410+ async with self ._upload_lock . write (user_id ):
411411 try :
412412 await self .store .delete_e2e_room_keys_version (user_id , version )
413413 except StoreError as e :
@@ -437,7 +437,7 @@ async def update_version(
437437 raise SynapseError (
438438 400 , "Version in body does not match" , Codes .INVALID_PARAM
439439 )
440- async with self ._upload_linearizer . queue (user_id ):
440+ async with self ._upload_lock . write (user_id ):
441441 try :
442442 old_info = await self .store .get_e2e_room_keys_version_info (
443443 user_id , version
0 commit comments