Skip to content

Commit

Permalink
async_file_cache: correctly handle panicing in the middle of a file g…
Browse files Browse the repository at this point in the history
…eneration operation
  • Loading branch information
djugei committed May 1, 2024
1 parent e8f1cf0 commit 23d865c
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions async_file_cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use core::future::Future;
use log::{debug, trace};
use std::ffi::OsString;
use std::{collections::HashMap, hash::Hash, io::ErrorKind, path::PathBuf, pin::pin};

use tokio::io::AsyncSeekExt;
Expand Down Expand Up @@ -36,6 +37,7 @@ where
S: Clone + Send,
KF: Send + Fn(&S, &Key) -> PathBuf,
F: Send + Fn(S, Key, File) -> FF,
// fixme: why does this need to return a file? why not just a result
FF: Future<Output = Result<File, E>> + Send,
E: Send,
{
Expand All @@ -46,7 +48,7 @@ where
* The future should not be blocking either by long calculations or io operations
* use async_runtime::spawn for calculations and async io for io
* f needs to take care of serializing the value to the file on its own
* f must not panic
* f should not panic, but measures are taken to not leave inconsistent state
* max_para: maximum number of expensive operations in flight at the same time
*/
pub fn new(init_state: S, kf: KF, f: F, max_para: Option<usize>) -> Self {
Expand Down Expand Up @@ -123,7 +125,11 @@ where
debug!("generating {:?}", path);
let (tx, rx) = tokio::sync::watch::channel(());
entry.insert(rx);
let w = create.open(&path).await?;

let mut part_path = path.clone();
part_path.as_mut_os_string().push(OsString::from(".part"));

let w = create.open(&part_path).await?;
drop(in_flight);

let perm = if let Some(sem) = &self.max_para {
Expand All @@ -139,25 +145,28 @@ where
let f = (self.f)(self.state.clone(), key.clone(), w);
let f = std::pin::pin!(f);
let f = f.await;
let f = match f {
let _f = match f {
Ok(mut f) => {
let pf = pin!(f.rewind());
pf.await?;
f
}
Err(e) => {
debug!("generating failed, removing {}", path.display());
tokio::fs::remove_file(path).await?;
debug!("generating failed, removing {}", part_path.display());
tokio::fs::remove_file(part_path).await?;
return Ok(Err(e));
}
};
// operation succeeded, we move the .part file to its permanent place
tokio::fs::rename(part_path, &path).await?;
let file = read.open(path).await?;

drop(perm);

// this can not panic as we hold a reciever ourselves
tx.send(()).expect("threading failure (snd)");
self.in_flight.lock().await.remove(&key);
return Ok(Ok(f));
return Ok(Ok(file));
}
}
}
Expand All @@ -176,7 +185,7 @@ fn cache_simple() {
.unwrap();
use futures_util::future::join_all;
let kf = |_: &(), s: &String| PathBuf::from(s);
async fn inner_f(s: (), key: String, mut file: File) -> Result<File, std::io::Error> {
async fn inner_f(_s: (), key: String, mut file: File) -> Result<File, std::io::Error> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
use tokio::io::AsyncWriteExt;
file.write_all(key.as_bytes()).await?;
Expand Down

0 comments on commit 23d865c

Please sign in to comment.