Skip to content

Commit

Permalink
Merge pull request #1817 from tursodatabase/optimize-wal-push
Browse files Browse the repository at this point in the history
libsql: Improve WAL frame push logic
  • Loading branch information
LucioFranco authored Nov 14, 2024
2 parents 5ee4723 + 37bc47d commit 89b3460
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
3 changes: 2 additions & 1 deletion libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fallible-iterator = { version = "0.3", optional = true }

libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls" ], optional = true }
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls", "json" ], optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
Expand Down Expand Up @@ -106,6 +106,7 @@ sync = [
"dep:tokio",
"dep:futures",
"dep:reqwest",
"dep:serde_json",
]
hrana = [
"parser",
Expand Down
24 changes: 17 additions & 7 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,16 @@ impl Database {
let start_frame_no = sync_ctx.durable_frame_num + 1;
let end_frame_no = max_frame_no;

for frame_no in start_frame_no..end_frame_no+1 {
self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
let mut frame_no = start_frame_no;
while frame_no <= end_frame_no {
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
if max_frame_no > frame_no {
frame_no = max_frame_no;
}
frame_no += 1;
}

let frame_count = end_frame_no - start_frame_no + 1;
Expand All @@ -409,7 +417,7 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<()> {
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
let frame_size: usize = 24+page_size as usize;
let frame = vec![0; frame_size];
let rc = unsafe {
Expand All @@ -419,12 +427,12 @@ impl Database {
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
}
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
Ok(())
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
Ok(max_frame_no)
}

#[cfg(feature = "sync")]
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<()> {
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
let mut nr_retries = 0;
loop {
let client = reqwest::Client::new();
Expand All @@ -437,7 +445,9 @@ impl Database {
}
let res = builder.body(frame.to_vec()).send().await.unwrap();
if res.status().is_success() {
return Ok(());
let resp = res.json::<serde_json::Value>().await.unwrap();
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
return Ok(max_frame_no as u32);
}
if nr_retries > max_retries {
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));
Expand Down

0 comments on commit 89b3460

Please sign in to comment.