Commit 67e8773

Use the correct spec when rewriting existing manifests (apache#1157)

* Use the correct spec when rewriting existing manifests. Fixes apache#1108
* Rename test

1 parent 4d23c55 commit 67e8773

2 files changed, +58 -19 lines


pyiceberg/table/update/snapshot.py

+1 -1

@@ -545,7 +545,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
                     if any(entry.data_file not in found_deleted_data_files for entry in entries):
                         with write_manifest(
                             format_version=self._transaction.table_metadata.format_version,
-                            spec=self._transaction.table_metadata.spec(),
+                            spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
                             schema=self._transaction.table_metadata.schema(),
                             output_file=self.new_manifest_output(),
                             snapshot_id=self._snapshot_id,
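
Why the one-line change matters: after partition evolution, table_metadata.spec() returns the table's current default partition spec, while each existing manifest records the partition_spec_id it was originally written under. When _existing_manifests rewrites a manifest to drop deleted entries, it has to look that spec up by ID via table_metadata.specs(); otherwise the rewritten manifest pairs old entries with a spec they were never written against. A minimal sketch of the distinction, with made-up field IDs and names (illustration only, not library internals):

# Sketch only: spec() vs. specs()[spec_id] after partition evolution.
# The specs and IDs below are hypothetical.
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import HourTransform, IdentityTransform

spec_0 = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=HourTransform(), name="ts_hour"),
    spec_id=0,
)
spec_1 = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=HourTransform(), name="ts_hour"),
    PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="category"),
    spec_id=1,
)

specs_by_id = {0: spec_0, 1: spec_1}  # roughly what table_metadata.specs() holds
current_default = spec_1              # roughly what table_metadata.spec() returns

# A manifest written before the evolution carries partition_spec_id == 0,
# so rewriting it must use specs_by_id[0], not the current default spec.
manifest_partition_spec_id = 0
assert specs_by_id[manifest_partition_spec_id] != current_default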

tests/integration/test_writes/test_writes.py

+57 -18

@@ -18,14 +18,15 @@
 import math
 import os
 import time
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from pathlib import Path
 from typing import Any, Dict
 from urllib.parse import urlparse

 import numpy as np
 import pandas as pd
 import pyarrow as pa
+import pyarrow.compute as pc
 import pyarrow.parquet as pq
 import pytest
 import pytz
@@ -39,12 +40,12 @@
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import GreaterThanOrEqual, In, Not
+from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
-from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
     DateType,
     DoubleType,
@@ -1344,18 +1345,7 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:


 @pytest.mark.integration
-def test_delete_threshold() -> None:
-    catalog = load_catalog(
-        "local",
-        **{
-            "type": "rest",
-            "uri": "http://localhost:8181",
-            "s3.endpoint": "http://localhost:9000",
-            "s3.access-key-id": "admin",
-            "s3.secret-access-key": "password",
-        },
-    )
-
+def test_delete_threshold(session_catalog: Catalog) -> None:
     schema = Schema(
         NestedField(field_id=101, name="id", field_type=LongType(), required=True),
         NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
@@ -1365,13 +1355,13 @@ def test_delete_threshold() -> None:
     partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))

     try:
-        catalog.drop_table(
+        session_catalog.drop_table(
             identifier="default.scores",
         )
     except NoSuchTableError:
         pass

-    catalog.create_table(
+    session_catalog.create_table(
         identifier="default.scores",
         schema=schema,
         partition_spec=partition_spec,
@@ -1395,7 +1385,7 @@ def test_delete_threshold() -> None:
     # Create the dataframe
     df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})

-    iceberg_table = catalog.load_table("default.scores")
+    iceberg_table = session_catalog.load_table("default.scores")

     # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
     arrow_schema = iceberg_table.schema().as_arrow()
@@ -1409,3 +1399,52 @@ def test_delete_threshold() -> None:
     assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
     iceberg_table.delete(delete_condition)
     assert len(iceberg_table.scan().to_arrow()) == lower_before
+
+
+@pytest.mark.integration
+def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> None:
+    np.random.seed(876)
+    N = 1440
+    d = {
+        "timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) + timedelta(minutes=i) for i in range(N)]),
+        "category": pa.array([np.random.choice(["A", "B", "C"]) for _ in range(N)]),
+        "value": pa.array(np.random.normal(size=N)),
+    }
+    data = pa.Table.from_pydict(d)
+
+    try:
+        session_catalog.drop_table(
+            identifier="default.test_error_table",
+        )
+    except NoSuchTableError:
+        pass
+
+    table = session_catalog.create_table(
+        "default.test_error_table",
+        schema=data.schema,
+    )
+
+    with table.update_spec() as update:
+        update.add_field("timestamp", transform=HourTransform())
+
+    table.append(data)
+
+    with table.update_spec() as update:
+        update.add_field("category", transform=IdentityTransform())
+
+    data_ = data.filter(
+        (pc.field("category") == "A")
+        & (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
+        & (pc.field("timestamp") < datetime(2023, 1, 1, 1))
+    )
+
+    table.overwrite(
+        df=data_,
+        overwrite_filter=And(
+            And(
+                GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 0).isoformat()),
+                LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
+            ),
+            EqualTo("category", "A"),
+        ),
+    )
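
The new test reproduces the failure path that the snapshot.py change fixes: the partition spec evolves once before the append and again afterwards, so the manifest written by table.append(data) carries an older partition_spec_id than the table's current default spec, and the later table.overwrite(...) has to rewrite that existing manifest. A rough way to inspect the result after running the test, as a sketch only (it assumes the public current_snapshot(), Snapshot.manifests(io), and Table.specs() helpers and is not part of this commit):

# Sketch only, not part of the commit: the overwrite replaces exactly the rows
# it deletes, so the row count should be unchanged, and every manifest of the
# latest snapshot should reference a spec ID registered in the table metadata.
assert len(table.scan().to_arrow()) == len(data)

snapshot = table.current_snapshot()
assert snapshot is not None
for manifest in snapshot.manifests(table.io):
    assert manifest.partition_spec_id in table.specs()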
