Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Jun 18, 2024
1 parent 1f15e1c commit 0d4526b
Show file tree
Hide file tree
Showing 34 changed files with 419 additions and 362 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/polars-io/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ pub struct RowIndex {
// NOTE(review): fragment from a scraped diff page — original indentation and
// any preceding #[derive(...)] attributes were lost by the scrape.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
// Options controlling Hive-style partitioned reads for file scans.
pub struct HiveOptions {
// Whether Hive partition parsing is enabled for the scan.
pub enabled: bool,
// Offset into the file path at which Hive partition (key=value) segments
// start — added by this commit; TODO confirm exact semantics against the
// path-expansion logic that sets it.
pub hive_start_idx: usize,
// Optional explicit schema for the partition columns; presumably inferred
// from the paths when `None` — verify against the hive module.
pub schema: Option<SchemaRef>,
}

impl Default for HiveOptions {
fn default() -> Self {
Self {
enabled: true,
hive_start_idx: 0,
schema: None,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ polars-utils = { workspace = true }
ahash = { workspace = true }
bitflags = { workspace = true }
glob = { version = "0.3" }
memchr = { workspace = true }
once_cell = { workspace = true }
pyo3 = { workspace = true, optional = true }
rayon = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ impl AnonymousScan for LazyJsonLineReader {
}
fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {
let schema = scan_opts.output_schema.unwrap_or(scan_opts.schema);
JsonLineReader::from_path(&self.path)?
JsonLineReader::from_path(self.paths.first().unwrap())?
.with_schema(schema)
.with_rechunk(self.rechunk)
.with_chunk_size(self.batch_size)
Expand All @@ -26,7 +26,7 @@ impl AnonymousScan for LazyJsonLineReader {
return Ok(schema.clone());
}

let f = polars_utils::open_file(&self.path)?;
let f = polars_utils::open_file(self.paths.first().unwrap())?;
let mut reader = std::io::BufReader::new(f);

let schema = Arc::new(polars_io::ndjson::infer_schema(
Expand Down
68 changes: 31 additions & 37 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;

use hive::HivePartitions;
use polars_core::config;
#[cfg(feature = "cloud")]
use polars_core::config::{get_file_prefetch_size, verbose};
Expand All @@ -15,6 +16,7 @@ use super::*;
pub struct ParquetExec {
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
#[allow(dead_code)]
Expand All @@ -25,9 +27,11 @@ pub struct ParquetExec {
}

impl ParquetExec {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
paths: Arc<[PathBuf]>,
file_info: FileInfo,
hive_parts: Option<Vec<Arc<HivePartitions>>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
Expand All @@ -37,6 +41,7 @@ impl ParquetExec {
ParquetExec {
paths,
file_info,
hive_parts,
predicate,
options,
cloud_options,
Expand All @@ -57,29 +62,25 @@ impl ParquetExec {

let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let mut base_row_index = self.file_options.row_index.take();

// Limit no. of files at a time to prevent open file limits.
for paths in self
.paths
.chunks(std::cmp::min(POOL.current_num_threads(), 128))
{
let step = std::cmp::min(POOL.current_num_threads(), 128);

for i in (0..self.paths.len()).step_by(step) {
let end = std::cmp::min(i.saturating_add(step), self.paths.len());
let paths = &self.paths[i..end];
let hive_parts = self.hive_parts.as_ref().map(|x| &x[i..end]);

if remaining_rows_to_read == 0 && !result.is_empty() {
return Ok(result);
}

// First initialize the readers, predicates and metadata.
// This will be used to determine the slices. That way we can actually read all the
// files in parallel even if we add row index columns or slices.
let readers_and_metadata = paths
.iter()
.map(|path| {
let mut file_info = self.file_info.clone();
file_info.update_hive_partitions(path)?;

let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());
let readers_and_metadata = (0..paths.len())
.map(|i| {
let path = &paths[i];
let hive_partitions = hive_parts.map(|x| x[i].materialize_partition_columns());

let file = std::fs::File::open(path)?;
let (projection, predicate) = prepare_scan_args(
Expand Down Expand Up @@ -251,16 +252,16 @@ impl ParquetExec {
eprintln!("reading of {}/{} file...", processed, self.paths.len());
}

let iter = readers_and_metadata
.into_iter()
.zip(rows_statistics.iter())
.zip(paths.as_ref().iter())
.map(
|(
((num_rows_this_file, reader), (remaining_rows_to_read, cumulative_read)),
path,
)| async move {
let mut file_info = file_info.clone();
let iter = readers_and_metadata.into_iter().enumerate().map(
|(i, (num_rows_this_file, reader))| {
let (remaining_rows_to_read, cumulative_read) = &rows_statistics[i];
let hive_partitions = self
.hive_parts
.as_ref()
.map(|x| x[i].materialize_partition_columns());

async move {
let file_info = file_info.clone();
let remaining_rows_to_read = *remaining_rows_to_read;
let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read
{
Expand All @@ -273,13 +274,6 @@ impl ParquetExec {
offset: rc.offset + *cumulative_read as IdxSize,
});

file_info.update_hive_partitions(path)?;

let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());

let (projection, predicate) = prepare_scan_args(
predicate.clone(),
&mut file_options.with_columns.clone(),
Expand All @@ -299,8 +293,9 @@ impl ParquetExec {
.finish()
.await
.map(Some)
},
);
}
},
);

let dfs = futures::future::try_join_all(iter).await?;
let n_read = dfs
Expand Down Expand Up @@ -333,10 +328,9 @@ impl ParquetExec {
Some(p) => is_cloud_url(p.as_path()),
None => {
let hive_partitions = self
.file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());
.and_then(|ref x| x.first())
.map(|x| x.materialize_partition_columns());
let (projection, _) = prepare_scan_args(
None,
&mut self.file_options.with_columns,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ fn create_physical_plan_impl(
Scan {
paths,
file_info,
hive_parts,
output_schema,
scan_type,
predicate,
Expand Down Expand Up @@ -283,6 +284,7 @@ fn create_physical_plan_impl(
} => Ok(Box::new(executors::ParquetExec::new(
paths,
file_info,
hive_parts,
predicate,
options,
cloud_options,
Expand Down
55 changes: 15 additions & 40 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::prelude::*;
#[derive(Clone)]
#[cfg(feature = "csv")]
pub struct LazyCsvReader {
path: PathBuf,
paths: Arc<[PathBuf]>,
glob: bool,
cache: bool,
Expand All @@ -35,8 +34,7 @@ impl LazyCsvReader {

pub fn new(path: impl AsRef<Path>) -> Self {
LazyCsvReader {
path: path.as_ref().to_owned(),
paths: Arc::new([]),
paths: Arc::new([path.as_ref().to_path_buf()]),
glob: true,
cache: true,
read_options: Default::default(),
Expand Down Expand Up @@ -218,15 +216,13 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
let mut file = if let Some(mut paths) = self.iter_paths()? {
let path = match paths.next() {
Some(globresult) => globresult?,
None => polars_bail!(ComputeError: "globbing pattern did not match any files"),
};
polars_utils::open_file(path)
} else {
polars_utils::open_file(&self.path)
}?;
let paths = self.expand_paths()?.0;
let Some(path) = paths.first() else {
polars_bail!(ComputeError: "no paths specified for this reader");
};

let mut file = polars_utils::open_file(path)?;

let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
let skip_rows = self.read_options.skip_rows;
let parse_options = self.read_options.get_parse_options();
Expand Down Expand Up @@ -264,25 +260,9 @@ impl LazyCsvReader {

impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(mut self) -> PolarsResult<LazyFrame> {
if !self.glob {
return self.finish_no_glob();
}
if let Some(paths) = self.iter_paths()? {
let paths = paths
.into_iter()
.collect::<PolarsResult<Arc<[PathBuf]>>>()?;
self.paths = paths;
}
self.finish_no_glob()
}

fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
let paths = if self.paths.is_empty() {
Arc::new([self.path])
} else {
self.paths
};
fn finish(self) -> PolarsResult<LazyFrame> {
// `expand_paths` respects globs
let paths = self.expand_paths()?.0;

let mut lf: LazyFrame =
DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
Expand All @@ -292,23 +272,18 @@ impl LazyFileListReader for LazyCsvReader {
Ok(lf)
}

fn glob(&self) -> bool {
self.glob
// `LazyFileListReader` impl detail for `LazyCsvReader`: per this commit,
// `finish` handles path expansion itself (the nearby comment says
// `expand_paths` respects globs), so this no-glob entry point is asserted
// to never be reached.
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
unreachable!();
}

fn path(&self) -> &Path {
&self.path
// Getter: whether glob expansion of the input paths is enabled.
fn glob(&self) -> bool {
self.glob
}

// Getter: the input paths configured for this reader.
fn paths(&self) -> &[PathBuf] {
&self.paths
}

// NOTE(review): this method appears on the *removed* side of the diff —
// the commit deletes the single `path` field in favor of the `paths` slice.
// Kept byte-identical here for reference only; do not treat as live code.
fn with_path(mut self, path: PathBuf) -> Self {
self.path = path;
self
}

fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self {
self.paths = paths;
self
Expand Down
Loading

0 comments on commit 0d4526b

Please sign in to comment.