Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ext/pytests/test_secondary_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


def folder_pairs(folder):
filename_parts = (f.stem.split("_") for f in folder.glob("*"))
filename_parts = (f.stem.split("_") for f in folder.glob("**/*") if f.is_file())
return (
(int(split[0].replace("m", "-")), int(split[1])) for split in filename_parts
)
Expand Down
8 changes: 7 additions & 1 deletion ext/src/chain_complex/chain_homotopy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ impl<
&& !right.name().is_empty()
{
let mut save_dir = left.source.save_dir().clone();
save_dir.push(format!("massey/{},{}/", left.name(), right.name()));
save_dir.push(format!(
"massey/{left_s}/{right_s}/{},{}/",
left.name(),
right.name(),
left_s = left.shift.s(),
right_s = right.shift.s(),
));

SaveKind::ChainHomotopy
.create_dir(save_dir.write().unwrap())
Expand Down
2 changes: 1 addition & 1 deletion ext/src/resolution_homomorphism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
pub fn new(name: String, source: Arc<CC1>, target: Arc<CC2>, shift: Bidegree) -> Self {
let save_dir = if source.save_dir().is_some() && !name.is_empty() {
let mut save_dir = source.save_dir().clone();
save_dir.push(format!("products/{name}"));
save_dir.push(format!("products/{shift_s}/{name}", shift_s = shift.s()));
SaveKind::ChainMap
.create_dir(save_dir.write().unwrap())
.unwrap();
Expand Down
84 changes: 65 additions & 19 deletions ext/src/save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ impl From<Option<PathBuf>> for SaveDirectory {
}
}

/// A DashSet<PathBuf>> of files that are currently opened and being written to. When calling this
/// function for the first time, we set the ctrlc handler to delete currently opened files then
/// exit.
fn open_files() -> &'static Mutex<HashSet<PathBuf>> {
/// A `DashSet<PathBuf>>` of paths that are currently being used.
///
/// Suppose a path `p` is contained in this `DashSet`.
/// - If `p` points to a file, then a (unique) thread is currently writing to that file. The ctrlc
/// handler ensures that any such file will be deleted if the program is terminated.
/// - If `p` points to a directory, then a thread is in the process of creating a file in `p`.
fn paths_in_use() -> &'static Mutex<HashSet<PathBuf>> {
static OPEN_FILES: LazyLock<Mutex<HashSet<PathBuf>>> = LazyLock::new(|| {
#[cfg(unix)]
ctrlc::set_handler(move || {
tracing::warn!("Ctrl-C detected. Deleting open files and exiting.");
let files = open_files().lock().unwrap();
for file in &*files {
let paths = paths_in_use().lock().unwrap();
for file in paths.iter().filter(|p| p.is_file()) {
std::fs::remove_file(file)
.unwrap_or_else(|_| panic!("Error when deleting {file:?}"));
tracing::warn!(?file, "deleted");
Expand Down Expand Up @@ -239,7 +242,7 @@ impl<T: io::Write> std::ops::Drop for ChecksumWriter<T> {
.unwrap();
self.writer.flush().unwrap();
assert!(
open_files().lock().unwrap().remove(&self.path),
paths_in_use().lock().unwrap().remove(&self.path),
"File {:?} already dropped",
self.path
);
Expand Down Expand Up @@ -396,31 +399,46 @@ impl<A: Algebra> SaveFile<A> {
Ok(())
}

fn get_save_directory(&self, mut dir: PathBuf) -> PathBuf {
dir.push(format!(
"{name}s/{s}/",
name = self.kind.name(),
s = self.b.s()
));
dir
}

/// This panics if there is no save dir
fn get_save_path(&self, mut dir: PathBuf) -> PathBuf {
fn add_save_path(&self, mut dir: PathBuf) -> PathBuf {
let n = if self.b.n() < 0 {
format!("m{}", -self.b.n())
} else {
self.b.n().to_string()
};
if let Some(idx) = self.idx {
dir.push(format!(
"{name}s/{n}_{s}_{idx}_{name}",
"{n}_{s}_{idx}_{name}",
name = self.kind.name(),
s = self.b.s(),
));
} else {
dir.push(format!(
"{name}s/{n}_{s}_{name}",
"{n}_{s}_{name}",
name = self.kind.name(),
s = self.b.s(),
));
}
dir
}

fn get_full_save_path(&self, mut dir: PathBuf) -> PathBuf {
dir = self.get_save_directory(dir);
dir = self.add_save_path(dir);
dir
}

pub fn open_file(&self, dir: PathBuf) -> Option<Box<dyn io::Read>> {
let file_path = self.get_save_path(dir);
let file_path = self.get_full_save_path(dir);
let path_string = file_path.to_string_lossy().into_owned();
if let Some(mut f) = open_file(file_path) {
self.validate_header(&mut f).unwrap();
Expand All @@ -433,7 +451,8 @@ impl<A: Algebra> SaveFile<A> {
}

pub fn exists(&self, dir: PathBuf) -> bool {
let path = self.get_save_path(dir);
#[allow(unused_mut)]
let mut path = self.get_full_save_path(dir);
if path.exists() {
return true;
}
Expand All @@ -449,28 +468,49 @@ impl<A: Algebra> SaveFile<A> {
}

pub fn delete_file(&self, dir: PathBuf) -> io::Result<()> {
let p = self.get_save_path(dir);
match std::fs::remove_file(p) {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
let dir = self.get_save_directory(dir);
let p = self.add_save_path(dir.clone());
if let Err(e) = std::fs::remove_file(p) {
if e.kind() != io::ErrorKind::NotFound {
return Err(e);
}
};
// We only delete the directory if no thread is attempting to write to it.
if !paths_in_use().lock().unwrap().contains(&dir) {
let harmless_errors = [io::ErrorKind::DirectoryNotEmpty, io::ErrorKind::NotFound];
// `remove_dir` only deletes empty directories, so this is safe.
match std::fs::remove_dir(dir) {
Err(e) if harmless_errors.contains(&e.kind()) => Ok(()),
x => x,
}?;
}
Ok(())
}

/// # Arguments
/// - `overwrite`: Whether to overwrite a file if it already exists.
pub fn create_file(&self, dir: PathBuf, overwrite: bool) -> impl io::Write {
let p = self.get_save_path(dir);
let dir = self.get_save_directory(dir);
let p = self.add_save_path(dir.clone());
tracing::info!(file = ?p, "open for writing");

// We need to do this before creating any file. The ctrlc handler does not block other threads
// from running, but it does lock [`open_files()`]. So this ensures we do not open new files
// while handling ctrlc.
assert!(
open_files().lock().unwrap().insert(p.clone()),
paths_in_use().lock().unwrap().insert(p.clone()),
"File {p:?} is already opened"
);

// We also add the directory to the set of paths in use. This is to ensure that we only
// delete directories when no thread is attempting to write to a file in that directory. We
// don't hold the mutex for the entirety of this function to guard it from getting poisoned.
paths_in_use().lock().unwrap().insert(dir.clone());

std::fs::create_dir_all(&dir)
.with_context(|| format!("Failed to create directories containing {p:?}"))
.unwrap();

let f = std::fs::OpenOptions::new()
.write(true)
.create_new(!overwrite)
Expand All @@ -479,6 +519,12 @@ impl<A: Algebra> SaveFile<A> {
.open(&p)
.with_context(|| format!("Failed to create save file {p:?}"))
.unwrap();

// We have successfully created the file, so `dir` is nonempty, and calling
// `std::fs::delete_dir(dir)` will have no effect. Therefore, we can remove `dir` from the
// set of paths in use.
paths_in_use().lock().unwrap().remove(&dir);

let mut f = ChecksumWriter::new(p, io::BufWriter::new(f));
self.write_header(&mut f).unwrap();
f
Expand Down
2 changes: 1 addition & 1 deletion ext/tests/save_load_resolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ fn test_checksum() {
.compute_through_bidegree(Bidegree::s_t(2, 2));

let mut path = tempdir.path().to_owned();
path.push("differentials/0_2_differential");
path.push("differentials/2/0_2_differential");

let mut file = OpenOptions::new()
.read(true)
Expand Down