dlt/common/libs/pyiceberg.py (249 additions, 7 deletions)
@@ -1,5 +1,8 @@
 import os
-from typing import Dict, Any, List, Optional
+from typing import Dict, Any, List, Optional, Union
+from enum import Enum
+from dataclasses import dataclass
+import warnings
 
 from fsspec import AbstractFileSystem
 from packaging.version import Version
@@ -11,7 +14,7 @@
 from dlt.common.libs.pyarrow import cast_arrow_schema_types
 from dlt.common.libs.utils import load_open_tables
 from dlt.common.pipeline import SupportsPipeline
-from dlt.common.schema.typing import TWriteDisposition, TTableSchema
+from dlt.common.schema.typing import TWriteDisposition, TTableSchema, _TTableSchemaBase
 from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
 from dlt.common.utils import assert_min_pkg_version
 from dlt.common.exceptions import MissingDependencyException
@@ -27,6 +30,10 @@
     from pyiceberg.table import Table as IcebergTable
     from pyiceberg.catalog import Catalog as IcebergCatalog
     from pyiceberg.exceptions import NoSuchTableError
+    from pyiceberg.partitioning import (
+        UNPARTITIONED_PARTITION_SPEC,
+        PartitionSpec as IcebergPartitionSpec,
+    )
     import pyarrow as pa
 except ModuleNotFoundError:
     raise MissingDependencyException(
@@ -51,6 +58,233 @@ def _patched_get_parquet_writer_kwargs(table_properties): # type: ignore[no-unt
 _pio._get_parquet_writer_kwargs = _patched_get_parquet_writer_kwargs
 
 
+class PartitionType(Enum):
+    """Supported Iceberg partition transform types."""
+
+    IDENTITY = "identity"
+    BUCKET = "bucket"
+    TRUNCATE = "truncate"
+    DAY = "day"
+    MONTH = "month"
+    YEAR = "year"
+    HOUR = "hour"
+
+
+@dataclass
+class PartitionSpec:
+    """Specification for an Iceberg partition."""
+
+    column: str
+    partition_type: str
+    index: int  # preserves the ordering of partition fields
+    name: Optional[str] = None
+    bucket_count: Optional[int] = None
+
+
+class IcebergPartitionManager:
+    """Manages creation and application of Iceberg partition specifications."""
+
+    @staticmethod
+    def apply_partitioning(update_spec: Any, partition_specs: List[Dict[str, Any]]) -> None:
+        from pyiceberg.transforms import (
+            IdentityTransform,
+            BucketTransform,
+            TruncateTransform,
+            YearTransform,
+            MonthTransform,
+            DayTransform,
+            HourTransform,
+        )
+
+        transform_map = {
+            "identity": IdentityTransform,
+            "bucket": BucketTransform,
+            "truncate": TruncateTransform,
+            "year": YearTransform,
+            "month": MonthTransform,
+            "day": DayTransform,
+            "hour": HourTransform,
+        }
+
+        for spec_dict in partition_specs:
+            column = spec_dict["column"]
+            transform_type = spec_dict["type"]
+            partition_name = spec_dict.get("name")
+            bucket_count = spec_dict.get("bucket_count")
+
+            try:
+                if transform_type == "identity":
+                    update_spec.add_identity(column)
+                elif transform_type == "bucket":
+                    if not bucket_count:
+                        raise ValueError(f"bucket_count required for bucket transform on {column}")
+                    transform: Any = BucketTransform(bucket_count)
+                    update_spec.add_field(column, transform, partition_name)
+                else:
+                    if transform_type not in transform_map:
+                        continue
+
+                    # NOTE: transforms that need constructor arguments (e.g. TruncateTransform,
+                    # which requires a width) raise here and are skipped with a warning below.
+                    transform_class = transform_map[transform_type]
+                    transform = transform_class()
+                    update_spec.add_field(column, transform, partition_name)
+
+            except Exception as e:
+                logger.warning(
+                    f"Failed to apply {transform_type} partition to column '{column}': {str(e)}."
+                )
+                continue
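
For orientation, a minimal usage sketch of `apply_partitioning` (assumed names: an attached pyiceberg `catalog`, a `table_id`, and a pyarrow `arrow_schema`; the transaction and `update_spec` calls are the same pyiceberg APIs used by `create_table` further down):

```python
specs = [
    {"column": "event_ts", "type": "day", "index": 1},
    {"column": "user_id", "type": "bucket", "index": 2, "bucket_count": 16},
]
# Create the table and attach the partition fields in one transaction.
with catalog.create_table_transaction(table_id, schema=arrow_schema) as txn:
    with txn.update_spec() as update_spec:
        # adds a day transform on event_ts and a 16-way bucket on user_id
        IcebergPartitionManager.apply_partitioning(update_spec, specs)
```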


+def _validate_partition_spec(spec: Dict[str, Any]) -> bool:
+    """Validate partition specification structure and requirements."""
+    column = spec.get("column", "unknown")
+    partition_type = spec.get("type", "identity")
+
+    # Validate that a bucket partition has a bucket_count
+    if partition_type == "bucket" and not spec.get("bucket_count"):
+        logger.warning(
+            f"Bucket partition on column '{column}' missing required 'bucket_count'. Skipping."
+        )
+        return False
+
+    # Validate that bucket_count is a positive integer
+    if partition_type == "bucket":
+        bucket_count = spec.get("bucket_count")
+        if not isinstance(bucket_count, int) or bucket_count <= 0:
+            logger.warning(
+                f"Bucket partition on column '{column}' has invalid bucket_count: {bucket_count}. "
+                "Must be a positive integer. Skipping."
+            )
+            return False
+
+    # Validate that index is a positive integer
+    index = spec.get("index")
+    if not isinstance(index, int) or index <= 0:
+        logger.warning(
+            f"Partition on column '{column}' has invalid index: {index}. "
+            "Must be a positive integer. Skipping."
+        )
+        return False
+
+    # Validate supported partition types
+    supported_types = {"identity", "bucket", "truncate", "day", "month", "year", "hour"}
+    if partition_type not in supported_types:
+        logger.warning(
+            f"Partition on column '{column}' has unsupported type: '{partition_type}'. "
+            f"Supported types: {sorted(supported_types)}. Skipping."
+        )
+        return False
+
+    return True
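
To make the checks concrete, a few sketch calls and the results they would produce:

```python
_validate_partition_spec({"column": "user_id", "type": "bucket", "index": 1, "bucket_count": 16})
# -> True
_validate_partition_spec({"column": "user_id", "type": "bucket", "index": 1})
# -> False (bucket without bucket_count)
_validate_partition_spec({"column": "event_ts", "type": "week", "index": 1})
# -> False ("week" is not a supported transform type)
```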


+def _validate_and_fix_indices(specs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """Validate that partition indices are unique; raises on duplicates until auto-fixing lands."""
+    if not specs:
+        return specs
+
+    # Check for duplicate indices
+    indices = [spec["index"] for spec in specs]
+    if len(set(indices)) == len(indices):
+        return specs  # No duplicates, return as-is
+
+    # TODO: Sort by original index, then assign sequential indices
+    # sorted_specs = sorted(specs, key=lambda x: (x["index"], x["column"]))
+    # for i, spec in enumerate(sorted_specs, 1):
+    #     original_index = spec["index"]
+    #     spec["index"] = i
+    #     if original_index != i:
+    #         logger.info(
+    #             f"Partition on column '{spec['column']}' index changed: {original_index} → {i}"
+    #         )
+    # return sorted_specs
+    raise ValueError("Duplicate partition indices found. Please fix the schema.")


+def extract_partition_specs_from_schema(
+    table_schema: Union[Dict[str, Any], _TTableSchemaBase], arrow_schema: pa.Schema
+) -> List[Dict[str, Any]]:
+    """Extract partition specifications from a dlt table schema.
+
+    Supports both legacy and advanced partitioning with a priority system:
+    - Legacy: `partition: True` -> identity partitioning
+    - Advanced: `partition: {...}` or `partition: [...]` -> custom transforms
+    - Priority: advanced takes precedence over legacy (prevents conflicts)
+
+    Args:
+        table_schema: dlt table schema dictionary
+        arrow_schema: PyArrow schema for the table
+
+    Returns:
+        List of partition specification dictionaries, sorted by index
+    """
+    columns = table_schema.get("columns", {})
+
+    # Performance optimization: exit early if no partition hints are found
+    if not any("partition" in col for col in columns.values()):
+        return []
+
+    advanced_partitions: List[Dict[str, Any]] = []
+    legacy_partitions: List[Dict[str, Any]] = []
+
+    for column_name, column_config in columns.items():
+        partition_hint = column_config.get("partition")
+
+        if partition_hint is None:
+            continue
+
+        if partition_hint is True:
+            # Legacy partitioning: simple identity
+            legacy_partitions.append(
+                {
+                    "column": column_name,
+                    "type": "identity",
+                    "index": len(legacy_partitions) + 1,  # Auto-assign index
+                    "name": None,
+                    "bucket_count": None,
+                }
+            )
+
+        elif isinstance(partition_hint, dict):
+            # Advanced single partition
+            spec = {
+                "column": column_name,
+                "type": partition_hint.get("type", "identity"),
+                "index": partition_hint.get("index", 1),
+                "name": partition_hint.get("name"),
+                "bucket_count": partition_hint.get("bucket_count"),
+            }
+            # Validate before adding
+            if _validate_partition_spec(spec):
+                advanced_partitions.append(spec)
+
+        elif isinstance(partition_hint, list):
+            # Advanced: multiple partitions on the same column
+            for spec_dict in partition_hint:
+                if isinstance(spec_dict, dict):
+                    spec = {
+                        "column": column_name,
+                        "type": spec_dict.get("type", "identity"),
+                        "index": spec_dict.get("index", 1),
+                        "name": spec_dict.get("name"),
+                        "bucket_count": spec_dict.get("bucket_count"),
+                    }
+                    # Validate before adding
+                    if _validate_partition_spec(spec):
+                        advanced_partitions.append(spec)
+
+    # Priority system: advanced partitions override legacy ones
+    if advanced_partitions:
+        # Validate and fix indices, then sort by the corrected index
+        validated_specs = _validate_and_fix_indices(advanced_partitions)
+        return sorted(validated_specs, key=lambda x: x["index"])
+    elif legacy_partitions:
+        logger.info(f"Using legacy partitioning ({len(legacy_partitions)} partitions)")
+        return legacy_partitions
+    else:
+        return []
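
A worked example of the extraction and priority rules above (a sketch; the hint shapes follow the `TColumnSchema.partition` change in `dlt/common/schema/typing.py` below):

```python
table_schema = {
    "name": "events",
    "columns": {
        "event_ts": {"partition": {"type": "day", "index": 1}},
        "user_id": {"partition": {"type": "bucket", "index": 2, "bucket_count": 16}},
        "country": {"partition": True},  # legacy hint, dropped because advanced hints exist
    },
}
arrow_schema = pa.schema([("event_ts", pa.timestamp("us")), ("user_id", pa.int64())])
extract_partition_specs_from_schema(table_schema, arrow_schema)
# -> [{"column": "event_ts", "type": "day", "index": 1, "name": None, "bucket_count": None},
#     {"column": "user_id", "type": "bucket", "index": 2, "name": None, "bucket_count": 16}]
```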


 def ensure_iceberg_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
     ARROW_TO_ICEBERG_COMPATIBLE_ARROW_TYPE_MAP = {
         pa.types.is_time32: pa.time64("us"),
@@ -174,14 +408,17 @@ def create_table(
     catalog: IcebergCatalog,
     table_id: str,
     table_location: str,
-    schema: pa.Schema,
+    schema: Union[pa.Schema, "pyiceberg.schema.Schema"],
     partition_columns: Optional[List[str]] = None,
+    partition_spec: Optional[IcebergPartitionSpec] = UNPARTITIONED_PARTITION_SPEC,
 ) -> None:
-    schema = ensure_iceberg_compatible_arrow_schema(schema)
+    if isinstance(schema, pa.Schema):
+        schema = ensure_iceberg_compatible_arrow_schema(schema)
 
     if partition_columns:
-        # If the table is partitioned, create it in two steps:
-        # (1) start a create-table transaction, and (2) add the partition spec before committing
+        warnings.warn(
+            "partition_columns is deprecated. Use partition_spec instead.", DeprecationWarning
+        )
         with catalog.create_table_transaction(
             table_id,
             schema=schema,
@@ -192,7 +429,12 @@
                 for col in partition_columns:
                     update_spec.add_identity(col)
     else:
-        catalog.create_table(table_id, schema=schema, location=table_location)
+        catalog.create_table(
+            identifier=table_id,
+            schema=schema,
+            location=table_location,
+            partition_spec=partition_spec,
+        )
 
 
 def get_iceberg_tables(
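To exercise the new `partition_spec` path end to end, a sketch with placeholder identifiers (a pyiceberg `Schema` is used so that `source_id=1` is guaranteed to match the `event_ts` field ID):

```python
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import LongType, NestedField, TimestamptzType

iceberg_schema = Schema(
    NestedField(field_id=1, name="event_ts", field_type=TimestamptzType(), required=False),
    NestedField(field_id=2, name="user_id", field_type=LongType(), required=False),
)
day_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="event_ts_day")
)
create_table(
    catalog,  # assumed: an attached pyiceberg catalog
    table_id="analytics.events",  # placeholder identifier
    table_location="s3://warehouse/analytics/events",  # placeholder location
    schema=iceberg_schema,
    partition_spec=day_spec,
)
```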
dlt/common/schema/typing.py (1 addition, 1 deletion)
@@ -178,7 +178,7 @@ class TColumnSchema(TColumnSchemaBase, total=False):
     """TypedDict that defines additional column hints"""
 
     description: Optional[str]
-    partition: Optional[bool]
+    partition: Optional[Union[bool, Dict[str, Any], List[Dict[str, Any]]]]
     cluster: Optional[bool]
     unique: Optional[bool]
     sort: Optional[bool]
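
On the consumer side, the widened hint lets a resource declare the advanced shapes directly; a sketch (hypothetical resource, hint keys as read by `extract_partition_specs_from_schema` in this PR):

```python
import dlt

@dlt.resource(
    columns={
        "event_ts": {"partition": {"type": "day", "index": 1}},
        "user_id": {"partition": {"type": "bucket", "index": 2, "bucket_count": 16}},
        "country": {"partition": True},  # legacy boolean hint still validates
    }
)
def events():
    yield {"event_ts": "2024-01-01T00:00:00Z", "user_id": 42, "country": "US"}
```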