|
33 | 33 | NoSuchNamespaceError, |
34 | 34 | NoSuchTableError, |
35 | 35 | TableAlreadyExistsError, |
| 36 | + ValidationError, |
36 | 37 | ) |
37 | 38 | from pyiceberg.io import WAREHOUSE |
38 | 39 | from pyiceberg.partitioning import PartitionField, PartitionSpec |
@@ -601,3 +602,56 @@ def test_register_table_existing(test_catalog: Catalog, table_schema_nested: Sch |
601 | 602 | # Assert that registering the table again raises TableAlreadyExistsError |
602 | 603 | with pytest.raises(TableAlreadyExistsError): |
603 | 604 | test_catalog.register_table(identifier, metadata_location=table.metadata_location) |
| 605 | + |
| 606 | + |
| 607 | +@pytest.mark.integration |
| 608 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 609 | +def test_incompatible_partitioned_schema_evolution( |
| 610 | + test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str |
| 611 | +) -> None: |
| 612 | + if isinstance(test_catalog, HiveCatalog): |
| 613 | + pytest.skip("HiveCatalog does not support schema evolution") |
| 614 | + |
| 615 | + identifier = (database_name, table_name) |
| 616 | + test_catalog.create_namespace(database_name) |
| 617 | + table = test_catalog.create_table(identifier, test_schema, partition_spec=test_partition_spec) |
| 618 | + assert test_catalog.table_exists(identifier) |
| 619 | + |
| 620 | + with pytest.raises(ValidationError): |
| 621 | + with table.update_schema() as update: |
| 622 | + update.delete_column("VendorID") |
| 623 | + |
| 624 | + # Assert column was not dropped |
| 625 | + assert "VendorID" in table.schema().column_names |
| 626 | + |
| 627 | + with table.transaction() as transaction: |
| 628 | + with transaction.update_spec() as spec_update: |
| 629 | + spec_update.remove_field("VendorID") |
| 630 | + |
| 631 | + with transaction.update_schema() as schema_update: |
| 632 | + schema_update.delete_column("VendorID") |
| 633 | + |
| 634 | + assert table.spec() == PartitionSpec(PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), spec_id=1) |
| 635 | + assert table.schema() == Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False)) |
| 636 | + |
| 637 | + |
| 638 | +@pytest.mark.integration |
| 639 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 640 | +def test_incompatible_sorted_schema_evolution( |
| 641 | + test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str |
| 642 | +) -> None: |
| 643 | + if isinstance(test_catalog, HiveCatalog): |
| 644 | + pytest.skip("HiveCatalog does not support schema evolution") |
| 645 | + |
| 646 | + identifier = (database_name, table_name) |
| 647 | + test_catalog.create_namespace(database_name) |
| 648 | + table = test_catalog.create_table(identifier, test_schema, sort_order=test_sort_order) |
| 649 | + assert test_catalog.table_exists(identifier) |
| 650 | + |
| 651 | + with pytest.raises(ValidationError): |
| 652 | + with table.update_schema() as update: |
| 653 | + update.delete_column("VendorID") |
| 654 | + |
| 655 | + assert table.schema() == Schema( |
| 656 | + NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False) |
| 657 | + ) |
0 commit comments