Skip to content

feat: Introduce functionality to read and write typed parquets #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dataframely/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Concatenate, Generic, ParamSpec, TypeVar
from typing import TYPE_CHECKING, Any, Concatenate, Generic, Literal, ParamSpec, TypeVar

import polars as pl

Expand All @@ -15,6 +15,8 @@
P = ParamSpec("P")
R = TypeVar("R")

Validation = Literal["allow", "forbid", "warn", "skip"]


def inherit_signature( # pragma: no cover
target_fn: Callable[P, Any],
Expand Down
4 changes: 4 additions & 0 deletions dataframely/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ def __init__(self, attr: str, kls: type) -> None:
"`from __future__ import annotations` in the file that defines the collection."
)
super().__init__(message)


class ValidationRequiredError(Exception):
    """Error raised when validation is required when reading a parquet file but the caller forbids it."""
215 changes: 212 additions & 3 deletions dataframely/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,32 @@
from __future__ import annotations

import json
import warnings
from abc import ABC
from collections.abc import Iterable, Mapping, Sequence
from typing import Any, Literal, Self, overload
from pathlib import Path
from typing import IO, Any, Literal, Self, overload

import polars as pl
import polars.exceptions as plexc
import polars.selectors as cs
from polars._typing import FileSource, PartitioningScheme

from ._base_schema import BaseSchema
from ._compat import pa, sa
from ._rule import Rule, rule_from_dict, with_evaluation_rules
from ._serialization import SchemaJSONDecoder, SchemaJSONEncoder
from ._typing import DataFrame, LazyFrame
from ._typing import DataFrame, LazyFrame, Validation
from ._validation import DtypeCasting, validate_columns, validate_dtypes
from .columns import Column, column_from_dict
from .config import Config
from .exc import RuleValidationError, ValidationError
from .exc import RuleValidationError, ValidationError, ValidationRequiredError
from .failure import FailureInfo
from .random import Generator

SERIALIZATION_FORMAT_VERSION = "1"
_ORIGINAL_NULL_SUFFIX = "__orig_null__"
_METADATA_KEY = "dataframely_schema"

# ------------------------------------------------------------------------------------ #
# SCHEMA DEFINITION #
Expand Down Expand Up @@ -671,6 +675,211 @@ def serialize(cls) -> str:
}
return json.dumps(result, cls=SchemaJSONEncoder)

# ------------------------------------ PARQUET ----------------------------------- #

@classmethod
def write_parquet(
    cls, df: DataFrame[Self], /, file: str | Path | IO[bytes], **kwargs: Any
) -> None:
    """Write a typed data frame with this schema to a parquet file.

    A serialization of this schema is attached to the parquet file as
    metadata under a dataframely-specific key. :meth:`read_parquet` and
    :meth:`scan_parquet` (as well as external tools) can leverage this
    metadata for more efficient reading.

    Args:
        df: The data frame to write to the parquet file.
        file: The file path or writable file-like object to which to write the
            parquet file. This should be a path to a directory if writing a
            partitioned dataset.
        kwargs: Additional keyword arguments passed directly to
            :meth:`polars.write_parquet`. ``metadata`` may only be provided if it
            is a dictionary.

    Attention:
        Be aware that this method suffers from the same limitations as
        :meth:`serialize`.
    """
    # Merge user-supplied metadata with the schema serialization; the
    # dataframely key always takes precedence.
    user_metadata: dict[str, Any] = kwargs.pop("metadata", {})
    merged = {**user_metadata, _METADATA_KEY: cls.serialize()}
    df.write_parquet(file, metadata=merged, **kwargs)

@classmethod
def sink_parquet(
    cls,
    lf: LazyFrame[Self],
    /,
    file: str | Path | IO[bytes] | PartitioningScheme,
    **kwargs: Any,
) -> None:
    """Stream a typed lazy frame with this schema to a parquet file.

    This method automatically adds a serialization of this schema to the parquet
    file as metadata. This metadata can be leveraged by :meth:`read_parquet` and
    :meth:`scan_parquet` for more efficient reading, or by external tools.

    Args:
        lf: The lazy frame to write to the parquet file.
        file: The file path, writable file-like object, or partitioning scheme to
            which to write the parquet file.
        kwargs: Additional keyword arguments passed directly to
            :meth:`polars.sink_parquet`. ``metadata`` may only be provided if it
            is a dictionary.

    Attention:
        Be aware that this method suffers from the same limitations as
        :meth:`serialize`.
    """
    # Merge user-supplied metadata with the schema serialization; the
    # dataframely key always takes precedence.
    metadata = kwargs.pop("metadata", {})
    lf.sink_parquet(
        file, metadata={**metadata, _METADATA_KEY: cls.serialize()}, **kwargs
    )

