|
58 | 58 | NoSuchTableError, |
59 | 59 | TableAlreadyExistsError, |
60 | 60 | ) |
61 | | -from pyiceberg.io import load_file_io |
62 | 61 | from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec |
63 | 62 | from pyiceberg.schema import Schema, SchemaVisitor, visit |
64 | 63 | from pyiceberg.serializers import FromInputFile |
|
67 | 66 | CommitTableResponse, |
68 | 67 | PropertyUtil, |
69 | 68 | Table, |
70 | | - update_table_metadata, |
71 | 69 | ) |
72 | 70 | from pyiceberg.table.metadata import TableMetadata |
73 | 71 | from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder |
@@ -321,7 +319,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table: |
321 | 319 | ) |
322 | 320 | metadata_location = properties[METADATA_LOCATION] |
323 | 321 |
|
324 | | - io = load_file_io(properties=self.properties, location=metadata_location) |
| 322 | + io = self._load_file_io(location=metadata_location) |
325 | 323 | file = io.new_input(metadata_location) |
326 | 324 | metadata = FromInputFile.table_metadata(file) |
327 | 325 | return Table( |
@@ -439,71 +437,64 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons |
439 | 437 | ) |
440 | 438 | database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) |
441 | 439 |
|
| 440 | + current_glue_table: Optional[TableTypeDef] |
| 441 | + glue_table_version_id: Optional[str] |
| 442 | + current_table: Optional[Table] |
442 | 443 | try: |
443 | 444 | current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) |
444 | | - # Update the table |
445 | 445 | glue_table_version_id = current_glue_table.get("VersionId") |
| 446 | + current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) |
| 447 | + except NoSuchTableError: |
| 448 | + current_glue_table = None |
| 449 | + glue_table_version_id = None |
| 450 | + current_table = None |
| 451 | + |
| 452 | + updated_staged_table = self._update_and_stage_table(current_table, table_request) |
| 453 | + if current_table and updated_staged_table.metadata == current_table.metadata: |
| 454 | + # no changes, do nothing |
| 455 | + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) |
| 456 | + self._write_metadata( |
| 457 | + metadata=updated_staged_table.metadata, |
| 458 | + io=updated_staged_table.io, |
| 459 | + metadata_path=updated_staged_table.metadata_location, |
| 460 | + ) |
| 461 | + |
| 462 | + if current_table: |
| 463 | + # table exists, update the table |
446 | 464 | if not glue_table_version_id: |
447 | 465 | raise CommitFailedException( |
448 | 466 | f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" |
449 | 467 | ) |
450 | | - current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) |
451 | | - base_metadata = current_table.metadata |
452 | | - |
453 | | - # Validate the update requirements |
454 | | - for requirement in table_request.requirements: |
455 | | - requirement.validate(base_metadata) |
456 | | - |
457 | | - updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates) |
458 | | - if updated_metadata == base_metadata: |
459 | | - # no changes, do nothing |
460 | | - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) |
461 | | - |
462 | | - # write new metadata |
463 | | - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 |
464 | | - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) |
465 | | - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) |
466 | 468 |
|
| 469 | + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent |
| 470 | + # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking |
467 | 471 | update_table_input = _construct_table_input( |
468 | 472 | table_name=table_name, |
469 | | - metadata_location=new_metadata_location, |
470 | | - properties=current_table.properties, |
471 | | - metadata=updated_metadata, |
| 473 | + metadata_location=updated_staged_table.metadata_location, |
| 474 | + properties=updated_staged_table.properties, |
| 475 | + metadata=updated_staged_table.metadata, |
472 | 476 | glue_table=current_glue_table, |
473 | 477 | prev_metadata_location=current_table.metadata_location, |
474 | 478 | ) |
475 | | - |
476 | | - # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent |
477 | | - # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking |
478 | 479 | self._update_glue_table( |
479 | 480 | database_name=database_name, |
480 | 481 | table_name=table_name, |
481 | 482 | table_input=update_table_input, |
482 | 483 | version_id=glue_table_version_id, |
483 | 484 | ) |
484 | | - |
485 | | - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) |
486 | | - except NoSuchTableError: |
487 | | - # Create the table |
488 | | - updated_metadata = update_table_metadata( |
489 | | - base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True |
490 | | - ) |
491 | | - new_metadata_version = 0 |
492 | | - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) |
493 | | - self._write_metadata( |
494 | | - updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location |
495 | | - ) |
496 | | - |
| 485 | + else: |
| 486 | + # table does not exist, create the table |
497 | 487 | create_table_input = _construct_table_input( |
498 | 488 | table_name=table_name, |
499 | | - metadata_location=new_metadata_location, |
500 | | - properties=updated_metadata.properties, |
501 | | - metadata=updated_metadata, |
| 489 | + metadata_location=updated_staged_table.metadata_location, |
| 490 | + properties=updated_staged_table.properties, |
| 491 | + metadata=updated_staged_table.metadata, |
502 | 492 | ) |
503 | | - |
504 | 493 | self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) |
505 | 494 |
|
506 | | - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) |
| 495 | + return CommitTableResponse( |
| 496 | + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location |
| 497 | + ) |
507 | 498 |
|
508 | 499 | def load_table(self, identifier: Union[str, Identifier]) -> Table: |
509 | 500 | """Load the table's metadata and returns the table instance. |
|
0 commit comments