Skip to content

Commit

Permalink
delete tmp file when io error happens
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Sep 18, 2023
1 parent e559749 commit 3bbc660
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,32 +167,59 @@ impl Manifest {
}
}

/// The encoder of the page file in the disk cache.
/// The writer of the page file in the disk cache.
///
/// Following the payload, a footer [`PageFileEncoder::MAGIC_FOOTER`] is
/// appended.
struct PageFileEncoder {
payload: Bytes,
struct PageFileWriter {
output: String,
}

impl PageFileEncoder {
impl PageFileWriter {
const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e'];

async fn encode_and_persist<W>(&self, writer: &mut W, name: &str) -> Result<()>
where
W: AsyncWrite + std::marker::Unpin,
{
fn new(output: String) -> Self {
Self { output }
}

fn tmp_file(input: &str) -> String {
format!("{}.tmp", input)
}

async fn write_inner(&self, tmp_file: &str, bytes: Bytes) -> Result<()> {
let mut writer = File::create(&tmp_file)
.await
.context(Io { file: tmp_file })?;
writer
.write_all(&self.payload[..])
.write_all(&bytes)
.await
.context(Io { file: name })?;
.context(Io { file: tmp_file })?;

writer
.write_all(&Self::MAGIC_FOOTER)
.await
.context(Io { file: name })?;
.context(Io { file: tmp_file })?;

writer.flush().await.context(Io { file: tmp_file })?;

tokio::fs::rename(tmp_file, &self.output)
.await
.context(Io { file: &self.output })?;

Ok(())
}

writer.flush().await.context(Io { file: name })?;
// When write bytes to file, the cache lock is released, so when one thread is
// reading, another thread may update it, so we write to tmp file first,
// then rename to expected filename to avoid other threads see partial
// content.
async fn write_and_flush(self, bytes: Bytes) -> Result<()> {
let tmp_file = Self::tmp_file(&self.output);
let write_result = self.write_inner(&tmp_file, bytes).await;
if write_result.is_err() {
// we don't care this result.
_ = tokio::fs::remove_file(&tmp_file).await;
}

Ok(())
}
Expand Down Expand Up @@ -262,7 +289,7 @@ impl DiskCache {

async fn insert_data(&self, filename: String, value: Bytes) {
let page_meta = {
let file_size = PageFileEncoder::encoded_size(value.len());
let file_size = PageFileWriter::encoded_size(value.len());
PageMeta { file_size }
};
let evicted_file = self.insert_page_meta(filename.clone(), page_meta);
Expand Down Expand Up @@ -357,34 +384,14 @@ impl DiskCache {
}

async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> {
// When write payload to file, the cache lock is released, so when one thread is
// reading, another thread may update it, so we write to tmp file first,
// then rename to expected filename to avoid other threads see partial
// content.
let tmp_filename = format!("{filename}.tmp");
let tmp_filepath = std::path::Path::new(&self.root_dir)
.join(&tmp_filename)
.into_os_string()
.into_string()
.unwrap();

let mut file = File::create(&tmp_filepath).await.context(Io {
file: &tmp_filepath,
})?;

let encoding = PageFileEncoder { payload };
encoding
.encode_and_persist(&mut file, &tmp_filename)
.await?;

let dest_filepath = std::path::Path::new(&self.root_dir)
.join(filename)
.into_os_string()
.into_string()
.unwrap();
tokio::fs::rename(tmp_filepath, dest_filepath)
.await
.context(Io { file: filename })?;

let writer = PageFileWriter::new(dest_filepath);
writer.write_and_flush(payload).await?;

Ok(())
}
Expand All @@ -399,7 +406,7 @@ impl DiskCache {
range: &Range<usize>,
expect_file_size: usize,
) -> std::io::Result<ReadBytesResult> {
if PageFileEncoder::encoded_size(range.len()) > expect_file_size {
if PageFileWriter::encoded_size(range.len()) > expect_file_size {
return Ok(ReadBytesResult::OutOfRange);
}

Expand Down

0 comments on commit 3bbc660

Please sign in to comment.