diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index b41a4b38db..d1deb623ea 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -357,18 +357,36 @@ impl DiskCache { } async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> { - let file_path = std::path::Path::new(&self.root_dir) - .join(filename) + // The cache lock is released while the payload is written to the file, so + // another thread may read the file while this one is updating it. Write to a + // tmp file first, then rename it to the expected filename, so that other + // threads never see partial content. + let tmp_filename = format!("{filename}.tmp"); + let tmp_filepath = std::path::Path::new(&self.root_dir) + .join(&tmp_filename) .into_os_string() .into_string() .unwrap(); - let mut file = File::create(&file_path) - .await - .context(Io { file: &file_path })?; + let mut file = File::create(&tmp_filepath).await.context(Io { + file: &tmp_filepath, + })?; let encoding = PageFileEncoder { payload }; - encoding.encode_and_persist(&mut file, filename).await + encoding + .encode_and_persist(&mut file, &tmp_filename) + .await?; + + let dest_filepath = std::path::Path::new(&self.root_dir) + .join(filename) + .into_os_string() + .into_string() + .unwrap(); + tokio::fs::rename(tmp_filepath, dest_filepath) + .await + .context(Io { file: filename })?; + + Ok(()) } /// Read the bytes from the cached file. @@ -1103,7 +1121,6 @@ mod test { } let actual = futures::future::join_all(tasks).await; - println!("get_counts, {}", store.inner.underlying_store); for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) { assert_eq!(actual.unwrap(), Bytes::from(expected)) }