Skip to content

Commit 0a3a886

Browse files
ndrluisFokko
andauthored
Add table statistics (#1285)
* Add table statistics update * Update pyiceberg/table/statistics.py Co-authored-by: Fokko Driesprong <fokko@apache.org> * Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong <fokko@apache.org> * Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong <fokko@apache.org> * Add Literal import * Rewrite tests --------- Co-authored-by: Fokko Driesprong <fokko@apache.org>
1 parent f4caa3a commit 0a3a886

File tree

10 files changed

+488
-2
lines changed

10 files changed

+488
-2
lines changed

mkdocs/docs/api.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,29 @@ with table.manage_snapshots() as ms:
12581258
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
12591259
```
12601260

1261+
## Table Statistics Management
1262+
1263+
Manage table statistics with operations through the `Table` API:
1264+
1265+
```python
1266+
# To run a specific operation
1267+
table.update_statistics().set_statistics(snapshot_id=1, statistics_file=statistics_file).commit()
1268+
# To run multiple operations
1269+
table.update_statistics()
1270+
.set_statistics(snapshot_id1, statistics_file1)
1271+
.remove_statistics(snapshot_id2)
1272+
.commit()
1273+
# Operations are applied on commit.
1274+
```
1275+
1276+
You can also use context managers to make more changes:
1277+
1278+
```python
1279+
with table.update_statistics() as update:
1280+
update.set_statistics(snaphsot_id1, statistics_file)
1281+
update.remove_statistics(snapshot_id2)
1282+
```
1283+
12611284
## Query the data
12621285

12631286
To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:

pyiceberg/table/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
_FastAppendFiles,
119119
)
120120
from pyiceberg.table.update.spec import UpdateSpec
121+
from pyiceberg.table.update.statistics import UpdateStatistics
121122
from pyiceberg.transforms import IdentityTransform
122123
from pyiceberg.typedef import (
123124
EMPTY_DICT,
@@ -1043,6 +1044,23 @@ def manage_snapshots(self) -> ManageSnapshots:
10431044
"""
10441045
return ManageSnapshots(transaction=Transaction(self, autocommit=True))
10451046

1047+
def update_statistics(self) -> UpdateStatistics:
1048+
"""
1049+
Shorthand to run statistics management operations like add statistics and remove statistics.
1050+
1051+
Use table.update_statistics().<operation>().commit() to run a specific operation.
1052+
Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
1053+
1054+
Pending changes are applied on commit.
1055+
1056+
We can also use context managers to make more changes. For example:
1057+
1058+
with table.update_statistics() as update:
1059+
update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
1060+
update.remove_statistics(snapshot_id=2)
1061+
"""
1062+
return UpdateStatistics(transaction=Transaction(self, autocommit=True))
1063+
10461064
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
10471065
"""Create a new UpdateSchema to alter the columns of this table.
10481066

pyiceberg/table/metadata.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
SortOrder,
4545
assign_fresh_sort_order_ids,
4646
)
47+
from pyiceberg.table.statistics import StatisticsFile
4748
from pyiceberg.typedef import (
4849
EMPTY_DICT,
4950
IcebergBaseModel,
@@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel):
221222
There is always a main branch reference pointing to the
222223
current-snapshot-id even if the refs map is null."""
223224

225+
statistics: List[StatisticsFile] = Field(default_factory=list)
226+
"""A optional list of table statistics files.
227+
Table statistics files are valid Puffin files. Statistics are
228+
informational. A reader can choose to ignore statistics
229+
information. Statistics support is not required to read the
230+
table correctly. A table can contain many statistics files
231+
associated with different table snapshots."""
232+
224233
# validators
225234
@field_validator("properties", mode="before")
226235
def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:

pyiceberg/table/statistics.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from typing import Dict, List, Literal, Optional
18+
19+
from pydantic import Field
20+
21+
from pyiceberg.typedef import IcebergBaseModel
22+
23+
24+
class BlobMetadata(IcebergBaseModel):
25+
type: Literal["apache-datasketches-theta-v1", "deletion-vector-v1"]
26+
snapshot_id: int = Field(alias="snapshot-id")
27+
sequence_number: int = Field(alias="sequence-number")
28+
fields: List[int]
29+
properties: Optional[Dict[str, str]] = None
30+
31+
32+
class StatisticsFile(IcebergBaseModel):
33+
snapshot_id: int = Field(alias="snapshot-id")
34+
statistics_path: str = Field(alias="statistics-path")
35+
file_size_in_bytes: int = Field(alias="file-size-in-bytes")
36+
file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
37+
key_metadata: Optional[str] = Field(alias="key-metadata", default=None)
38+
blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")
39+
40+
41+
def filter_statistics_by_snapshot_id(
42+
statistics: List[StatisticsFile],
43+
reject_snapshot_id: int,
44+
) -> List[StatisticsFile]:
45+
return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]

pyiceberg/table/update/__init__.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
SnapshotLogEntry,
3737
)
3838
from pyiceberg.table.sorting import SortOrder
39+
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
3940
from pyiceberg.typedef import (
4041
IcebergBaseModel,
4142
Properties,
@@ -174,6 +175,17 @@ class RemovePropertiesUpdate(IcebergBaseModel):
174175
removals: List[str]
175176

176177

178+
class SetStatisticsUpdate(IcebergBaseModel):
179+
action: Literal["set-statistics"] = Field(default="set-statistics")
180+
snapshot_id: int = Field(alias="snapshot-id")
181+
statistics: StatisticsFile
182+
183+
184+
class RemoveStatisticsUpdate(IcebergBaseModel):
185+
action: Literal["remove-statistics"] = Field(default="remove-statistics")
186+
snapshot_id: int = Field(alias="snapshot-id")
187+
188+
177189
TableUpdate = Annotated[
178190
Union[
179191
AssignUUIDUpdate,
@@ -191,6 +203,8 @@ class RemovePropertiesUpdate(IcebergBaseModel):
191203
SetLocationUpdate,
192204
SetPropertiesUpdate,
193205
RemovePropertiesUpdate,
206+
SetStatisticsUpdate,
207+
RemoveStatisticsUpdate,
194208
],
195209
Field(discriminator="action"),
196210
]
@@ -475,6 +489,28 @@ def _(
475489
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})
476490

477491

492+
@_apply_table_update.register(SetStatisticsUpdate)
493+
def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
494+
if update.snapshot_id != update.statistics.snapshot_id:
495+
raise ValueError("Snapshot id in statistics does not match the snapshot id in the update")
496+
497+
statistics = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
498+
context.add_update(update)
499+
500+
return base_metadata.model_copy(update={"statistics": statistics + [update.statistics]})
501+
502+
503+
@_apply_table_update.register(RemoveStatisticsUpdate)
504+
def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
505+
if not any(stat.snapshot_id == update.snapshot_id for stat in base_metadata.statistics):
506+
raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")
507+
508+
statistics = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
509+
context.add_update(update)
510+
511+
return base_metadata.model_copy(update={"statistics": statistics})
512+
513+
478514
def update_table_metadata(
479515
base_metadata: TableMetadata,
480516
updates: Tuple[TableUpdate, ...],
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from typing import TYPE_CHECKING, Tuple
18+
19+
from pyiceberg.table.statistics import StatisticsFile
20+
from pyiceberg.table.update import (
21+
RemoveStatisticsUpdate,
22+
SetStatisticsUpdate,
23+
TableUpdate,
24+
UpdatesAndRequirements,
25+
UpdateTableMetadata,
26+
)
27+
28+
if TYPE_CHECKING:
29+
from pyiceberg.table import Transaction
30+
31+
32+
class UpdateStatistics(UpdateTableMetadata["UpdateStatistics"]):
33+
"""
34+
Run statistics management operations using APIs.
35+
36+
APIs include set_statistics and remove statistics operations.
37+
38+
Use table.update_statistics().<operation>().commit() to run a specific operation.
39+
Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
40+
41+
Pending changes are applied on commit.
42+
43+
We can also use context managers to make more changes. For example:
44+
45+
with table.update_statistics() as update:
46+
update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
47+
update.remove_statistics(snapshot_id=2)
48+
"""
49+
50+
_updates: Tuple[TableUpdate, ...] = ()
51+
52+
def __init__(self, transaction: "Transaction") -> None:
53+
super().__init__(transaction)
54+
55+
def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> "UpdateStatistics":
56+
self._updates += (
57+
SetStatisticsUpdate(
58+
snapshot_id=snapshot_id,
59+
statistics=statistics_file,
60+
),
61+
)
62+
63+
return self
64+
65+
def remove_statistics(self, snapshot_id: int) -> "UpdateStatistics":
66+
self._updates = (
67+
RemoveStatisticsUpdate(
68+
snapshot_id=snapshot_id,
69+
),
70+
)
71+
72+
return self
73+
74+
def _commit(self) -> UpdatesAndRequirements:
75+
return self._updates, ()

tests/conftest.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,87 @@ def generate_snapshot(
955955
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
956956
}
957957

958+
TABLE_METADATA_V2_WITH_STATISTICS = {
959+
"format-version": 2,
960+
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
961+
"location": "s3://bucket/test/location",
962+
"last-sequence-number": 34,
963+
"last-updated-ms": 1602638573590,
964+
"last-column-id": 3,
965+
"current-schema-id": 0,
966+
"schemas": [
967+
{
968+
"type": "struct",
969+
"schema-id": 0,
970+
"fields": [
971+
{
972+
"id": 1,
973+
"name": "x",
974+
"required": True,
975+
"type": "long",
976+
}
977+
],
978+
}
979+
],
980+
"default-spec-id": 0,
981+
"partition-specs": [{"spec-id": 0, "fields": []}],
982+
"last-partition-id": 1000,
983+
"default-sort-order-id": 0,
984+
"sort-orders": [{"order-id": 0, "fields": []}],
985+
"properties": {},
986+
"current-snapshot-id": 3055729675574597004,
987+
"snapshots": [
988+
{
989+
"snapshot-id": 3051729675574597004,
990+
"timestamp-ms": 1515100955770,
991+
"sequence-number": 0,
992+
"summary": {"operation": "append"},
993+
"manifest-list": "s3://a/b/1.avro",
994+
},
995+
{
996+
"snapshot-id": 3055729675574597004,
997+
"parent-snapshot-id": 3051729675574597004,
998+
"timestamp-ms": 1555100955770,
999+
"sequence-number": 1,
1000+
"summary": {"operation": "append"},
1001+
"manifest-list": "s3://a/b/2.avro",
1002+
"schema-id": 1,
1003+
},
1004+
],
1005+
"statistics": [
1006+
{
1007+
"snapshot-id": 3051729675574597004,
1008+
"statistics-path": "s3://a/b/stats.puffin",
1009+
"file-size-in-bytes": 413,
1010+
"file-footer-size-in-bytes": 42,
1011+
"blob-metadata": [
1012+
{
1013+
"type": "apache-datasketches-theta-v1",
1014+
"snapshot-id": 3051729675574597004,
1015+
"sequence-number": 1,
1016+
"fields": [1],
1017+
}
1018+
],
1019+
},
1020+
{
1021+
"snapshot-id": 3055729675574597004,
1022+
"statistics-path": "s3://a/b/stats.puffin",
1023+
"file-size-in-bytes": 413,
1024+
"file-footer-size-in-bytes": 42,
1025+
"blob-metadata": [
1026+
{
1027+
"type": "deletion-vector-v1",
1028+
"snapshot-id": 3055729675574597004,
1029+
"sequence-number": 1,
1030+
"fields": [1],
1031+
}
1032+
],
1033+
},
1034+
],
1035+
"snapshot-log": [],
1036+
"metadata-log": [],
1037+
}
1038+
9581039

9591040
@pytest.fixture
9601041
def example_table_metadata_v2() -> Dict[str, Any]:
@@ -966,6 +1047,11 @@ def table_metadata_v2_with_fixed_and_decimal_types() -> Dict[str, Any]:
9661047
return TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES
9671048

9681049

1050+
@pytest.fixture
1051+
def table_metadata_v2_with_statistics() -> Dict[str, Any]:
1052+
return TABLE_METADATA_V2_WITH_STATISTICS
1053+
1054+
9691055
@pytest.fixture(scope="session")
9701056
def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
9711057
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -2199,6 +2285,18 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s
21992285
)
22002286

22012287

2288+
@pytest.fixture
2289+
def table_v2_with_statistics(table_metadata_v2_with_statistics: Dict[str, Any]) -> Table:
2290+
table_metadata = TableMetadataV2(**table_metadata_v2_with_statistics)
2291+
return Table(
2292+
identifier=("database", "table"),
2293+
metadata=table_metadata,
2294+
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
2295+
io=load_file_io(),
2296+
catalog=NoopCatalog("NoopCatalog"),
2297+
)
2298+
2299+
22022300
@pytest.fixture
22032301
def bound_reference_str() -> BoundReference[str]:
22042302
return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None))

0 commit comments

Comments
 (0)