@@ -618,16 +618,49 @@ fn multipart_uploader(
618618 try_stream ! {
619619 let mut stream = pin!( stream) ;
620620 let mut prev_part_number = None ;
621+
622+ let mut optimistic_presigned_url_task: Option <tokio:: task:: JoinHandle <Result <String , AuthedApiError >>> = Some (
623+ tokio:: spawn( {
624+ let app = app. clone( ) ;
625+ let video_id = video_id. clone( ) ;
626+ let upload_id = upload_id. clone( ) ;
627+
628+ async move {
629+ api:: upload_multipart_presign_part( & app, & video_id, & upload_id, 1 )
630+ . await
631+ }
632+ } )
633+ ) ;
634+ let mut expected_part_number = 1u32 ;
635+
621636 while let Some ( item) = stream. next( ) . await {
622637 let Chunk { total_size, part_number, chunk } = item. map_err( |err| format!( "uploader/part/{:?}/fs: {err:?}" , prev_part_number. map( |p| p + 1 ) ) ) ?;
623638 debug!( "Uploading chunk {part_number} ({} bytes) for video {video_id:?}" , chunk. len( ) ) ;
624639 prev_part_number = Some ( part_number) ;
625640 let md5_sum = base64:: encode( md5:: compute( & chunk) . 0 ) ;
626641 let size = chunk. len( ) ;
627642
628- let presigned_url =
629- api:: upload_multipart_presign_part( & app, & video_id, & upload_id, part_number, & md5_sum)
630- . await ?;
643+ let presigned_url = if expected_part_number == part_number {
644+ // The optimistic presigned URL matches, wait for it
645+ if let Some ( task) = optimistic_presigned_url_task. take( ) {
646+ task. await
647+ . map_err( |e| format!( "uploader/part/{part_number}/task_join: {e:?}" ) ) ?
648+ . map_err( |e| format!( "uploader/part/{part_number}/presign: {e}" ) ) ?
649+ } else {
650+ // Fallback if no task available
651+ api:: upload_multipart_presign_part( & app, & video_id, & upload_id, part_number)
652+ . await ?
653+ }
654+ } else {
655+ // The optimistic presigned URL doesn't match, abort it and generate a new correct one
656+ if let Some ( task) = optimistic_presigned_url_task. take( ) {
657+ task. abort( ) ;
658+ }
659+ debug!( "Throwing out optimistic presigned URL for part {expected_part_number} as part {part_number} was requested!" ) ;
660+ expected_part_number = part_number;
661+ api:: upload_multipart_presign_part( & app, & video_id, & upload_id, part_number)
662+ . await ?
663+ } ;
631664
632665 let url = Uri :: from_str( & presigned_url) . map_err( |err| format!( "uploader/part/{part_number}/invalid_url: {err:?}" ) ) ?;
633666 let resp = retryable_client( url. host( ) . unwrap_or( "<unknown>" ) . to_string( ) )
@@ -655,6 +688,25 @@ fn multipart_uploader(
655688 size,
656689 total_size
657690 } ;
691+
692+ // We generate the presigned URL ahead of time for the next expected part.
693+ // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing.
694+ expected_part_number = part_number + 1 ;
695+ optimistic_presigned_url_task = Some ( tokio:: spawn( {
696+ let app = app. clone( ) ;
697+ let video_id = video_id. clone( ) ;
698+ let upload_id = upload_id. clone( ) ;
699+
700+ async move {
701+ api:: upload_multipart_presign_part( & app, & video_id, & upload_id, expected_part_number)
702+ . await
703+ }
704+ } ) ) ;
705+ }
706+
707+ // Clean up any remaining optimistic task
708+ if let Some ( task) = optimistic_presigned_url_task. take( ) {
709+ task. abort( ) ;
658710 }
659711 }
660712}
0 commit comments