Skip to content

Commit

Permalink
first write to tmp file when persist bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Sep 15, 2023
1 parent 8d5d761 commit e559749
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,36 @@ impl DiskCache {
}

async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> {
let file_path = std::path::Path::new(&self.root_dir)
.join(filename)
// When write payload to file, the cache lock is released, so when one thread is
// reading, another thread may update it, so we write to tmp file first,
// then rename to expected filename to avoid other threads see partial
// content.
let tmp_filename = format!("{filename}.tmp");
let tmp_filepath = std::path::Path::new(&self.root_dir)
.join(&tmp_filename)
.into_os_string()
.into_string()
.unwrap();

let mut file = File::create(&file_path)
.await
.context(Io { file: &file_path })?;
let mut file = File::create(&tmp_filepath).await.context(Io {
file: &tmp_filepath,
})?;

let encoding = PageFileEncoder { payload };
encoding.encode_and_persist(&mut file, filename).await
encoding
.encode_and_persist(&mut file, &tmp_filename)
.await?;

let dest_filepath = std::path::Path::new(&self.root_dir)
.join(filename)
.into_os_string()
.into_string()
.unwrap();
tokio::fs::rename(tmp_filepath, dest_filepath)
.await
.context(Io { file: filename })?;

Ok(())
}

/// Read the bytes from the cached file.
Expand Down Expand Up @@ -1103,7 +1121,6 @@ mod test {
}

let actual = futures::future::join_all(tasks).await;
println!("get_counts, {}", store.inner.underlying_store);
for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) {
assert_eq!(actual.unwrap(), Bytes::from(expected))
}
Expand Down

0 comments on commit e559749

Please sign in to comment.