Skip to content

Commit fecce97

Browse files
authored
Lazy TempDir creation in DiskManager (#1695)
1 parent a7f0156 commit fecce97

File tree

1 file changed

+73
-26
lines changed

1 file changed

+73
-26
lines changed

datafusion/src/execution/disk_manager.rs

Lines changed: 73 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
use crate::error::{DataFusionError, Result};
2222
use log::debug;
2323
use rand::{thread_rng, Rng};
24-
use std::path::PathBuf;
2524
use std::sync::Arc;
25+
use std::{path::PathBuf, sync::Mutex};
2626
use tempfile::{Builder, NamedTempFile, TempDir};
2727

2828
/// Configuration for temporary disk access
@@ -67,39 +67,49 @@ impl DiskManagerConfig {
6767
/// while processing dataset larger than available memory.
6868
#[derive(Debug)]
6969
pub struct DiskManager {
70-
local_dirs: Vec<TempDir>,
70+
/// TempDirs to put temporary files in. A new OS specified
71+
/// temporary directory will be created if this list is empty.
72+
local_dirs: Mutex<Vec<TempDir>>,
7173
}
7274

7375
impl DiskManager {
7476
/// Create a DiskManager given the configuration
7577
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
7678
match config {
7779
DiskManagerConfig::Existing(manager) => Ok(manager),
78-
DiskManagerConfig::NewOs => {
79-
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
80-
81-
debug!(
82-
"Created directory {:?} as DataFusion working directory",
83-
tempdir
84-
);
85-
Ok(Arc::new(Self {
86-
local_dirs: vec![tempdir],
87-
}))
88-
}
80+
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
81+
local_dirs: Mutex::new(vec![]),
82+
})),
8983
DiskManagerConfig::NewSpecified(conf_dirs) => {
9084
let local_dirs = create_local_dirs(conf_dirs)?;
9185
debug!(
9286
"Created local dirs {:?} as DataFusion working directory",
9387
local_dirs
9488
);
95-
Ok(Arc::new(Self { local_dirs }))
89+
Ok(Arc::new(Self {
90+
local_dirs: Mutex::new(local_dirs),
91+
}))
9692
}
9793
}
9894
}
9995

10096
/// Return a temporary file from a randomized choice in the configured locations
10197
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
102-
create_tmp_file(&self.local_dirs)
98+
let mut local_dirs = self.local_dirs.lock().unwrap();
99+
100+
// Create a temporary directory if needed
101+
if local_dirs.is_empty() {
102+
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
103+
104+
debug!(
105+
"Created directory '{:?}' as DataFusion tempfile directory",
106+
tempdir.path().to_string_lossy()
107+
);
108+
109+
local_dirs.push(tempdir);
110+
}
111+
112+
create_tmp_file(&local_dirs)
103113
}
104114
}
105115

@@ -129,10 +139,42 @@ fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
129139

130140
#[cfg(test)]
131141
mod tests {
142+
use std::path::Path;
143+
132144
use super::*;
133145
use crate::error::Result;
134146
use tempfile::TempDir;
135147

148+
#[test]
149+
fn lazy_temp_dir_creation() -> Result<()> {
150+
// A default configuration should not create temp files until requested
151+
let config = DiskManagerConfig::new();
152+
let dm = DiskManager::try_new(config)?;
153+
154+
assert_eq!(0, local_dir_snapshot(&dm).len());
155+
156+
// can still create a tempfile however:
157+
let actual = dm.create_tmp_file()?;
158+
159+
// Now the tempdir has been created on demand
160+
assert_eq!(1, local_dir_snapshot(&dm).len());
161+
162+
// the returned tempfile file should be in the temp directory
163+
let local_dirs = local_dir_snapshot(&dm);
164+
assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
165+
166+
Ok(())
167+
}
168+
169+
fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
170+
dm.local_dirs
171+
.lock()
172+
.unwrap()
173+
.iter()
174+
.map(|p| p.path().into())
175+
.collect()
176+
}
177+
136178
#[test]
137179
fn file_in_right_dir() -> Result<()> {
138180
let local_dir1 = TempDir::new()?;
@@ -147,19 +189,24 @@ mod tests {
147189
let actual = dm.create_tmp_file()?;
148190

149191
// the file should be in one of the specified local directories
150-
let found = local_dirs.iter().any(|p| {
151-
actual
152-
.path()
192+
assert_path_in_dirs(actual.path(), local_dirs.into_iter());
193+
194+
Ok(())
195+
}
196+
197+
/// Asserts that `file_path` is found anywhere in any of `dir` directories
198+
fn assert_path_in_dirs<'a>(
199+
file_path: &'a Path,
200+
dirs: impl Iterator<Item = &'a Path>,
201+
) {
202+
let dirs: Vec<&Path> = dirs.collect();
203+
204+
let found = dirs.iter().any(|file_path| {
205+
file_path
153206
.ancestors()
154-
.any(|candidate_path| *p == candidate_path)
207+
.any(|candidate_path| *file_path == candidate_path)
155208
});
156209

157-
assert!(
158-
found,
159-
"Can't find {:?} in specified local dirs: {:?}",
160-
actual, local_dirs
161-
);
162-
163-
Ok(())
210+
assert!(found, "Can't find {:?} in dirs: {:?}", file_path, dirs);
164211
}
165212
}

0 commit comments

Comments
 (0)