Skip to content

Commit

Permalink
c
Browse files: browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Oct 16, 2024
1 parent 007b7e5 commit e3820bb
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
34 changes: 20 additions & 14 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,19 @@ impl<R: MmapBytesReader> ParquetReader<R> {
pub fn with_arrow_schema_projection(
mut self,
first_schema: &Arc<ArrowSchema>,
projected_arrow_schema: Option<&Arc<ArrowSchema>>,
projected_arrow_schema: Option<&ArrowSchema>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
// `self.schema` gets overwritten if allow_missing_columns
let this_schema_width = self.schema()?.len();

if allow_missing_columns {
// Must check the dtypes
let schema = projected_arrow_schema
.cloned()
.unwrap_or_else(|| first_schema.clone());
ensure_matching_dtypes_if_found(&schema, self.schema()?.as_ref())?;
self.schema.replace(schema);
ensure_matching_dtypes_if_found(
projected_arrow_schema.unwrap_or(first_schema.as_ref()),
self.schema()?.as_ref(),
)?;
self.schema.replace(first_schema.clone());
}

let schema = self.schema()?;
Expand All @@ -107,7 +110,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projected_arrow_schema,
)?;
} else {
if schema.len() > first_schema.len() {
if this_schema_width > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
Expand Down Expand Up @@ -328,16 +331,19 @@ impl ParquetAsyncReader {
pub async fn with_arrow_schema_projection(
mut self,
first_schema: &Arc<ArrowSchema>,
projected_arrow_schema: Option<&Arc<ArrowSchema>>,
projected_arrow_schema: Option<&ArrowSchema>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
// `self.schema` gets overwritten if allow_missing_columns
let this_schema_width = self.schema().await?.len();

if allow_missing_columns {
// Must check the dtypes
let schema = projected_arrow_schema
.cloned()
.unwrap_or_else(|| first_schema.clone());
ensure_matching_dtypes_if_found(&schema, self.schema().await?.as_ref())?;
self.schema.replace(schema);
ensure_matching_dtypes_if_found(
projected_arrow_schema.unwrap_or(first_schema.as_ref()),
self.schema().await?.as_ref(),
)?;
self.schema.replace(first_schema.clone());
}

let schema = self.schema().await?;
Expand All @@ -349,7 +355,7 @@ impl ParquetAsyncReader {
projected_arrow_schema,
)?;
} else {
if schema.len() > first_schema.len() {
if this_schema_width > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl ParquetExec {
.with_predicate(predicate.clone())
.with_arrow_schema_projection(
&first_schema,
projected_arrow_schema.as_ref(),
projected_arrow_schema.as_deref(),
allow_missing_columns,
)?
.finish()?;
Expand Down Expand Up @@ -427,7 +427,7 @@ impl ParquetExec {
.with_row_index(row_index)
.with_arrow_schema_projection(
&first_schema,
projected_arrow_schema.as_ref(),
projected_arrow_schema.as_deref(),
allow_missing_columns,
)
.await?
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl ParquetSource {
let mut reader = reader
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_ref(),
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)?
.with_row_index(file_options.row_index)
Expand Down Expand Up @@ -199,7 +199,7 @@ impl ParquetSource {
.with_row_index(file_options.row_index)
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_ref(),
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)
.await?
Expand Down
14 changes: 11 additions & 3 deletions py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,18 @@ def test_parquet_schema_arg(

schema: dict[str, type[pl.DataType]] = {"a": pl.Int64} # type: ignore[no-redef]

lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)
for allow_missing_columns in [True, False]:
lf = pl.scan_parquet(
paths,
parallel=parallel,
schema=schema,
allow_missing_columns=allow_missing_columns,
)

with pytest.raises(pl.exceptions.SchemaError, match="file contained extra columns"):
lf.collect(streaming=streaming)
with pytest.raises(
pl.exceptions.SchemaError, match="file contained extra columns"
):
lf.collect(streaming=streaming)

lf = pl.scan_parquet(paths, parallel=parallel, schema=schema).select("a")

Expand Down

0 comments on commit e3820bb

Please sign in to comment.