@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
55use crate :: wal:: WalFileReader ;
66use anyhow:: { anyhow, bail} ;
77use arc_swap:: ArcSwapOption ;
8- use async_compression:: tokio:: write:: GzipEncoder ;
8+ use async_compression:: tokio:: write:: { GzipEncoder , XzEncoder } ;
99use aws_sdk_s3:: config:: { Credentials , Region } ;
1010use aws_sdk_s3:: error:: SdkError ;
1111use aws_sdk_s3:: operation:: get_object:: builders:: GetObjectFluentBuilder ;
@@ -171,7 +171,7 @@ impl Options {
171171 let secret_access_key = env_var ( "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY" ) . ok ( ) ;
172172 let region = env_var ( "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION" ) . ok ( ) ;
173173 let max_frames_per_batch =
174- env_var_or ( "LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES" , 500 ) . parse :: < usize > ( ) ?;
174+ env_var_or ( "LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES" , 10000 ) . parse :: < usize > ( ) ?;
175175 let s3_upload_max_parallelism =
176176 env_var_or ( "LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX" , 32 ) . parse :: < usize > ( ) ?;
177177 let restore_transaction_page_swap_after =
@@ -653,7 +653,7 @@ impl Replicator {
653653 CompressionKind :: None => Ok ( ByteStream :: from_path ( db_path) . await ?) ,
654654 CompressionKind :: Gzip => {
655655 let mut reader = File :: open ( db_path) . await ?;
656- let gzip_path = Self :: db_gzip_path ( db_path) ;
656+ let gzip_path = Self :: db_compressed_path ( db_path, "gz" ) ;
657657 let compressed_file = OpenOptions :: new ( )
658658 . create ( true )
659659 . write ( true )
@@ -671,13 +671,33 @@ impl Replicator {
671671 ) ;
672672 Ok ( ByteStream :: from_path ( gzip_path) . await ?)
673673 }
674+ CompressionKind :: Xz => {
675+ let mut reader = File :: open ( db_path) . await ?;
676+ let xz_path = Self :: db_compressed_path ( db_path, "xz" ) ;
677+ let compressed_file = OpenOptions :: new ( )
678+ . create ( true )
679+ . write ( true )
680+ . read ( true )
681+ . truncate ( true )
682+ . open ( & xz_path)
683+ . await ?;
684+ let mut writer = XzEncoder :: new ( compressed_file) ;
685+ let size = tokio:: io:: copy ( & mut reader, & mut writer) . await ?;
686+ writer. shutdown ( ) . await ?;
687+ tracing:: debug!(
688+ "Compressed database file ({} bytes) into `{}`" ,
689+ size,
690+ xz_path. display( )
691+ ) ;
692+ Ok ( ByteStream :: from_path ( xz_path) . await ?)
693+ }
674694 }
675695 }
676696
/// Builds the path of the compressed snapshot file stored next to the database file.
///
/// The last component of `db_path` is replaced with `db.<suffix>` (for example
/// `db.gz` or `db.xz`). If `db_path` has no parent (it is empty or a filesystem
/// root), the file name is appended to `db_path` as is.
///
/// `suffix` is the file extension without the leading dot. Any string slice is
/// accepted; it does not have to be a `'static` literal.
fn db_compressed_path(db_path: &Path, suffix: &str) -> PathBuf {
    // `parent()` returns `None` in exactly the cases where `PathBuf::pop` would leave
    // the path untouched, so falling back to `db_path` keeps the previous behaviour
    // without cloning the path first.
    db_path
        .parent()
        .unwrap_or(db_path)
        .join(format!("db.{suffix}"))
}
682702
683703 fn restore_db_path ( & self ) -> PathBuf {
@@ -816,9 +836,10 @@ impl Replicator {
816836 let _ = snapshot_notifier. send ( Ok ( Some ( generation) ) ) ;
817837 let elapsed = Instant :: now ( ) - start;
818838 tracing:: debug!( "Snapshot upload finished (took {:?})" , elapsed) ;
819- // cleanup gzip database snapshot if exists
820- let gzip_path = Self :: db_gzip_path ( & db_path) ;
821- let _ = tokio:: fs:: remove_file ( gzip_path) . await ;
839+ // cleanup gzip/xz database snapshot if exists
840+ for suffix in & [ "gz" , "xz" ] {
841+ let _ = tokio:: fs:: remove_file ( Self :: db_compressed_path ( & db_path, suffix) ) . await ;
842+ }
822843 } ) ;
823844 let elapsed = Instant :: now ( ) - start_ts;
824845 tracing:: debug!( "Scheduled DB snapshot {} (took {:?})" , generation, elapsed) ;
@@ -1160,31 +1181,58 @@ impl Replicator {
11601181 }
11611182
11621183 async fn restore_from_snapshot ( & mut self , generation : & Uuid , db : & mut File ) -> Result < bool > {
1163- let main_db_path = match self . use_compression {
1164- CompressionKind :: None => format ! ( "{}-{}/db.db" , self . db_name, generation) ,
1165- CompressionKind :: Gzip => format ! ( "{}-{}/db.gz" , self . db_name, generation) ,
1184+ let algos_to_try = match self . use_compression {
1185+ CompressionKind :: None => & [
1186+ CompressionKind :: None ,
1187+ CompressionKind :: Xz ,
1188+ CompressionKind :: Gzip ,
1189+ ] ,
1190+ CompressionKind :: Gzip => & [
1191+ CompressionKind :: Gzip ,
1192+ CompressionKind :: Xz ,
1193+ CompressionKind :: None ,
1194+ ] ,
1195+ CompressionKind :: Xz => & [
1196+ CompressionKind :: Xz ,
1197+ CompressionKind :: Gzip ,
1198+ CompressionKind :: None ,
1199+ ] ,
11661200 } ;
11671201
1168- if let Ok ( db_file) = self . get_object ( main_db_path) . send ( ) . await {
1169- let mut body_reader = db_file. body . into_async_read ( ) ;
1170- let db_size = match self . use_compression {
1171- CompressionKind :: None => tokio:: io:: copy ( & mut body_reader, db) . await ?,
1172- CompressionKind :: Gzip => {
1173- let mut decompress_reader = async_compression:: tokio:: bufread:: GzipDecoder :: new (
1174- tokio:: io:: BufReader :: new ( body_reader) ,
1175- ) ;
1176- tokio:: io:: copy ( & mut decompress_reader, db) . await ?
1177- }
1202+ for algo in algos_to_try {
1203+ let main_db_path = match algo {
1204+ CompressionKind :: None => format ! ( "{}-{}/db.db" , self . db_name, generation) ,
1205+ CompressionKind :: Gzip => format ! ( "{}-{}/db.gz" , self . db_name, generation) ,
1206+ CompressionKind :: Xz => format ! ( "{}-{}/db.xz" , self . db_name, generation) ,
11781207 } ;
1179- db. flush ( ) . await ?;
1208+ if let Ok ( db_file) = self . get_object ( main_db_path) . send ( ) . await {
1209+ let mut body_reader = db_file. body . into_async_read ( ) ;
1210+ let db_size = match algo {
1211+ CompressionKind :: None => tokio:: io:: copy ( & mut body_reader, db) . await ?,
1212+ CompressionKind :: Gzip => {
1213+ let mut decompress_reader =
1214+ async_compression:: tokio:: bufread:: GzipDecoder :: new (
1215+ tokio:: io:: BufReader :: new ( body_reader) ,
1216+ ) ;
1217+ tokio:: io:: copy ( & mut decompress_reader, db) . await ?
1218+ }
1219+ CompressionKind :: Xz => {
1220+ let mut decompress_reader =
1221+ async_compression:: tokio:: bufread:: XzDecoder :: new (
1222+ tokio:: io:: BufReader :: new ( body_reader) ,
1223+ ) ;
1224+ tokio:: io:: copy ( & mut decompress_reader, db) . await ?
1225+ }
1226+ } ;
1227+ db. flush ( ) . await ?;
11801228
1181- let page_size = Self :: read_page_size ( db) . await ?;
1182- self . set_page_size ( page_size) ?;
1183- tracing:: info!( "Restored the main database file ({} bytes)" , db_size) ;
1184- Ok ( true )
1185- } else {
1186- Ok ( false )
1229+ let page_size = Self :: read_page_size ( db) . await ?;
1230+ self . set_page_size ( page_size) ?;
1231+ tracing:: info!( "Restored the main database file ({} bytes)" , db_size) ;
1232+ return Ok ( true ) ;
1233+ }
11871234 }
1235+ Ok ( false )
11881236 }
11891237
11901238 async fn restore_wal (
@@ -1235,6 +1283,7 @@ impl Replicator {
12351283 Some ( result) => result,
12361284 None => {
12371285 if !key. ends_with ( ".gz" )
1286+ && !key. ends_with ( ".xz" )
12381287 && !key. ends_with ( ".db" )
12391288 && !key. ends_with ( ".meta" )
12401289 && !key. ends_with ( ".dep" )
@@ -1423,6 +1472,7 @@ impl Replicator {
14231472 let str = fpath. to_str ( ) ?;
14241473 if str. ends_with ( ".db" )
14251474 | str. ends_with ( ".gz" )
1475+ | str. ends_with ( ".xz" )
14261476 | str. ends_with ( ".raw" )
14271477 | str. ends_with ( ".meta" )
14281478 | str. ends_with ( ".dep" )
@@ -1670,13 +1720,15 @@ pub enum CompressionKind {
16701720 #[ default]
16711721 None ,
16721722 Gzip ,
1723+ Xz ,
16731724}
16741725
16751726impl CompressionKind {
16761727 pub fn parse ( kind : & str ) -> std:: result:: Result < Self , & str > {
16771728 match kind {
16781729 "gz" | "gzip" => Ok ( CompressionKind :: Gzip ) ,
16791730 "raw" | "" => Ok ( CompressionKind :: None ) ,
1731+ "xz" => Ok ( CompressionKind :: Xz ) ,
16801732 other => Err ( other) ,
16811733 }
16821734 }
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
16871739 match self {
16881740 CompressionKind :: None => write ! ( f, "raw" ) ,
16891741 CompressionKind :: Gzip => write ! ( f, "gz" ) ,
1742+ CompressionKind :: Xz => write ! ( f, "xz" ) ,
16901743 }
16911744 }
16921745}
0 commit comments