@@ -18,7 +18,7 @@ use bytes::{Buf, Bytes};
18
18
use chrono:: { NaiveDateTime , TimeZone , Utc } ;
19
19
use std:: io:: SeekFrom ;
20
20
use std:: ops:: Deref ;
21
- use std:: path:: Path ;
21
+ use std:: path:: { Path , PathBuf } ;
22
22
use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
23
23
use std:: sync:: Arc ;
24
24
use tokio:: fs:: { File , OpenOptions } ;
@@ -642,28 +642,40 @@ impl Replicator {
642
642
// Returns the compressed database file path and its change counter, extracted
643
643
// from the header of page1 at offset 24..27 (as per SQLite documentation).
644
644
pub async fn maybe_compress_main_db_file (
645
- mut reader : File ,
645
+ db_path : & Path ,
646
646
compression : CompressionKind ,
647
647
) -> Result < ByteStream > {
648
- reader. seek ( SeekFrom :: Start ( 0 ) ) . await ?;
649
648
match compression {
650
- CompressionKind :: None => Ok ( ByteStream :: read_from ( ) . file ( reader ) . build ( ) . await ?) ,
649
+ CompressionKind :: None => Ok ( ByteStream :: from_path ( db_path ) . await ?) ,
651
650
CompressionKind :: Gzip => {
651
+ let mut reader = File :: open ( db_path) . await ?;
652
+ let gzip_path = Self :: db_gzip_path ( db_path) ;
652
653
let compressed_file = OpenOptions :: new ( )
653
654
. create ( true )
654
655
. write ( true )
655
656
. read ( true )
656
657
. truncate ( true )
657
- . open ( "db.gz" )
658
+ . open ( & gzip_path )
658
659
. await ?;
659
660
let mut writer = GzipEncoder :: new ( compressed_file) ;
660
661
let size = tokio:: io:: copy ( & mut reader, & mut writer) . await ?;
661
- tracing:: trace!( "Compressed database file ({} bytes) into db.gz" , size) ;
662
662
writer. shutdown ( ) . await ?;
663
- Ok ( ByteStream :: from_path ( "db.gz" ) . await ?)
663
+ tracing:: debug!(
664
+ "Compressed database file ({} bytes) into `{}`" ,
665
+ size,
666
+ gzip_path. display( )
667
+ ) ;
668
+ Ok ( ByteStream :: from_path ( gzip_path) . await ?)
664
669
}
665
670
}
666
671
}
672
+
673
+ fn db_gzip_path ( db_path : & Path ) -> PathBuf {
674
+ let mut gzip_path = db_path. to_path_buf ( ) ;
675
+ gzip_path. pop ( ) ;
676
+ gzip_path. join ( "db.gz" )
677
+ }
678
+
667
679
// Replicates local WAL pages to S3, if local WAL is present.
668
680
// This function is called under the assumption that if local WAL
669
681
// file is present, it was already detected to be newer than its
@@ -729,8 +741,10 @@ impl Replicator {
729
741
let generation = self . generation ( ) ?;
730
742
let start_ts = Instant :: now ( ) ;
731
743
let client = self . client . clone ( ) ;
732
- let mut db_file = File :: open ( & self . db_path ) . await ?;
733
- let change_counter = Self :: read_change_counter ( & mut db_file) . await ?;
744
+ let change_counter = {
745
+ let mut db_file = File :: open ( & self . db_path ) . await ?;
746
+ Self :: read_change_counter ( & mut db_file) . await ?
747
+ } ;
734
748
let snapshot_req = client. put_object ( ) . bucket ( self . bucket . clone ( ) ) . key ( format ! (
735
749
"{}-{}/db.{}" ,
736
750
self . db_name, generation, self . use_compression
@@ -753,14 +767,15 @@ impl Replicator {
753
767
) ) ) ;
754
768
let snapshot_notifier = self . snapshot_notifier . clone ( ) ;
755
769
let compression = self . use_compression ;
770
+ let db_path = PathBuf :: from ( self . db_path . clone ( ) ) ;
756
771
let handle = tokio:: spawn ( async move {
757
772
tracing:: trace!( "Start snapshotting generation {}" , generation) ;
758
773
let start = Instant :: now ( ) ;
759
- let body = match Self :: maybe_compress_main_db_file ( db_file , compression) . await {
774
+ let body = match Self :: maybe_compress_main_db_file ( & db_path , compression) . await {
760
775
Ok ( file) => file,
761
776
Err ( e) => {
762
777
tracing:: error!(
763
- "Failed to compress db file (generation {}): {}" ,
778
+ "Failed to compress db file (generation {}): {:? }" ,
764
779
generation,
765
780
e
766
781
) ;
@@ -791,7 +806,9 @@ impl Replicator {
791
806
let _ = snapshot_notifier. send ( Ok ( Some ( generation) ) ) ;
792
807
let elapsed = Instant :: now ( ) - start;
793
808
tracing:: debug!( "Snapshot upload finished (took {:?})" , elapsed) ;
794
- let _ = tokio:: fs:: remove_file ( format ! ( "db.{}" , compression) ) . await ;
809
+ // cleanup gzip database snapshot if exists
810
+ let gzip_path = Self :: db_gzip_path ( & db_path) ;
811
+ let _ = tokio:: fs:: remove_file ( gzip_path) . await ;
795
812
} ) ;
796
813
let elapsed = Instant :: now ( ) - start_ts;
797
814
tracing:: debug!( "Scheduled DB snapshot {} (took {:?})" , generation, elapsed) ;
0 commit comments