@classmethod
def read_parquet(
    cls,
    source: FileSource,
    *,
    validation: Validation = "warn",
    **kwargs: Any,
) -> DataFrame[Self]:
    """Read a parquet file into a typed data frame with this schema.

    Compared to :meth:`polars.read_parquet`, this method checks the parquet file's
    metadata and runs validation if necessary to ensure that the data matches this
    schema.

    Args:
        source: Path, directory, or file-like object from which to read the data.
        validation: The strategy for running validation when reading the data:

            - ``"allow"``: The method tries to read the parquet file's metadata. If
              the stored schema matches this schema, the data frame is read without
              validation. If the stored schema mismatches this schema or no schema
              information can be found in the metadata, this method automatically
              runs :meth:`validate` with ``cast=True``.
            - ``"warn"``: The method behaves similarly to ``"allow"``. However,
              it prints a warning if validation is necessary.
            - ``"forbid"``: The method never runs validation automatically and only
              returns if the schema stored in the parquet file's metadata matches
              this schema.
            - ``"skip"``: The method never runs validation and simply reads the
              parquet file, entrusting the user that the schema is valid. _Use this
              option carefully and consider replacing it with
              :meth:`polars.read_parquet` to convey the purpose better_.

        kwargs: Additional keyword arguments passed directly to
            :meth:`polars.read_parquet`.

    Returns:
        The data frame with this schema.

    Raises:
        ValidationRequiredError: If no schema information can be read from the
            source, or it mismatches this schema, and ``validation`` is set to
            ``"forbid"``.

    Attention:
        Be aware that this method suffers from the same limitations as
        :meth:`serialize`.
    """
    # Fast path: the stored schema matches this one, so the data can be
    # trusted and read without validation.
    if not cls._requires_validation_for_reading_parquet(source, validation):
        return pl.read_parquet(source, **kwargs)  # type: ignore
    return cls.validate(pl.read_parquet(source, **kwargs), cast=True)

@classmethod
def scan_parquet(
    cls,
    source: FileSource,
    *,
    validation: Validation = "warn",
    **kwargs: Any,
) -> LazyFrame[Self]:
    """Lazily read a parquet file into a typed data frame with this schema.

    Compared to :meth:`polars.scan_parquet`, this method checks the parquet file's
    metadata and runs validation if necessary to ensure that the data matches this
    schema.

    Args:
        source: Path, directory, or file-like object from which to read the data.
        validation: The strategy for running validation when reading the data:

            - ``"allow"``: The method tries to read the parquet file's metadata. If
              the stored schema matches this schema, the data frame is read without
              validation. If the stored schema mismatches this schema or no schema
              information can be found in the metadata, this method automatically
              runs :meth:`validate` with ``cast=True``.
            - ``"warn"``: The method behaves similarly to ``"allow"``. However,
              it prints a warning if validation is necessary.
            - ``"forbid"``: The method never runs validation automatically and only
              returns if the schema stored in the parquet file's metadata matches
              this schema.
            - ``"skip"``: The method never runs validation and simply reads the
              parquet file, entrusting the user that the schema is valid. _Use this
              option carefully and consider replacing it with
              :meth:`polars.scan_parquet` to convey the purpose better_.

        kwargs: Additional keyword arguments passed directly to
            :meth:`polars.scan_parquet`.

    Returns:
        The lazy frame with this schema.

    Raises:
        ValidationRequiredError: If no schema information can be read from the
            source, or it mismatches this schema, and ``validation`` is set to
            ``"forbid"``.

    Note:
        Due to current limitations in dataframely, this method actually reads the
        parquet file into memory if ``validation`` is ``"allow"`` or ``"warn"``
        and validation is required.

    Attention:
        Be aware that this method suffers from the same limitations as
        :meth:`serialize`.
    """
    # Fast path: the stored schema matches this one, so the data can be
    # scanned lazily without validation.
    if not cls._requires_validation_for_reading_parquet(source, validation):
        return pl.scan_parquet(source, **kwargs)  # type: ignore
    # Validation requires materializing the data, hence the eager read.
    return cls.validate(pl.read_parquet(source, **kwargs), cast=True).lazy()

@classmethod
def _requires_validation_for_reading_parquet(
    cls, source: FileSource, validation: Validation
) -> bool:
    """Decide whether reading ``source`` requires running validation.

    Returns ``False`` when validation can safely be skipped: either the user
    opted out via ``"skip"``, or the schema stored in the parquet metadata
    matches this schema. Otherwise returns ``True`` (``"allow"``/``"warn"``)
    or raises (``"forbid"``).

    Raises:
        ValidationRequiredError: If validation is necessary but ``validation``
            is set to ``"forbid"``.
    """
    if validation == "skip":
        return False

    # Look for a dataframely schema in the parquet metadata. Metadata cannot
    # be read for a list of sources, so we fall back to `None` in that case.
    stored: str | None = None
    if not isinstance(source, list):
        stored = pl.read_parquet_metadata(source).get(_METADATA_KEY)

    # A matching stored schema means the data already adheres to this schema
    # and validation is unnecessary.
    if stored is not None and cls.matches(deserialize_schema(stored)):
        return False

    # Validation is definitely required; how we communicate this to the user
    # depends on the `validation` strategy.
    msg = (
        "current schema does not match stored schema"
        if stored is not None
        else "no schema to check validity can be read from the source"
    )
    if validation == "forbid":
        raise ValidationRequiredError(
            f"Cannot read parquet file from '{source!r}' without validation: {msg}."
        )
    if validation == "warn":
        warnings.warn(
            f"Reading parquet file from '{source!r}' requires validation: {msg}."
        )
    return True

# ----------------------------- THIRD-PARTY PACKAGES ----------------------------- #

@classmethod
Expand Down
Loading
Loading