@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
55use  crate :: wal:: WalFileReader ; 
66use  anyhow:: { anyhow,  bail} ; 
77use  arc_swap:: ArcSwapOption ; 
8- use  async_compression:: tokio:: write:: GzipEncoder ; 
8+ use  async_compression:: tokio:: write:: { GzipEncoder ,   XzEncoder } ; 
99use  aws_sdk_s3:: config:: { Credentials ,  Region } ; 
1010use  aws_sdk_s3:: error:: SdkError ; 
1111use  aws_sdk_s3:: operation:: get_object:: builders:: GetObjectFluentBuilder ; 
@@ -653,7 +653,7 @@ impl Replicator {
653653            CompressionKind :: None  => Ok ( ByteStream :: from_path ( db_path) . await ?) , 
654654            CompressionKind :: Gzip  => { 
655655                let  mut  reader = File :: open ( db_path) . await ?; 
656-                 let  gzip_path = Self :: db_gzip_path ( db_path) ; 
656+                 let  gzip_path = Self :: db_compressed_path ( db_path,   "gz" ) ; 
657657                let  compressed_file = OpenOptions :: new ( ) 
658658                    . create ( true ) 
659659                    . write ( true ) 
@@ -671,13 +671,34 @@ impl Replicator {
671671                ) ; 
672672                Ok ( ByteStream :: from_path ( gzip_path) . await ?) 
673673            } 
674+             CompressionKind :: Xz  => { 
675+                 let  mut  reader = File :: open ( db_path) . await ?; 
676+                 let  gzip_path = Self :: db_compressed_path ( db_path,  "xz" ) ; 
677+                 let  compressed_file = OpenOptions :: new ( ) 
678+                     . create ( true ) 
679+                     . write ( true ) 
680+                     . read ( true ) 
681+                     . truncate ( true ) 
682+                     . open ( & gzip_path) 
683+                     . await ?; 
684+                 let  mut  writer =
685+                     XzEncoder :: with_quality ( compressed_file,  async_compression:: Level :: Best ) ; 
686+                 let  size = tokio:: io:: copy ( & mut  reader,  & mut  writer) . await ?; 
687+                 writer. shutdown ( ) . await ?; 
688+                 tracing:: debug!( 
689+                     "Compressed database file ({} bytes) into `{}`" , 
690+                     size, 
691+                     gzip_path. display( ) 
692+                 ) ; 
693+                 Ok ( ByteStream :: from_path ( gzip_path) . await ?) 
694+             } 
674695        } 
675696    } 
676697
677-     fn  db_gzip_path ( db_path :  & Path )  -> PathBuf  { 
678-         let  mut  gzip_path  = db_path. to_path_buf ( ) ; 
679-         gzip_path . pop ( ) ; 
680-         gzip_path . join ( "db.gz"  ) 
698+     fn  db_compressed_path ( db_path :  & Path ,   suffix :   & ' static   str )  -> PathBuf  { 
699+         let  mut  compressed_path :   PathBuf  = db_path. to_path_buf ( ) ; 
700+         compressed_path . pop ( ) ; 
701+         compressed_path . join ( format ! ( "db.{suffix}"  ) ) 
681702    } 
682703
683704    fn  restore_db_path ( & self )  -> PathBuf  { 
@@ -816,9 +837,10 @@ impl Replicator {
816837            let  _ = snapshot_notifier. send ( Ok ( Some ( generation) ) ) ; 
817838            let  elapsed = Instant :: now ( )  - start; 
818839            tracing:: debug!( "Snapshot upload finished (took {:?})" ,  elapsed) ; 
819-             // cleanup gzip database snapshot if exists 
820-             let  gzip_path = Self :: db_gzip_path ( & db_path) ; 
821-             let  _ = tokio:: fs:: remove_file ( gzip_path) . await ; 
840+             // cleanup gzip/xz database snapshot if exists 
841+             for  suffix in  & [ "gz" ,  "xz" ]  { 
842+                 let  _ = tokio:: fs:: remove_file ( Self :: db_compressed_path ( & db_path,  suffix) ) . await ; 
843+             } 
822844        } ) ; 
823845        let  elapsed = Instant :: now ( )  - start_ts; 
824846        tracing:: debug!( "Scheduled DB snapshot {} (took {:?})" ,  generation,  elapsed) ; 
@@ -1163,6 +1185,7 @@ impl Replicator {
11631185        let  main_db_path = match  self . use_compression  { 
11641186            CompressionKind :: None  => format ! ( "{}-{}/db.db" ,  self . db_name,  generation) , 
11651187            CompressionKind :: Gzip  => format ! ( "{}-{}/db.gz" ,  self . db_name,  generation) , 
1188+             CompressionKind :: Xz  => format ! ( "{}-{}/db.xz" ,  self . db_name,  generation) , 
11661189        } ; 
11671190
11681191        if  let  Ok ( db_file)  = self . get_object ( main_db_path) . send ( ) . await  { 
@@ -1175,6 +1198,12 @@ impl Replicator {
11751198                    ) ; 
11761199                    tokio:: io:: copy ( & mut  decompress_reader,  db) . await ?
11771200                } 
1201+                 CompressionKind :: Xz  => { 
1202+                     let  mut  decompress_reader = async_compression:: tokio:: bufread:: XzDecoder :: new ( 
1203+                         tokio:: io:: BufReader :: new ( body_reader) , 
1204+                     ) ; 
1205+                     tokio:: io:: copy ( & mut  decompress_reader,  db) . await ?
1206+                 } 
11781207            } ; 
11791208            db. flush ( ) . await ?; 
11801209
@@ -1235,6 +1264,7 @@ impl Replicator {
12351264                        Some ( result)  => result, 
12361265                        None  => { 
12371266                            if  !key. ends_with ( ".gz" ) 
1267+                                 && !key. ends_with ( ".xz" ) 
12381268                                && !key. ends_with ( ".db" ) 
12391269                                && !key. ends_with ( ".meta" ) 
12401270                                && !key. ends_with ( ".dep" ) 
@@ -1423,6 +1453,7 @@ impl Replicator {
14231453        let  str = fpath. to_str ( ) ?; 
14241454        if  str. ends_with ( ".db" ) 
14251455            | str. ends_with ( ".gz" ) 
1456+             | str. ends_with ( ".xz" ) 
14261457            | str. ends_with ( ".raw" ) 
14271458            | str. ends_with ( ".meta" ) 
14281459            | str. ends_with ( ".dep" ) 
@@ -1670,13 +1701,15 @@ pub enum CompressionKind {
16701701    #[ default]  
16711702    None , 
16721703    Gzip , 
1704+     Xz , 
16731705} 
16741706
16751707impl  CompressionKind  { 
16761708    pub  fn  parse ( kind :  & str )  -> std:: result:: Result < Self ,  & str >  { 
16771709        match  kind { 
16781710            "gz"  | "gzip"  => Ok ( CompressionKind :: Gzip ) , 
16791711            "raw"  | ""  => Ok ( CompressionKind :: None ) , 
1712+             "xz"  => Ok ( CompressionKind :: Xz ) , 
16801713            other => Err ( other) , 
16811714        } 
16821715    } 
@@ -1687,6 +1720,7 @@ impl std::fmt::Display for CompressionKind {
16871720        match  self  { 
16881721            CompressionKind :: None  => write ! ( f,  "raw" ) , 
16891722            CompressionKind :: Gzip  => write ! ( f,  "gz" ) , 
1723+             CompressionKind :: Xz  => write ! ( f,  "xz" ) , 
16901724        } 
16911725    } 
16921726} 
0 commit comments