Skip to content

Commit d93868c

Browse files
committed
rebase
1 parent 155af26 commit d93868c

File tree

3 files changed

+38
-5
lines changed

3 files changed

+38
-5
lines changed

pyiceberg/partitioning.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
model_validator,
3333
)
3434

35-
from pyiceberg.schema import Schema
35+
from pyiceberg.schema import Schema, index_by_id
3636
from pyiceberg.transforms import (
3737
BucketTransform,
3838
DayTransform,
@@ -56,6 +56,7 @@
5656
TimestampType,
5757
TimestamptzType,
5858
TimeType,
59+
UnknownType,
5960
UUIDType,
6061
)
6162
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros
@@ -222,11 +223,13 @@ def partition_type(self, schema: Schema) -> StructType:
222223
:return: A StructType that represents the PartitionSpec, with a NestedField for each PartitionField.
223224
"""
224225
nested_fields = []
226+
schema_ids = index_by_id(schema)
225227
for field in self.fields:
226-
source_type = schema.find_type(field.source_id)
227-
result_type = field.transform.result_type(source_type)
228-
required = schema.find_field(field.source_id).required
229-
nested_fields.append(NestedField(field.field_id, field.name, result_type, required=required))
228+
if source_field := schema_ids.get(field.source_id):
229+
result_type = field.transform.result_type(source_field.field_type)
230+
nested_fields.append(NestedField(field.field_id, field.name, result_type, required=source_field.required))
231+
else:
232+
nested_fields.append(NestedField(field.field_id, field.name, UnknownType()))
230233
return StructType(*nested_fields)
231234

232235
def partition_to_path(self, data: Record, schema: Schema) -> str:

tests/integration/test_reads.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,3 +1083,19 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> None:
10831083

10841084
scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
10851085
assert len(scan.to_arrow()) > 0
1086+
1087+
1088+
@pytest.mark.integration
1089+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
1090+
def test_scan_source_field_missing_in_spec(catalog: Catalog, spark: SparkSession) -> None:
1091+
identifier = "default.test_dropped_field"
1092+
spark.sql(f"DROP TABLE IF EXISTS {identifier}")
1093+
spark.sql(f"CREATE TABLE {identifier} (foo int, bar int, jaz string) USING ICEBERG PARTITIONED BY (foo, bar)")
1094+
spark.sql(
1095+
f"INSERT INTO {identifier} (foo, bar, jaz) VALUES (1, 1, 'dummy data'), (1, 2, 'dummy data again'), (2, 1, 'another partition')"
1096+
)
1097+
spark.sql(f"ALTER TABLE {identifier} DROP PARTITION FIELD foo")
1098+
spark.sql(f"ALTER TABLE {identifier} DROP COLUMN foo")
1099+
1100+
table = catalog.load_table(identifier)
1101+
assert len(list(table.scan().plan_files())) == 3

tests/table/test_partitioning.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
TimestampType,
4848
TimestamptzType,
4949
TimeType,
50+
UnknownType,
5051
UUIDType,
5152
)
5253

@@ -178,6 +179,19 @@ def test_partition_type(table_schema_simple: Schema) -> None:
178179
)
179180

180181

182+
def test_partition_type_missing_source_field(table_schema_simple: Schema) -> None:
183+
spec = PartitionSpec(
184+
PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"),
185+
PartitionField(source_id=10, field_id=1001, transform=BucketTransform(num_buckets=25), name="int_bucket"),
186+
spec_id=3,
187+
)
188+
189+
assert spec.partition_type(table_schema_simple) == StructType(
190+
NestedField(field_id=1000, name="str_truncate", field_type=StringType(), required=False),
191+
NestedField(field_id=1001, name="int_bucket", field_type=UnknownType(), required=False),
192+
)
193+
194+
181195
@pytest.mark.parametrize(
182196
"source_type, value",
183197
[

0 commit comments

Comments (0)