
Commit 41a3c8e

Add metadata tables for data_files and delete_files (apache#1066)
* Add metadata tables for data_files and delete_files
* Update API docs for `data_files` and `delete_files`
* Update method signature of `_files()`
* Migrate implementation of files() table from __init__.py
1 parent de47590 commit 41a3c8e

File tree

3 files changed: +169, -131 lines


mkdocs/docs/api.md (+5 lines)
@@ -845,6 +845,11 @@ readable_metrics: [
 [6.0989]]
 ```
 
+!!! info
+    Content refers to type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes`
+
+To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
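The documentation added above introduces the two new metadata tables. A minimal usage sketch of the documented API (the catalog and table names `default` and `default.my_table` are placeholders for illustration, not part of the commit):

```python
from pyiceberg.catalog import load_catalog

# Load any configured catalog and table; names here are illustrative only.
catalog = load_catalog("default")
table = catalog.load_table("default.my_table")

# All files in the current snapshot, regardless of content type.
all_files = table.inspect.files()

# Only content = 0 (Data) files, and only content = 1/2 (Position/Equality Deletes) files.
data_files = table.inspect.data_files()
delete_files = table.inspect.delete_files()

# Each call returns a pyarrow.Table with the same schema as the `files` table.
print(data_files.column_names)
```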

pyiceberg/table/inspect.py (+19, -8 lines)
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
@@ -473,7 +473,7 @@ def history(self) -> "pa.Table":
 
         return pa.Table.from_pylist(history, schema=history_schema)
 
-    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
         import pyarrow as pa
 
         from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -530,6 +530,8 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
         for manifest_list in snapshot.manifests(io):
             for manifest_entry in manifest_list.fetch_manifest_entry(io):
                 data_file = manifest_entry.data_file
+                if data_file_filter and data_file.content not in data_file_filter:
+                    continue
                 column_sizes = data_file.column_sizes or {}
                 value_counts = data_file.value_counts or {}
                 null_value_counts = data_file.null_value_counts or {}
@@ -558,12 +560,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                     "spec_id": data_file.spec_id,
                     "record_count": data_file.record_count,
                     "file_size_in_bytes": data_file.file_size_in_bytes,
-                    "column_sizes": dict(data_file.column_sizes),
-                    "value_counts": dict(data_file.value_counts),
-                    "null_value_counts": dict(data_file.null_value_counts),
-                    "nan_value_counts": dict(data_file.nan_value_counts),
-                    "lower_bounds": dict(data_file.lower_bounds),
-                    "upper_bounds": dict(data_file.upper_bounds),
+                    "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+                    "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+                    "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+                    "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+                    "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+                    "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
                     "key_metadata": data_file.key_metadata,
                     "split_offsets": data_file.split_offsets,
                     "equality_ids": data_file.equality_ids,
@@ -575,3 +577,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
             files,
             schema=files_schema,
         )
+
+    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        return self._files(snapshot_id)
+
+    def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        return self._files(snapshot_id, {DataFileContent.DATA})
+
+    def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
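The refactor above funnels all three public methods through `_files()`, which skips manifest entries whose content type is not in the requested set. A standalone sketch of that filtering rule (the enum below mirrors `pyiceberg.manifest.DataFileContent` and the content codes documented above; the helper name `keep_entry` is made up for illustration):

```python
from enum import IntEnum
from typing import Optional, Set


class DataFileContent(IntEnum):
    # Mirrors pyiceberg.manifest.DataFileContent: 0 = Data, 1 = Position Deletes, 2 = Equality Deletes.
    DATA = 0
    POSITION_DELETES = 1
    EQUALITY_DELETES = 2


def keep_entry(content: DataFileContent, data_file_filter: Optional[Set[DataFileContent]]) -> bool:
    # files(): no filter is passed, so every manifest entry is kept.
    # data_files() / delete_files(): only entries whose content type is in the set survive.
    return not data_file_filter or content in data_file_filter


assert keep_entry(DataFileContent.POSITION_DELETES, None)            # files() keeps everything
assert keep_entry(DataFileContent.DATA, {DataFileContent.DATA})      # data_files()
assert not keep_entry(                                                # delete_files() drops data files
    DataFileContent.DATA,
    {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES},
)
```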

tests/integration/test_inspect_table.py (+145, -123 lines)
@@ -672,126 +672,141 @@ def test_inspect_files(
     # append more data
     tbl.append(arrow_table_with_null)
 
-    df = tbl.refresh().inspect.files()
+    # configure table properties
+    if format_version == 2:
+        with tbl.transaction() as txn:
+            txn.set_properties({"write.delete.mode": "merge-on-read"})
+    spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
 
-    assert df.column_names == [
-        "content",
-        "file_path",
-        "file_format",
-        "spec_id",
-        "record_count",
-        "file_size_in_bytes",
-        "column_sizes",
-        "value_counts",
-        "null_value_counts",
-        "nan_value_counts",
-        "lower_bounds",
-        "upper_bounds",
-        "key_metadata",
-        "split_offsets",
-        "equality_ids",
-        "sort_order_id",
-        "readable_metrics",
-    ]
-
-    # make sure the non-nullable fields are filled
-    for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
-        for value in df[int_column]:
-            assert isinstance(value.as_py(), int)
-
-    for split_offsets in df["split_offsets"]:
-        assert isinstance(split_offsets.as_py(), list)
-
-    for file_format in df["file_format"]:
-        assert file_format.as_py() == "PARQUET"
+    files_df = tbl.refresh().inspect.files()
 
-    for file_path in df["file_path"]:
-        assert file_path.as_py().startswith("s3://")
+    data_files_df = tbl.inspect.data_files()
 
-    lhs = df.to_pandas()
-    rhs = spark.table(f"{identifier}.files").toPandas()
+    delete_files_df = tbl.inspect.delete_files()
 
-    lhs_subset = lhs[
-        [
+    def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
+        assert df.column_names == [
             "content",
             "file_path",
             "file_format",
             "spec_id",
             "record_count",
             "file_size_in_bytes",
+            "column_sizes",
+            "value_counts",
+            "null_value_counts",
+            "nan_value_counts",
+            "lower_bounds",
+            "upper_bounds",
+            "key_metadata",
             "split_offsets",
             "equality_ids",
             "sort_order_id",
+            "readable_metrics",
         ]
-    ]
-    rhs_subset = rhs[
-        [
-            "content",
-            "file_path",
-            "file_format",
-            "spec_id",
-            "record_count",
-            "file_size_in_bytes",
-            "split_offsets",
-            "equality_ids",
-            "sort_order_id",
+
+        # make sure the non-nullable fields are filled
+        for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
+            for value in df[int_column]:
+                assert isinstance(value.as_py(), int)
+
+        for split_offsets in df["split_offsets"]:
+            assert isinstance(split_offsets.as_py(), list)
+
+        for file_format in df["file_format"]:
+            assert file_format.as_py() == "PARQUET"
+
+        for file_path in df["file_path"]:
+            assert file_path.as_py().startswith("s3://")
+
+        lhs = df.to_pandas()
+        rhs = spark_df.toPandas()
+
+        lhs_subset = lhs[
+            [
+                "content",
+                "file_path",
+                "file_format",
+                "spec_id",
+                "record_count",
+                "file_size_in_bytes",
+                "split_offsets",
+                "equality_ids",
+                "sort_order_id",
+            ]
         ]
+        rhs_subset = rhs[
+            [
+                "content",
+                "file_path",
+                "file_format",
+                "spec_id",
+                "record_count",
+                "file_size_in_bytes",
+                "split_offsets",
+                "equality_ids",
+                "sort_order_id",
+            ]
         ]
-    ]
 
-    assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
+        assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
 
-    for column in df.column_names:
-        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
-            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
-                # NaN != NaN in Python
-                continue
-            if column in [
-                "column_sizes",
-                "value_counts",
-                "null_value_counts",
-                "nan_value_counts",
-                "lower_bounds",
-                "upper_bounds",
-            ]:
-                if isinstance(right, dict):
-                    left = dict(left)
-                assert left == right, f"Difference in column {column}: {left} != {right}"
+        for column in df.column_names:
+            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+                if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                    # NaN != NaN in Python
+                    continue
+                if column in [
+                    "column_sizes",
+                    "value_counts",
+                    "null_value_counts",
+                    "nan_value_counts",
+                    "lower_bounds",
+                    "upper_bounds",
+                ]:
+                    if isinstance(right, dict):
+                        left = dict(left)
+                    assert left == right, f"Difference in column {column}: {left} != {right}"
 
-            elif column == "readable_metrics":
-                assert list(left.keys()) == [
-                    "bool",
-                    "string",
-                    "string_long",
-                    "int",
-                    "long",
-                    "float",
-                    "double",
-                    "timestamp",
-                    "timestamptz",
-                    "date",
-                    "binary",
-                    "fixed",
-                ]
-                assert left.keys() == right.keys()
-
-                for rm_column in left.keys():
-                    rm_lhs = left[rm_column]
-                    rm_rhs = right[rm_column]
-
-                    assert rm_lhs["column_size"] == rm_rhs["column_size"]
-                    assert rm_lhs["value_count"] == rm_rhs["value_count"]
-                    assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
-                    assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
-
-                    if rm_column == "timestamptz":
-                        # PySpark does not correctly set the timstamptz
-                        rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
-                        rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
-
-                    assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
-                    assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
-            else:
-                assert left == right, f"Difference in column {column}: {left} != {right}"
+                elif column == "readable_metrics":
+                    assert list(left.keys()) == [
+                        "bool",
+                        "string",
+                        "string_long",
+                        "int",
+                        "long",
+                        "float",
+                        "double",
+                        "timestamp",
+                        "timestamptz",
+                        "date",
+                        "binary",
+                        "fixed",
+                    ]
+                    assert left.keys() == right.keys()
+
+                    for rm_column in left.keys():
+                        rm_lhs = left[rm_column]
+                        rm_rhs = right[rm_column]
+
+                        assert rm_lhs["column_size"] == rm_rhs["column_size"]
+                        assert rm_lhs["value_count"] == rm_rhs["value_count"]
+                        assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
+                        assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
+
+                        if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
+                            # PySpark does not correctly set the timstamptz
+                            rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+                            rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
+
+                        assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+                        assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
+                else:
+                    assert left == right, f"Difference in column {column}: {left} != {right}"
+
+    inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
+    inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files"))
+    inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files"))
 
 
 @pytest.mark.integration
@@ -801,26 +816,33 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog
 
     tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
 
-    df = tbl.refresh().inspect.files()
+    files_df = tbl.refresh().inspect.files()
+    data_files_df = tbl.inspect.data_files()
+    delete_files_df = tbl.inspect.delete_files()
 
-    assert df.column_names == [
-        "content",
-        "file_path",
-        "file_format",
-        "spec_id",
-        "record_count",
-        "file_size_in_bytes",
-        "column_sizes",
-        "value_counts",
-        "null_value_counts",
-        "nan_value_counts",
-        "lower_bounds",
-        "upper_bounds",
-        "key_metadata",
-        "split_offsets",
-        "equality_ids",
-        "sort_order_id",
-        "readable_metrics",
-    ]
+    def inspect_files_asserts(df: pa.Table) -> None:
+        assert df.column_names == [
+            "content",
+            "file_path",
+            "file_format",
+            "spec_id",
+            "record_count",
+            "file_size_in_bytes",
+            "column_sizes",
+            "value_counts",
+            "null_value_counts",
+            "nan_value_counts",
+            "lower_bounds",
+            "upper_bounds",
+            "key_metadata",
+            "split_offsets",
+            "equality_ids",
+            "sort_order_id",
+            "readable_metrics",
+        ]
+
+        assert df.to_pandas().empty is True
 
-    assert df.to_pandas().empty is True
+    inspect_files_asserts(files_df)
+    inspect_files_asserts(data_files_df)
+    inspect_files_asserts(delete_files_df)
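The updated integration test only sees rows in `delete_files` once the snapshot actually contains delete files. A condensed sketch of that setup, assuming (as in the test above) a pyiceberg `tbl`, a Spark session writing to the same `identifier`, and format-version 2:

```python
# Merge-on-read deletes write positional delete files instead of rewriting data files.
with tbl.transaction() as txn:
    txn.set_properties({"write.delete.mode": "merge-on-read"})

# Row-level delete issued through Spark against the same table.
spark.sql(f"DELETE FROM {identifier} WHERE int = 1")

tbl.refresh()
delete_files = tbl.inspect.delete_files()
assert delete_files.num_rows > 0
# Content codes: 1 = Position Deletes, 2 = Equality Deletes.
assert set(delete_files["content"].to_pylist()) <= {1, 2}
```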
