@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
 use crate::wal::WalFileReader;
 use anyhow::{anyhow, bail};
 use arc_swap::ArcSwapOption;
-use async_compression::tokio::write::GzipEncoder;
+use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
 use aws_sdk_s3::config::{Credentials, Region};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -171,15 +171,15 @@ impl Options {
         let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
         let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
         let max_frames_per_batch =
-            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
+            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
         let s3_upload_max_parallelism =
             env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
         let restore_transaction_page_swap_after =
             env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_SWAP_THRESHOLD", 1000).parse::<u32>()?;
         let restore_transaction_cache_fpath =
             env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_FILE", ".bottomless.restore");
         let use_compression =
-            CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "gz"))
+            CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "zstd"))
                 .map_err(|e| anyhow!("unknown compression kind: {}", e))?;
         let verify_crc = match env_var_or("LIBSQL_BOTTOMLESS_VERIFY_CRC", true)
             .to_lowercase()
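Note on the two config changes above: the default frame batch grows from 500 to 10000, and the default compression moves from gzip to zstd; both remain overridable via their environment variables. A minimal sketch (not part of the diff; resolve_compression is a hypothetical helper) of how the new default resolves, with the caveat that the real Options code errors out on an unknown value instead of falling back silently:

// Hypothetical helper, for illustration only: mirrors the new "zstd" default.
fn resolve_compression() -> CompressionKind {
    std::env::var("LIBSQL_BOTTOMLESS_COMPRESSION")
        .ok()
        .and_then(|v| CompressionKind::parse(&v).ok())
        .unwrap_or(CompressionKind::Zstd) // default was CompressionKind::Gzip before this change
}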
@@ -653,7 +653,7 @@ impl Replicator {
             CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
             CompressionKind::Gzip => {
                 let mut reader = File::open(db_path).await?;
-                let gzip_path = Self::db_gzip_path(db_path);
+                let gzip_path = Self::db_compressed_path(db_path, "gz");
                 let compressed_file = OpenOptions::new()
                     .create(true)
                     .write(true)
@@ -671,13 +671,33 @@ impl Replicator {
                 );
                 Ok(ByteStream::from_path(gzip_path).await?)
             }
+            CompressionKind::Zstd => {
+                let mut reader = File::open(db_path).await?;
+                let zstd_path = Self::db_compressed_path(db_path, "zstd");
+                let compressed_file = OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .read(true)
+                    .truncate(true)
+                    .open(&zstd_path)
+                    .await?;
+                let mut writer = ZstdEncoder::new(compressed_file);
+                let size = tokio::io::copy(&mut reader, &mut writer).await?;
+                writer.shutdown().await?;
+                tracing::debug!(
+                    "Compressed database file ({} bytes) into `{}`",
+                    size,
+                    zstd_path.display()
+                );
+                Ok(ByteStream::from_path(zstd_path).await?)
+            }
         }
     }
 
-    fn db_gzip_path(db_path: &Path) -> PathBuf {
-        let mut gzip_path = db_path.to_path_buf();
-        gzip_path.pop();
-        gzip_path.join("db.gz")
+    fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
+        let mut compressed_path: PathBuf = db_path.to_path_buf();
+        compressed_path.pop();
+        compressed_path.join(format!("db.{suffix}"))
     }
 
     fn restore_db_path(&self) -> PathBuf {
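The new Zstd branch above mirrors the existing gzip path: stream the database file through an encoder into a sibling db.zstd file, then hand the compressed file to S3 as a ByteStream. A standalone sketch of that pattern, assuming the async_compression crate with its tokio and zstd features (compress_file_zstd is an illustrative name, not an API from this PR):

use async_compression::tokio::write::ZstdEncoder;
use tokio::io::AsyncWriteExt;

// Illustrative helper: compress `src` into `dst` the way the snapshot path does.
async fn compress_file_zstd(src: &std::path::Path, dst: &std::path::Path) -> anyhow::Result<u64> {
    let mut reader = tokio::fs::File::open(src).await?;
    let mut writer = ZstdEncoder::new(tokio::fs::File::create(dst).await?);
    let size = tokio::io::copy(&mut reader, &mut writer).await?;
    writer.shutdown().await?; // finishes the zstd frame; skipping this truncates the output
    Ok(size)
}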
@@ -816,9 +836,10 @@ impl Replicator {
             let _ = snapshot_notifier.send(Ok(Some(generation)));
             let elapsed = Instant::now() - start;
             tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
-            // cleanup gzip database snapshot if exists
-            let gzip_path = Self::db_gzip_path(&db_path);
-            let _ = tokio::fs::remove_file(gzip_path).await;
+            // cleanup gzip/zstd database snapshot if exists
+            for suffix in &["gz", "zstd"] {
+                let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
+            }
         });
         let elapsed = Instant::now() - start_ts;
         tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1160,31 +1181,58 @@ impl Replicator {
     }
 
     async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
-        let main_db_path = match self.use_compression {
-            CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
-            CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+        let algos_to_try = match self.use_compression {
+            CompressionKind::None => &[
+                CompressionKind::None,
+                CompressionKind::Zstd,
+                CompressionKind::Gzip,
+            ],
+            CompressionKind::Gzip => &[
+                CompressionKind::Gzip,
+                CompressionKind::Zstd,
+                CompressionKind::None,
+            ],
+            CompressionKind::Zstd => &[
+                CompressionKind::Zstd,
+                CompressionKind::Gzip,
+                CompressionKind::None,
+            ],
         };
 
-        if let Ok(db_file) = self.get_object(main_db_path).send().await {
-            let mut body_reader = db_file.body.into_async_read();
-            let db_size = match self.use_compression {
-                CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
-                CompressionKind::Gzip => {
-                    let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
-                        tokio::io::BufReader::new(body_reader),
-                    );
-                    tokio::io::copy(&mut decompress_reader, db).await?
-                }
+        for algo in algos_to_try {
+            let main_db_path = match algo {
+                CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
+                CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+                CompressionKind::Zstd => format!("{}-{}/db.zstd", self.db_name, generation),
             };
-            db.flush().await?;
+            if let Ok(db_file) = self.get_object(main_db_path).send().await {
+                let mut body_reader = db_file.body.into_async_read();
+                let db_size = match algo {
+                    CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
+                    CompressionKind::Gzip => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::GzipDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                    CompressionKind::Zstd => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::ZstdDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                };
+                db.flush().await?;
 
-            let page_size = Self::read_page_size(db).await?;
-            self.set_page_size(page_size)?;
-            tracing::info!("Restored the main database file ({} bytes)", db_size);
-            Ok(true)
-        } else {
-            Ok(false)
+                let page_size = Self::read_page_size(db).await?;
+                self.set_page_size(page_size)?;
+                tracing::info!("Restored the main database file ({} bytes)", db_size);
+                return Ok(true);
+            }
         }
+        Ok(false)
     }
 
     async fn restore_wal(
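Restore now probes the snapshot object under each suffix (db.zstd, db.gz, db.db) in a preference order derived from the configured kind, picking the decoder per attempt rather than from the global setting, so databases uploaded under a previous compression setting still restore. A minimal sketch of the per-kind decompression step, assuming async_compression with the tokio, gzip, and zstd features (decompress_into is illustrative, not the PR's API):

use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use tokio::io::{AsyncRead, AsyncWrite, BufReader};

// Illustrative: copy `body` into `db`, decompressing according to `kind`.
async fn decompress_into<R, W>(kind: CompressionKind, mut body: R, db: &mut W) -> anyhow::Result<u64>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    Ok(match kind {
        CompressionKind::None => tokio::io::copy(&mut body, db).await?,
        CompressionKind::Gzip => tokio::io::copy(&mut GzipDecoder::new(BufReader::new(body)), db).await?,
        CompressionKind::Zstd => tokio::io::copy(&mut ZstdDecoder::new(BufReader::new(body)), db).await?,
    })
}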
@@ -1235,6 +1283,7 @@ impl Replicator {
                         Some(result) => result,
                         None => {
                             if !key.ends_with(".gz")
+                                && !key.ends_with(".zstd")
                                 && !key.ends_with(".db")
                                 && !key.ends_with(".meta")
                                 && !key.ends_with(".dep")
@@ -1423,6 +1472,7 @@ impl Replicator {
         let str = fpath.to_str()?;
         if str.ends_with(".db")
             | str.ends_with(".gz")
+            | str.ends_with(".zstd")
             | str.ends_with(".raw")
             | str.ends_with(".meta")
             | str.ends_with(".dep")
@@ -1670,13 +1720,15 @@ pub enum CompressionKind {
     #[default]
     None,
     Gzip,
+    Zstd,
 }
 
 impl CompressionKind {
     pub fn parse(kind: &str) -> std::result::Result<Self, &str> {
         match kind {
             "gz" | "gzip" => Ok(CompressionKind::Gzip),
             "raw" | "" => Ok(CompressionKind::None),
+            "zstd" => Ok(CompressionKind::Zstd),
             other => Err(other),
         }
     }
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
         match self {
             CompressionKind::None => write!(f, "raw"),
             CompressionKind::Gzip => write!(f, "gz"),
+            CompressionKind::Zstd => write!(f, "zstd"),
         }
     }
 }
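An illustrative round-trip check for the new variant (not included in the PR), exercising parse and the Display impl above:

#[test]
fn zstd_compression_kind_round_trip() {
    assert!(matches!(CompressionKind::parse("zstd"), Ok(CompressionKind::Zstd)));
    assert!(matches!(
        CompressionKind::parse(&CompressionKind::Zstd.to_string()),
        Ok(CompressionKind::Zstd)
    ));
    // unknown kinds are returned as the error payload
    assert!(matches!(CompressionKind::parse("lz4"), Err("lz4")));
}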