Commit f0bb203

HonahX and sungwy authored
[Bug Fix] cast None current-snapshot-id as -1 for Backwards Compatibility (#473) (#557)
Backport to 0.6.1
Co-authored-by: Sung Yun <107272191+syun64@users.noreply.github.com>
1 parent 813adbe commit f0bb203

File tree

mkdocs/docs/configuration.md
pyiceberg/serializers.py
pyiceberg/table/metadata.py
pyiceberg/utils/concurrent.py
pyiceberg/utils/config.py
tests/test_serializers.py
tests/utils/test_config.py

7 files changed: +102 −12 lines changed

mkdocs/docs/configuration.md

Lines changed: 4 additions & 0 deletions

@@ -269,3 +269,7 @@ catalog:
 # Concurrency
 
 PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details.
+
+# Backward Compatibility
+
+Previous versions of the Java implementation (`<1.4.0`) incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception and be unable to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that is compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry to "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue.
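
For context on the new docs entry, a minimal sketch (not part of the diff) of enabling the flag through the environment, mirroring what the new test further down does; it assumes the `Config.get_bool` accessor introduced in this commit:

```python
import os

from pyiceberg.utils.config import Config

# Equivalent to putting `legacy-current-snapshot-id: "True"` in .pyiceberg.yaml;
# environment variables use the PYICEBERG_ prefix with dashes mapped to underscores.
os.environ["PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID"] = "True"

# Truthy here means a missing current-snapshot-id is written out as -1
# instead of being omitted from the metadata JSON.
assert Config().get_bool("legacy-current-snapshot-id")
```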

pyiceberg/serializers.py

Lines changed: 5 additions & 1 deletion

@@ -24,6 +24,7 @@
 from pyiceberg.io import InputFile, InputStream, OutputFile
 from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
 from pyiceberg.typedef import UTF8
+from pyiceberg.utils.config import Config
 
 GZIP = "gzip"
 
@@ -127,6 +128,9 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite:
             overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
         """
         with output_file.create(overwrite=overwrite) as output_stream:
-            json_bytes = metadata.model_dump_json().encode(UTF8)
+            # We need to serialize None values, in order to dump `None` current-snapshot-id as `-1`
+            exclude_none = False if Config().get_bool("legacy-current-snapshot-id") else True
+
+            json_bytes = metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8)
             json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
             output_stream.write(json_bytes)
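
The `exclude_none` toggle matters because pydantic's `model_dump_json` drops `None`-valued fields when `exclude_none=True`, so a missing snapshot id would simply vanish from the JSON. A standalone sketch of that behavior, assuming pydantic v2; `LegacyMeta` is a hypothetical stand-in, not a PyIceberg class:

```python
from typing import Optional

from pydantic import BaseModel


class LegacyMeta(BaseModel):  # hypothetical stand-in for TableMetadata
    current_snapshot_id: Optional[int] = None


meta = LegacyMeta()

# Default path: None-valued fields are omitted from the JSON entirely.
print(meta.model_dump_json(exclude_none=True))   # {}

# Legacy path: the key is kept, so a field serializer can rewrite its value.
print(meta.model_dump_json(exclude_none=False))  # {"current_snapshot_id":null}
```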

pyiceberg/table/metadata.py

Lines changed: 8 additions & 1 deletion

@@ -28,7 +28,7 @@
     Union,
 )
 
-from pydantic import Field, model_validator
+from pydantic import Field, field_serializer, model_validator
 from pydantic import ValidationError as PydanticValidationError
 from typing_extensions import Annotated
 
@@ -49,6 +49,7 @@
     IcebergRootModel,
     Properties,
 )
+from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import datetime_to_millis
 
 CURRENT_SNAPSHOT_ID = "current-snapshot-id"
@@ -226,6 +227,12 @@ def schema_by_id(self, schema_id: int) -> Optional[Schema]:
         """Get the schema by schema_id."""
         return next((schema for schema in self.schemas if schema.schema_id == schema_id), None)
 
+    @field_serializer('current_snapshot_id')
+    def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> Optional[int]:
+        if current_snapshot_id is None and Config().get_bool("legacy-current-snapshot-id"):
+            return -1
+        return current_snapshot_id
+
 
 class TableMetadataV1(TableMetadataCommonFields, IcebergBaseModel):
     """Represents version 1 of the Table Metadata.

pyiceberg/utils/concurrent.py

Lines changed: 1 addition & 10 deletions

@@ -37,13 +37,4 @@ def get_or_create() -> Executor:
     @staticmethod
     def max_workers() -> Optional[int]:
         """Return the max number of workers configured."""
-        config = Config()
-        val = config.config.get("max-workers")
-
-        if val is None:
-            return None
-
-        try:
-            return int(val)  # type: ignore
-        except ValueError as err:
-            raise ValueError(f"Max workers should be an integer or left unset. Current value: {val}") from err
+        return Config().get_int("max-workers")

pyiceberg/utils/config.py

Lines changed: 17 additions & 0 deletions

@@ -16,6 +16,7 @@
 # under the License.
 import logging
 import os
+from distutils.util import strtobool
 from typing import List, Optional
 
 import strictyaml
@@ -154,3 +155,19 @@ def get_catalog_config(self, catalog_name: str) -> Optional[RecursiveDict]:
             assert isinstance(catalog_conf, dict), f"Configuration path catalogs.{catalog_name_lower} needs to be an object"
             return catalog_conf
         return None
+
+    def get_int(self, key: str) -> Optional[int]:
+        if (val := self.config.get(key)) is not None:
+            try:
+                return int(val)  # type: ignore
+            except ValueError as err:
+                raise ValueError(f"{key} should be an integer or left unset. Current value: {val}") from err
+        return None
+
+    def get_bool(self, key: str) -> Optional[bool]:
+        if (val := self.config.get(key)) is not None:
+            try:
+                return strtobool(val)  # type: ignore
+            except ValueError as err:
+                raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}") from err
+        return None
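
For context on the new accessors: `strtobool` accepts the usual spellings ("y", "yes", "t", "true", "on", "1" and their negative counterparts, case-insensitive), returns `1` or `0` rather than a real `bool`, and raises `ValueError` otherwise, which is what makes `get_bool("max-workers")` fail in the config test below. A small standalone sketch of that parsing behavior:

```python
from distutils.util import strtobool  # deprecated; removed with distutils in Python 3.12

print(strtobool("True"))  # 1
print(strtobool("no"))    # 0

try:
    strtobool("4")        # not a recognized boolean spelling
except ValueError as err:
    print(err)            # invalid truth value '4'
```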

tests/test_serializers.py

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+import uuid
+from typing import Any, Dict
+
+import pytest
+from pytest_mock import MockFixture
+
+from pyiceberg.serializers import ToOutputFile
+from pyiceberg.table import StaticTable
+from pyiceberg.table.metadata import TableMetadataV1
+
+
+def test_legacy_current_snapshot_id(
+    mocker: MockFixture, tmp_path_factory: pytest.TempPathFactory, example_table_metadata_no_snapshot_v1: Dict[str, Any]
+) -> None:
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+
+    metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json")
+    metadata = TableMetadataV1(**example_table_metadata_no_snapshot_v1)
+    ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
+    static_table = StaticTable.from_metadata(metadata_location)
+    assert static_table.metadata.current_snapshot_id is None
+
+    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})
+
+    ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
+    with PyArrowFileIO().new_input(location=metadata_location).open() as input_stream:
+        metadata_json_bytes = input_stream.read()
+        assert json.loads(metadata_json_bytes)['current-snapshot-id'] == -1
+        backwards_compatible_static_table = StaticTable.from_metadata(metadata_location)
+        assert backwards_compatible_static_table.metadata.current_snapshot_id is None
+        assert backwards_compatible_static_table.metadata == static_table.metadata

tests/utils/test_config.py

Lines changed: 17 additions & 0 deletions

@@ -76,3 +76,20 @@ def test_merge_config() -> None:
     rhs: RecursiveDict = {"common_key": "xyz789"}
     result = merge_config(lhs, rhs)
     assert result["common_key"] == rhs["common_key"]
+
+
+def test_from_configuration_files_get_typed_value(tmp_path_factory: pytest.TempPathFactory) -> None:
+    config_path = str(tmp_path_factory.mktemp("config"))
+    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file:
+        yaml_str = as_document({"max-workers": "4", "legacy-current-snapshot-id": "True"}).as_yaml()
+        file.write(yaml_str)
+
+    os.environ["PYICEBERG_HOME"] = config_path
+    with pytest.raises(ValueError):
+        Config().get_bool("max-workers")
+
+    with pytest.raises(ValueError):
+        Config().get_int("legacy-current-snapshot-id")
+
+    assert Config().get_bool("legacy-current-snapshot-id")
+    assert Config().get_int("max-workers") == 4
