Skip to content

Commit

Permalink
libsql: Improve WAL frame push logic
Browse files Browse the repository at this point in the history
The server returns its maximum frame number so let's use that as a hint
to avoid pushing frames it already knows about. IOW, let's start by
sending the first frame in the WAL, but immediately skip to a the
server's max frame number. Later we should also cache the server max
frame number on client side to make the first push start from the right
frame.
  • Loading branch information
penberg committed Nov 14, 2024
1 parent 4e5ca0f commit ddd1c89
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fallible-iterator = { version = "0.3", optional = true }

libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls" ], optional = true }
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls", "json" ], optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
Expand Down
24 changes: 17 additions & 7 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,16 @@ impl Database {
let start_frame_no = sync_ctx.durable_frame_num + 1;
let end_frame_no = max_frame_no;

for frame_no in start_frame_no..end_frame_no+1 {
self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
let mut frame_no = start_frame_no;
while frame_no <= end_frame_no {
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
if max_frame_no > frame_no {
frame_no = max_frame_no;
}
frame_no += 1;
}

let frame_count = end_frame_no - start_frame_no + 1;
Expand All @@ -409,7 +417,7 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<()> {
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
let frame_size: usize = 24+page_size as usize;
let frame = vec![0; frame_size];
let rc = unsafe {
Expand All @@ -419,12 +427,12 @@ impl Database {
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
}
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
Ok(())
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
Ok(max_frame_no)
}

#[cfg(feature = "sync")]
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<()> {
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
let mut nr_retries = 0;
loop {
let client = reqwest::Client::new();
Expand All @@ -437,7 +445,9 @@ impl Database {
}
let res = builder.body(frame.to_vec()).send().await.unwrap();
if res.status().is_success() {
return Ok(());
let resp = res.json::<serde_json::Value>().await.unwrap();
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
return Ok(max_frame_no as u32);
}
if nr_retries > max_retries {
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));
Expand Down

0 comments on commit ddd1c89

Please sign in to comment.