1717import os
1818import shutil
1919from io import BytesIO
20- from typing import IO , TYPE_CHECKING , Dict , List , Optional , Set , Tuple
20+ from typing import IO , TYPE_CHECKING , Any , Dict , List , Optional , Set , Tuple
2121
2222from matrix_common .types .mxc_uri import MXCUri
2323
3232 NotFoundError ,
3333 RequestSendFailed ,
3434 SynapseError ,
35+ cs_error ,
3536)
3637from synapse .config .repository import ThumbnailRequirement
38+ from synapse .http .server import respond_with_json
3739from synapse .http .site import SynapseRequest
3840from synapse .logging .context import defer_to_thread
3941from synapse .media ._base import (
@@ -300,8 +302,62 @@ async def create_content(
300302
301303 return MXCUri (self .server_name , media_id )
302304
305+ def respond_not_yet_uploaded (self , request : SynapseRequest ) -> None :
306+ respond_with_json (
307+ request ,
308+ 404 ,
309+ cs_error ("Media has not been uploaded yet" , code = Codes .NOT_YET_UPLOADED ),
310+ send_cors = True ,
311+ )
312+
313+ async def get_local_media_info (
314+ self , request : SynapseRequest , media_id : str , max_timeout_ms : int
315+ ) -> Optional [Dict [str , Any ]]:
316+ """Gets the info dictionary for given local media ID. If the media has
317+ not been uploaded yet, this function will wait up to ``max_timeout_ms``
318+ milliseconds for the media to be uploaded.
319+ Args:
320+ request: The incoming request.
321+ media_id: The media ID of the content. (This is the same as
322+ the file_id for local content.)
323+ max_timeout_ms: the maximum number of milliseconds to wait for the
324+ media to be uploaded.
325+ Returns:
326+ Either the info dictionary for the given local media ID or
327+ ``None``. If ``None``, then no further processing is necessary as
328+ this function will send the necessary JSON response.
329+ """
330+ wait_until = self .clock .time_msec () + max_timeout_ms
331+ while True :
332+ # Get the info for the media
333+ media_info = await self .store .get_local_media (media_id )
334+ if not media_info :
335+ respond_404 (request )
336+ return None
337+
338+ if media_info ["quarantined_by" ]:
339+ logger .info ("Media is quarantined" )
340+ respond_404 (request )
341+ return None
342+
343+ # The file has been uploaded, so stop looping
344+ if media_info .get ("media_length" ) is not None :
345+ return media_info
346+
347+ if self .clock .time_msec () >= wait_until :
348+ break
349+
350+ await self .clock .sleep (0.5 )
351+
352+ self .respond_not_yet_uploaded (request )
353+ return None
354+
303355 async def get_local_media (
304- self , request : SynapseRequest , media_id : str , name : Optional [str ]
356+ self ,
357+ request : SynapseRequest ,
358+ media_id : str ,
359+ name : Optional [str ],
360+ max_timeout_ms : int ,
305361 ) -> None :
306362 """Responds to requests for local media, if exists, or returns 404.
307363
@@ -311,13 +367,14 @@ async def get_local_media(
311367 the file_id for local content.)
312368 name: Optional name that, if specified, will be used as
313369 the filename in the Content-Disposition header of the response.
370+ max_timeout_ms: the maximum number of milliseconds to wait for the
371+ media to be uploaded.
314372
315373 Returns:
316374 Resolves once a response has successfully been written to request
317375 """
318- media_info = await self .store .get_local_media (media_id )
319- if not media_info or media_info ["quarantined_by" ]:
320- respond_404 (request )
376+ media_info = await self .get_local_media_info (request , media_id , max_timeout_ms )
377+ if not media_info :
321378 return
322379
323380 self .mark_recently_accessed (None , media_id )
@@ -342,6 +399,7 @@ async def get_remote_media(
342399 server_name : str ,
343400 media_id : str ,
344401 name : Optional [str ],
402+ max_timeout_ms : int ,
345403 ) -> None :
346404 """Respond to requests for remote media.
347405
@@ -351,6 +409,8 @@ async def get_remote_media(
351409 media_id: The media ID of the content (as defined by the remote server).
352410 name: Optional name that, if specified, will be used as
353411 the filename in the Content-Disposition header of the response.
412+ max_timeout_ms: the maximum number of milliseconds to wait for the
413+ media to be uploaded.
354414
355415 Returns:
356416 Resolves once a response has successfully been written to request
@@ -368,19 +428,19 @@ async def get_remote_media(
368428 key = (server_name , media_id )
369429 async with self .remote_media_linearizer .queue (key ):
370430 responder , media_info = await self ._get_remote_media_impl (
371- server_name , media_id
431+ server_name , media_id , max_timeout_ms
372432 )
373433
374434 # We deliberately stream the file outside the lock
375- if responder :
435+ if responder and media_info :
376436 media_type = media_info ["media_type" ]
377437 media_length = media_info ["media_length" ]
378438 upload_name = name if name else media_info ["upload_name" ]
379439 await respond_with_responder (
380440 request , responder , media_type , media_length , upload_name
381441 )
382442 else :
383- respond_404 (request )
443+ self . respond_not_yet_uploaded (request )
384444
385445 async def get_remote_media_info (self , server_name : str , media_id : str ) -> dict :
386446 """Gets the media info associated with the remote file, downloading
@@ -404,7 +464,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
404464 key = (server_name , media_id )
405465 async with self .remote_media_linearizer .queue (key ):
406466 responder , media_info = await self ._get_remote_media_impl (
407- server_name , media_id
467+ server_name , media_id , max_timeout_ms
408468 )
409469
410470 # Ensure we actually use the responder so that it releases resources
@@ -415,7 +475,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
415475 return media_info
416476
417477 async def _get_remote_media_impl (
418- self , server_name : str , media_id : str
478+ self , server_name : str , media_id : str , max_timeout_ms : int
419479 ) -> Tuple [Optional [Responder ], dict ]:
420480 """Looks for media in local cache, if not there then attempt to
421481 download from remote server.
@@ -424,6 +484,8 @@ async def _get_remote_media_impl(
424484 server_name: Remote server_name where the media originated.
425485 media_id: The media ID of the content (as defined by the
426486 remote server).
487+ max_timeout_ms: the maximum number of milliseconds to wait for the
488+ media to be uploaded.
427489
428490 Returns:
429491 A tuple of responder and the media info of the file.
@@ -454,8 +516,7 @@ async def _get_remote_media_impl(
454516
455517 try :
456518 media_info = await self ._download_remote_file (
457- server_name ,
458- media_id ,
519+ server_name , media_id , max_timeout_ms
459520 )
460521 except SynapseError :
461522 raise
@@ -488,6 +549,7 @@ async def _download_remote_file(
488549 self ,
489550 server_name : str ,
490551 media_id : str ,
552+ max_timeout_ms : int ,
491553 ) -> dict :
492554 """Attempt to download the remote file from the given server name,
493555 using the given file_id as the local id.
@@ -497,7 +559,8 @@ async def _download_remote_file(
497559 media_id: The media ID of the content (as defined by the
498560 remote server). This is different than the file_id, which is
499561 locally generated.
500- file_id: Local file ID
562+ max_timeout_ms: the maximum number of milliseconds to wait for the
563+ media to be uploaded.
501564
502565 Returns:
503566 The media info of the file.
@@ -521,7 +584,8 @@ async def _download_remote_file(
521584 # tell the remote server to 404 if it doesn't
522585 # recognise the server_name, to make sure we don't
523586 # end up with a routing loop.
524- "allow_remote" : "false"
587+ "allow_remote" : "false" ,
588+ "timeout_ms" : str (max_timeout_ms ),
525589 },
526590 )
527591 except RequestSendFailed as e :
0 commit comments