Skip to content

Commit

Permalink
refactor(rust): Move path expansion utils to polars-io crate (pola-…
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego authored Jul 9, 2024
1 parent 7b8612c commit 383c48a
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 397 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ chrono-tz = { workspace = true, optional = true }
fast-float = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
glob = { version = "0.3" }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { package = "memmap2", version = "0.7" }
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-io/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod other;
mod path;

pub use other::*;
pub use path::*;
143 changes: 22 additions & 121 deletions crates/polars-io/src/utils.rs → crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use std::borrow::Cow;
use std::io::Read;
use std::path::{Path, PathBuf};

use once_cell::sync::Lazy;
use polars_core::prelude::*;
Expand All @@ -11,39 +10,6 @@ use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};

/// Base directory used for temporary files.
///
/// Taken from the `POLARS_TEMP_DIR` environment variable when it is set (and valid
/// unicode); otherwise the system temporary directory joined with `polars/`.
///
/// The directory is created the first time this value is accessed.
///
/// # Panics
///
/// Panics on first access if the directory cannot be created and the path does not
/// already refer to a directory.
pub static POLARS_TEMP_DIR_BASE_PATH: Lazy<Box<Path>> = Lazy::new(|| {
    let base_dir: Box<Path> = match std::env::var("POLARS_TEMP_DIR") {
        Ok(custom) => PathBuf::from(custom),
        Err(_) => {
            let system_tmp = std::env::temp_dir();
            PathBuf::from(system_tmp.to_string_lossy().as_ref()).join("polars/")
        },
    }
    .into_boxed_path();

    // A creation error is only fatal when the directory is really missing afterwards.
    match std::fs::create_dir_all(&*base_dir) {
        Err(err) if !base_dir.is_dir() => panic!(
            "failed to create temporary directory: path = {}, err = {}",
            base_dir.to_str().unwrap(),
            err
        ),
        _ => {},
    }

    base_dir
});

/// Creates `path` (including missing parents) and reports success whenever the
/// directory exists afterwards, even if `std::fs::create_dir_all` returned an error.
///
/// When `path` is not a directory after the attempt, the outcome of
/// `std::fs::create_dir_all` is returned unchanged.
#[cfg(feature = "file_cache")]
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    let creation_outcome = std::fs::create_dir_all(path);

    if !path.is_dir() {
        return creation_outcome;
    }

    Ok(())
}

pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
reader: &'a mut R,
) -> PolarsResult<ReaderBytes<'a>> {
Expand All @@ -70,18 +36,28 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
}
}

// used by python polars
pub fn resolve_homedir(path: &Path) -> PathBuf {
// replace "~" with home directory
if path.starts_with("~") {
// home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
#[cfg(not(target_family = "wasm"))]
if let Some(homedir) = home::home_dir() {
return homedir.join(path.strip_prefix("~").unwrap());
}
}
/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read
/// concurrently/parallel
///
/// This takes an iterator over the number of rows per file.
pub fn get_sequential_row_statistics<I>(
iter: I,
mut total_rows_to_read: usize,
) -> Vec<(usize, usize)>
where
I: Iterator<Item = usize>,
{
let mut cumulative_read = 0;
iter.map(|rows_this_file| {
let remaining_rows_to_read = total_rows_to_read;
total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file);

let current_cumulative_read = cumulative_read;
cumulative_read += rows_this_file;

path.into()
(remaining_rows_to_read, current_cumulative_read)
})
.collect()
}

#[cfg(any(
Expand Down Expand Up @@ -186,30 +162,6 @@ pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], off
}
}

/// Pre-compute, for every file, how many rows are still wanted and how many rows
/// precede it, so that the files can afterwards be read concurrently/in parallel.
///
/// `iter` yields the number of rows of each file, in order. `total_rows_to_read` is
/// the overall row budget.
///
/// Returns one `(remaining_rows_to_read, cumulative_rows_before)` pair per file:
/// the budget left when that file is reached (saturating at zero) and the sum of the
/// row counts of all earlier files.
pub fn get_sequential_row_statistics<I>(
    iter: I,
    mut total_rows_to_read: usize,
) -> Vec<(usize, usize)>
where
    I: Iterator<Item = usize>,
{
    let mut rows_before_this_file = 0;
    let mut stats = Vec::with_capacity(iter.size_hint().0);

    for file_rows in iter {
        // Record the state as seen *before* this file is consumed.
        stats.push((total_rows_to_read, rows_before_this_file));

        total_rows_to_read = total_rows_to_read.saturating_sub(file_rows);
        rows_before_this_file += file_rows;
    }

    stats
}

#[cfg(feature = "json")]
pub(crate) fn overwrite_schema(
schema: &mut Schema,
Expand Down Expand Up @@ -338,22 +290,9 @@ pub(crate) fn chunk_df_for_writing(
Ok(result)
}

/// Matches strings that start with one of the recognised URL schemes followed by `://`:
/// `s3`, `s3a`, `gs`, `gcs`, `file`, `abfs`, `abfss`, `azure`, `az`, `adl`, `http` or `https`.
static CLOUD_URL: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?)://").unwrap());

/// Returns `true` when `p` is valid UTF-8 and begins with a scheme matched by
/// `CLOUD_URL`. Paths that are not valid UTF-8 are never considered cloud urls.
pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
    p.as_ref()
        .to_str()
        .map_or(false, |url| CLOUD_URL.is_match(url))
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use super::{resolve_homedir, FLOAT_RE};
use super::FLOAT_RE;

#[test]
fn test_float_parse() {
Expand All @@ -375,42 +314,4 @@ mod tests {
assert!(FLOAT_RE.is_match("7e-05"));
assert!(FLOAT_RE.is_match("+7e+05"));
}

#[cfg(not(target_os = "windows"))]
#[test]
fn test_resolve_homedir() {
    // A path below `~` keeps its file name and becomes absolute.
    let tilde_file = PathBuf::from("~/dir1/dir2/test.csv");
    let expanded = resolve_homedir(&tilde_file);
    assert_eq!(expanded.file_name(), tilde_file.file_name());
    assert!(expanded.is_absolute());

    // Paths that do not start with `~` come back unchanged.
    for raw in ["/abs/path/test.csv", "rel/path/test.csv", "/"] {
        let untouched = PathBuf::from(raw);
        assert_eq!(resolve_homedir(&untouched), untouched);
    }

    // A bare `~` becomes absolute as well.
    assert!(resolve_homedir(&PathBuf::from("~")).is_absolute());
}

#[cfg(target_os = "windows")]
#[test]
fn test_resolve_homedir_windows() {
    // A path that does not start with `~` comes back unchanged.
    let absolute = PathBuf::from(r#"c:\Users\user1\test.csv"#);
    assert_eq!(resolve_homedir(&absolute), absolute);

    // A path below `~` keeps its file name and becomes absolute.
    let tilde_file = PathBuf::from(r#"~\user1\test.csv"#);
    let expanded = resolve_homedir(&tilde_file);
    assert_eq!(expanded.file_name(), tilde_file.file_name());
    assert!(expanded.is_absolute());

    // A bare `~` becomes absolute as well.
    assert!(resolve_homedir(&PathBuf::from("~")).is_absolute());
}
}
Loading

0 comments on commit 383c48a

Please sign in to comment.