refactor(python): Remove unused code paths in read_parquet (#15532)
stinodego authored Apr 8, 2024
1 parent 38bd8f2 commit cb5dd44
Showing 2 changed files with 64 additions and 78 deletions.

In short: the commit extracts the PyArrow path of `read_parquet` into a new `_read_parquet_with_pyarrow` helper, narrows `_read_parquet_binary` to in-memory sources (`IO[bytes] | bytes`) and removes its now-unreachable glob and path handling, and reorders the `PyDataFrame.read_parquet` parameters on the Rust side to match.

138 changes: 62 additions & 76 deletions py-polars/polars/io/parquet/functions.py

```diff
@@ -9,14 +9,12 @@
 from polars._utils.unstable import issue_unstable_warning
 from polars._utils.various import (
     is_int_sequence,
-    is_str_sequence,
     normalize_filepath,
 )
 from polars._utils.wrap import wrap_df, wrap_ldf
 from polars.convert import from_arrow
 from polars.dependencies import _PYARROW_AVAILABLE
 from polars.io._utils import (
-    is_glob_pattern,
     is_local_file,
     is_supported_cloud,
     parse_columns_arg,
```

```diff
@@ -115,7 +113,7 @@ def read_parquet(
     retries
         Number of retries if accessing a cloud instance fails.
     use_pyarrow
-        Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is
+        Use PyArrow instead of the Rust-native Parquet reader. The PyArrow reader is
         more stable.
     pyarrow_options
         Keyword arguments for `pyarrow.parquet.read_table
```
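
The docstring above routes extra keyword arguments through `pyarrow_options` to `pyarrow.parquet.read_table`. A minimal sketch of that pass-through, assuming pyarrow is installed; the file name `data.parquet` and its integer column `a` are illustrative, not part of the commit:

```python
import polars as pl

# Forward a row-filter predicate to pyarrow.parquet.read_table; `filters`
# is a read_table keyword, passed through untouched by polars.
df = pl.read_parquet(
    "data.parquet",
    use_pyarrow=True,
    pyarrow_options={"filters": [("a", ">", 1)]},
)
```
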
```diff
@@ -147,11 +145,6 @@
 
     # Dispatch to pyarrow if requested
     if use_pyarrow:
-        if not _PYARROW_AVAILABLE:
-            msg = (
-                "'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`"
-            )
-            raise ModuleNotFoundError(msg)
         if n_rows is not None:
             msg = "`n_rows` cannot be used with `use_pyarrow=True`"
             raise ValueError(msg)
```
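
The guard above fails fast on options the PyArrow reader cannot honor (the availability check moves into the new helper below). A short, self-contained sketch of the observable behavior, assuming polars and pyarrow are installed:

```python
import io

import polars as pl

# Write a tiny Parquet file into memory so the example is self-contained.
buf = io.BytesIO()
pl.DataFrame({"a": [1, 2, 3]}).write_parquet(buf)
buf.seek(0)

# pyarrow.parquet.read_table has no row-limit argument, so the combination
# is rejected up front rather than silently ignored.
try:
    pl.read_parquet(buf, n_rows=2, use_pyarrow=True)
except ValueError as exc:
    print(exc)  # `n_rows` cannot be used with `use_pyarrow=True`
```
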
```diff
@@ -161,40 +154,27 @@ def read_parquet(
                 "\n\nHint: Pass `pyarrow_options` instead with a 'partitioning' entry."
             )
             raise TypeError(msg)
-
-        import pyarrow as pa
-        import pyarrow.parquet
-
-        pyarrow_options = pyarrow_options or {}
-
-        with prepare_file_arg(
-            source,  # type: ignore[arg-type]
-            use_pyarrow=True,
+        return _read_parquet_with_pyarrow(
+            source,
+            columns=columns,
             storage_options=storage_options,
-        ) as source_prep:
-            return from_arrow(  # type: ignore[return-value]
-                pa.parquet.read_table(
-                    source_prep,
-                    memory_map=memory_map,
-                    columns=columns,
-                    **pyarrow_options,
-                )
-            )
+            pyarrow_options=pyarrow_options,
+            memory_map=memory_map,
+        )
 
     # Read binary types using `read_parquet`
     elif isinstance(source, (io.BufferedIOBase, io.RawIOBase, bytes)):
-        with prepare_file_arg(source, use_pyarrow=False) as source_prep:
-            return _read_parquet_binary(
-                source_prep,
-                columns=columns,
-                n_rows=n_rows,
-                parallel=parallel,
-                row_index_name=row_index_name,
-                row_index_offset=row_index_offset,
-                low_memory=low_memory,
-                use_statistics=use_statistics,
-                rechunk=rechunk,
-            )
+        return _read_parquet_binary(
+            source,
+            columns=columns,
+            n_rows=n_rows,
+            parallel=parallel,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+            low_memory=low_memory,
+            use_statistics=use_statistics,
+            rechunk=rechunk,
+        )
 
     # For other inputs, defer to `scan_parquet`
     lf = scan_parquet(
```
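
After this hunk, `read_parquet` has three clearly separated dispatch paths: the PyArrow helper, the native binary reader for in-memory sources, and `scan_parquet` for everything else. A sketch of how each path is reached from the public API; the file name is illustrative:

```python
import io

import polars as pl

pl.DataFrame({"a": [1, 2], "b": ["x", "y"]}).write_parquet("data.parquet")

# Path 1: use_pyarrow=True -> _read_parquet_with_pyarrow
df_pa = pl.read_parquet("data.parquet", use_pyarrow=True)

# Path 2: bytes or file-like objects -> _read_parquet_binary
with open("data.parquet", "rb") as f:
    df_bin = pl.read_parquet(io.BytesIO(f.read()))

# Path 3: paths, globs, and cloud URLs -> scan_parquet(...).collect()
df_scan = pl.read_parquet("data.parquet", columns=["a"])

assert df_pa.equals(df_bin)
```
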
```diff
@@ -221,58 +201,64 @@ def read_parquet(
     return lf.collect()
 
 
+def _read_parquet_with_pyarrow(
+    source: str | Path | list[str] | list[Path] | IO[bytes] | bytes,
+    *,
+    columns: list[int] | list[str] | None = None,
+    storage_options: dict[str, Any] | None = None,
+    pyarrow_options: dict[str, Any] | None = None,
+    memory_map: bool = True,
+) -> DataFrame:
+    if not _PYARROW_AVAILABLE:
+        msg = "'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`"
+        raise ModuleNotFoundError(msg)
+
+    import pyarrow as pa
+    import pyarrow.parquet
+
+    pyarrow_options = pyarrow_options or {}
+
+    with prepare_file_arg(
+        source,  # type: ignore[arg-type]
+        use_pyarrow=True,
+        storage_options=storage_options,
+    ) as source_prep:
+        pa_table = pa.parquet.read_table(
+            source_prep,
+            memory_map=memory_map,
+            columns=columns,
+            **pyarrow_options,
+        )
+    return from_arrow(pa_table)  # type: ignore[return-value]
+
+
 def _read_parquet_binary(
-    source: str | Path | IO[bytes] | bytes,
+    source: IO[bytes] | bytes,
     *,
     columns: Sequence[int] | Sequence[str] | None = None,
     n_rows: int | None = None,
-    parallel: ParallelStrategy = "auto",
     row_index_name: str | None = None,
     row_index_offset: int = 0,
-    low_memory: bool = False,
+    parallel: ParallelStrategy = "auto",
     use_statistics: bool = True,
     rechunk: bool = True,
+    low_memory: bool = False,
 ) -> DataFrame:
-    if isinstance(source, (str, Path)):
-        source = normalize_filepath(source)
-    if isinstance(columns, str):
-        columns = [columns]
+    projection, columns = parse_columns_arg(columns)
+    row_index = parse_row_index_args(row_index_name, row_index_offset)
 
-    if isinstance(source, str) and is_glob_pattern(source):
-        scan = scan_parquet(
-            source,
+    with prepare_file_arg(source) as source_prep:
+        pydf = PyDataFrame.read_parquet(
+            source_prep,
+            columns=columns,
+            projection=projection,
             n_rows=n_rows,
-            rechunk=True,
+            row_index=row_index,
             parallel=parallel,
-            row_index_name=row_index_name,
-            row_index_offset=row_index_offset,
             use_statistics=use_statistics,
+            rechunk=rechunk,
             low_memory=low_memory,
         )
-
-        if columns is None:
-            return scan.collect()
-        elif is_str_sequence(columns, allow_str=False):
-            return scan.select(columns).collect()
-        else:
-            msg = (
-                "cannot use glob patterns and integer based projection as `columns` argument"
-                "\n\nUse columns: List[str]"
-            )
-            raise TypeError(msg)
-
-    projection, columns = parse_columns_arg(columns)
-
-    pydf = PyDataFrame.read_parquet(
-        source,
-        columns,
-        projection,
-        n_rows,
-        parallel,
-        parse_row_index_args(row_index_name, row_index_offset),
-        low_memory=low_memory,
-        use_statistics=use_statistics,
-        rechunk=rechunk,
-    )
     return wrap_df(pydf)
```
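
Both the old and new `_read_parquet_binary` rely on `parse_columns_arg` to split the `columns` argument into string names and an integer `projection` for the native reader. From the public API, both spellings should select the same data; a sketch reusing the illustrative `data.parquet` from above:

```python
import polars as pl

# Select the first column by name and by position; parse_columns_arg routes
# names through `columns` and integers through `projection`.
by_name = pl.read_parquet("data.parquet", columns=["a"])
by_index = pl.read_parquet("data.parquet", columns=[0])
assert by_name.equals(by_index)
```
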
4 changes: 2 additions & 2 deletions py-polars/src/dataframe/io.rs

```diff
@@ -118,16 +118,16 @@ impl PyDataFrame {
 
     #[staticmethod]
     #[cfg(feature = "parquet")]
-    #[pyo3(signature = (py_f, columns, projection, n_rows, parallel, row_index, low_memory, use_statistics, rechunk))]
+    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
     pub fn read_parquet(
         py: Python,
         py_f: PyObject,
         columns: Option<Vec<String>>,
         projection: Option<Vec<usize>>,
         n_rows: Option<usize>,
-        parallel: Wrap<ParallelStrategy>,
         row_index: Option<(String, IdxSize)>,
         low_memory: bool,
+        parallel: Wrap<ParallelStrategy>,
         use_statistics: bool,
         rechunk: bool,
     ) -> PyResult<Self> {
```
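
The `#[pyo3(signature = ...)]` reorder moves `parallel` behind `row_index` and `low_memory`, which changes the Python-side positional order of `PyDataFrame.read_parquet`. That is safe here because the new Python caller passes these arguments by keyword. A toy sketch of why; the function names are stand-ins, not polars code:

```python
# f_old/f_new mimic the parameter orders before and after the commit.
def f_old(py_f, parallel, row_index, low_memory):
    return {"parallel": parallel, "row_index": row_index, "low_memory": low_memory}


def f_new(py_f, row_index, low_memory, parallel):
    return {"parallel": parallel, "row_index": row_index, "low_memory": low_memory}


# Keyword calls are immune to the reorder; a positional call would have
# silently shuffled parallel/row_index/low_memory instead.
kwargs = {"parallel": "auto", "row_index": None, "low_memory": False}
assert f_old("f", **kwargs) == f_new("f", **kwargs)
```
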
