@@ -8,7 +8,7 @@ use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
 use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder;
 use aws_sdk_s3::operation::list_objects::ListObjectsOutput;
-use aws_sdk_s3::primitives::{ByteStream, SdkBody};
+use aws_sdk_s3::primitives::{ByteStream};
 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
 use aws_sdk_s3::{Client, Config};
 use bytes::{Buf, Bytes, BytesMut};
@@ -608,92 +608,166 @@ impl Replicator {
             return Ok(());
         }
         tracing::debug!("Snapshotting {}", self.db_path);
-        let change_counter = match self.use_compression {
-            CompressionKind::None => {
-                self.client
-                    .put_object()
-                    .bucket(&self.bucket)
-                    .key(format!("{}-{}/db.db", self.db_name, self.generation))
-                    .body(ByteStream::from_path(&self.db_path).await?)
-                    .send()
-                    .await?;
-                let mut reader = tokio::fs::File::open(&self.db_path).await?;
-                Self::read_change_counter(&mut reader).await?
-            }
-            CompressionKind::Gzip => {
-                let mut reader = tokio::fs::File::open(&self.db_path).await?;
-
-                let stream = tokio::io::BufReader::new(reader.try_clone().await?);
-                let mut gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(stream);
-
-                let key = format!("{}-{}/db.gz", self.db_name, self.generation);
-                let upload_id = self
-                    .client
-                    .create_multipart_upload()
-                    .bucket(&self.bucket)
-                    .key(key.clone())
-                    .send()
-                    .await?
-                    .upload_id
-                    .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
-
-                const CHUNK_SIZE: usize = 5 * 1024 * 1024;
-                let mut parts = Vec::new();
-                // S3 takes an 1-based index
-                for part in 1..=10000 {
-                    let mut buffer = bytes::BytesMut::with_capacity(CHUNK_SIZE);
-                    loop {
-                        let bytes_written = gzip_reader.read_buf(&mut buffer).await?;
-                        // EOF or buffer is full
-                        if bytes_written == 0 {
+        let change_counter =
+            match self.use_compression {
+                CompressionKind::None => {
+                    self.client
+                        .put_object()
+                        .bucket(&self.bucket)
+                        .key(format!("{}-{}/db.db", self.db_name, self.generation))
+                        .body(ByteStream::from_path(&self.db_path).await?)
+                        .send()
+                        .await?;
+                    let mut reader = tokio::fs::File::open(&self.db_path).await?;
+                    Self::read_change_counter(&mut reader).await?
+                }
+                CompressionKind::Gzip => {
+                    let mut reader = tokio::fs::File::open(&self.db_path).await?;
+                    let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?);
+                    let mut gzip_reader =
+                        async_compression::tokio::bufread::GzipEncoder::new(buf_reader);
+
+                    let key = format!("{}-{}/db.gz", self.db_name, self.generation);
+
+                    // Unfortunately, we can't send the gzip output in a single call without
+                    // buffering the whole snapshot in memory, because S3 requires the
+                    // `Content-Length` header to be set.
+                    let upload_id = self
+                        .client
+                        .create_multipart_upload()
+                        .bucket(&self.bucket)
+                        .key(key.clone())
+                        .send()
+                        .await?
+                        .upload_id
+                        .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
+
+                    let chunk_sizes = &[
+                        5 * 1024 * 1024,
+                        10 * 1024 * 1024,
+                        25 * 1024 * 1024,
+                        50 * 1024 * 1024,
+                        100 * 1024 * 1024,
+                    ];
+
+                    const LAST_PART: i32 = 10_000;
+                    let mut parts = Vec::new();
+                    let mut has_reached_eof = false;
+
+                    // S3 allows a maximum of 10_000 parts, and each part must be between 5 MiB
+                    // and 5 GiB in size, except for the last one, which has no minimum size.
+                    //
+                    // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+                    for part in 0..LAST_PART - 1 {
+                        // Progressively increase the chunk size every 16 chunks, up to the
+                        // largest chunk size. This keeps allocations small for small databases.
+                        //
+                        // Here's a table of how much data we can chunk:
+                        // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐
+                        // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 5 MiB      │ 16               │ 80 MiB                │ 80 MiB           │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 10 MiB     │ 16               │ 160 MiB               │ 240 MiB          │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 25 MiB     │ 16               │ 400 MiB               │ 640 MiB          │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 50 MiB     │ 16               │ 800 MiB               │ 1.406 GiB        │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 100 MiB    │ 9935             │ 970.215 GiB           │ 971.621 GiB      │
+                        // └────────────┴──────────────────┴───────────────────────┴──────────────────┘
+                        //
+                        // We can send up to ~971 GiB in chunks, which is more than enough for the
+                        // majority of use cases.
+                        //
+                        // The last chunk is reserved for the remainder of the `gzip_reader`.
+                        let chunk_size =
+                            chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
+
+                        let mut buffer = bytes::BytesMut::with_capacity(chunk_size);
+                        loop {
+                            let bytes_written = gzip_reader.read_buf(&mut buffer).await?;
+                            // EOF or buffer is full
+                            if bytes_written == 0 {
+                                break;
+                            }
+                        }
+
+                        // EOF
+                        if buffer.is_empty() {
+                            has_reached_eof = true;
                             break;
                         }
+
+                        let part_out = self
+                            .client
+                            .upload_part()
+                            .bucket(&self.bucket)
+                            .key(key.clone())
+                            .upload_id(upload_id.clone())
+                            .body(ByteStream::from(buffer.freeze()))
+                            .part_number(part + 1)
+                            .send()
+                            .await?;
+
+                        parts.push(
+                            CompletedPart::builder()
+                                .part_number(part + 1)
+                                .e_tag(part_out.e_tag.ok_or_else(|| {
+                                    anyhow::anyhow!("e_tag missing from part upload")
+                                })?)
+                                .build(),
+                        );
                     }

-                // EOF
-                if buffer.is_empty() {
-                    break;
+                    // If the gzip stream has not reached EOF, we still need to send the last part
+                    // to S3. Since we don't know the size of the remaining stream and can't be
+                    // sure it fits in memory, we save it to a file so it can be streamed.
+                    //
+                    // This should only happen for databases that are around ~1 TiB.
+                    if !has_reached_eof {
+                        let last_chunk_path =
+                            format!("{}-{}/db.last-chunk.gz", self.db_name, self.generation);
+                        let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?;
+                        tokio::io::copy(&mut gzip_reader, &mut last_chunk_file).await?;
+
+                        let part_out = self
+                            .client
+                            .upload_part()
+                            .bucket(&self.bucket)
+                            .key(key.clone())
+                            .upload_id(upload_id.clone())
+                            .body(ByteStream::from_path(last_chunk_path).await?)
+                            .part_number(LAST_PART) // last chunk
+                            .send()
+                            .await?;
+
+                        parts.push(
+                            CompletedPart::builder()
+                                .part_number(LAST_PART)
+                                .e_tag(part_out.e_tag.ok_or_else(|| {
+                                    anyhow::anyhow!("e_tag missing from part upload")
+                                })?)
+                                .build(),
+                        );
                     }

-                let part_out = self
-                    .client
-                    .upload_part()
+                    self.client
+                        .complete_multipart_upload()
+                        .upload_id(upload_id)
                         .bucket(&self.bucket)
-                    .key(key.clone())
-                    .upload_id(upload_id.clone())
-                    .body(ByteStream::from(buffer.freeze()))
-                    .part_number(part)
+                        .key(key)
+                        .multipart_upload(
+                            CompletedMultipartUpload::builder()
+                                .set_parts(Some(parts))
+                                .build(),
+                        )
                         .send()
                         .await?;

-                parts.push(
-                    CompletedPart::builder()
-                        .part_number(part)
-                        .e_tag(
-                            part_out.e_tag.ok_or_else(|| {
-                                anyhow::anyhow!("e_tag missing from part upload")
-                            })?,
-                        )
-                        .build(),
-                );
+                    Self::read_change_counter(&mut reader).await?
                 }
-
-                self.client
-                    .complete_multipart_upload()
-                    .upload_id(upload_id)
-                    .bucket(&self.bucket)
-                    .key(key)
-                    .multipart_upload(
-                        CompletedMultipartUpload::builder()
-                            .set_parts(Some(parts))
-                            .build(),
-                    )
-                    .send()
-                    .await?;
-
-                Self::read_change_counter(&mut reader).await?
-            }
-        };
+            };

         /* FIXME: we can't rely on the change counter in WAL mode:
         ** "In WAL mode, changes to the database are detected using the wal-index and