Add support for importing from iceberg table definition (#452)
Assumes the table definition is in JSON format as defined by https://iceberg.apache.org/spec/#appendix-c-json-serialization.

Eventually it might be nice to add support for ingesting the table definition directly from the catalog (via the pyiceberg SDK), but I did not attempt to tackle that in this first iteration.
paulcichonski authored Oct 10, 2024
1 parent 179a687 commit bd7d8b4
Showing 11 changed files with 464 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Support for import from Iceberg table definitions.

## [0.10.13] - 2024-09-20

### Added
14 changes: 13 additions & 1 deletion README.md
@@ -920,7 +920,7 @@ models:

╭─ Options ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ * --format [sql|avro|dbt|glue|jsonschema|bigquery|odcs The format of the source file. │
│ |unity|spark] [default: None]
│ |unity|spark|iceberg] [default: None]
[required]
│ --source TEXT The path to the file or Glue Database that │
│ should be imported. │
@@ -950,6 +950,8 @@ models:
│ file (repeat for multiple table names, leave │
│ empty for all tables in the file). │
[default: None]
│ --iceberg-table TEXT Table name to assign to the model created │
│ from the Iceberg schema. [default: None]
│ --help Show this message and exit. │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```
@@ -975,6 +977,7 @@ Available import options:
| `spark` | Import from Spark StructTypes | ✅ |
| `dbml` | Import from DBML models | ✅ |
| `protobuf` | Import from Protobuf schemas | TBD |
| `iceberg` | Import from an Iceberg JSON Schema Definition | partial |
| Missing something? | Please create an issue on GitHub | TBD |


@@ -1092,6 +1095,15 @@ datacontract import --format dbml --source <file_path> --dbml-table <table_name_
datacontract import --format dbml --source <file_path> --dbml-table <table_name_1> --dbml-schema <schema_1>
```

#### Iceberg

Import from an [Iceberg Table JSON Schema Definition](https://iceberg.apache.org/spec/#appendix-c-json-serialization). Specify the location of the JSON file with the `source` parameter, and use `--iceberg-table` to set the name of the model created from the schema.

Example:

```bash
datacontract import --format iceberg --source ./tests/fixtures/iceberg/simple_schema.json --iceberg-table test-table
```
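As with the other import formats, the generated data contract is printed to stdout, so redirect the output (e.g. `> datacontract.yaml`) to save it to a file.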

### breaking

5 changes: 5 additions & 0 deletions datacontract/cli.py
@@ -244,6 +244,10 @@ def import_(
help="List of table names to import from the DBML file (repeat for multiple table names, leave empty for all tables in the file)."
),
] = None,
iceberg_table: Annotated[
Optional[str],
typer.Option(help="Table name to assign to the model created from the Iceberg schema."),
] = None,
):
"""
Create a data contract from the given source location. Prints to stdout.
@@ -259,6 +263,7 @@ def import_(
dbt_model=dbt_model,
dbml_schema=dbml_schema,
dbml_table=dbml_table,
iceberg_table=iceberg_table,
)
console.print(result.to_yaml())

162 changes: 162 additions & 0 deletions datacontract/imports/iceberg_importer.py
@@ -0,0 +1,162 @@
from typing import Any, Dict

from pydantic import ValidationError
from pyiceberg import types as iceberg_types
from pyiceberg.schema import Schema

from datacontract.imports.importer import Importer
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model
from datacontract.model.exceptions import DataContractException


class IcebergImporter(Importer):
def import_source(
self, data_contract_specification: DataContractSpecification, source: str, import_args: dict
) -> DataContractSpecification:
schema = load_and_validate_iceberg_schema(source)
return import_iceberg(
data_contract_specification,
schema,
import_args.get("iceberg_table"),
)


def load_and_validate_iceberg_schema(source: str) -> Schema:
with open(source, "r") as file:
try:
return Schema.model_validate_json(file.read())
except ValidationError as e:
raise DataContractException(
type="schema",
name="Parse iceberg schema",
reason=f"Failed to validate iceberg schema from {source}: {e}",
engine="datacontract",
)


def import_iceberg(
data_contract_specification: DataContractSpecification, schema: Schema, table_name: str
) -> DataContractSpecification:
if data_contract_specification.models is None:
data_contract_specification.models = {}

model = Model(type="table", title=table_name)

for field in schema.fields:
model.fields[field.name] = _field_from_nested_field(field)

data_contract_specification.models[table_name] = model
return data_contract_specification


def _field_from_nested_field(nested_field: iceberg_types.NestedField) -> Field:
"""
Converts an Iceberg NestedField into a Field object for the data contract.
Args:
nested_field: The Iceberg NestedField to convert.
Returns:
Field: The generated Field object.
"""
field = Field(
title=nested_field.name,
required=nested_field.required,
config=build_field_config(nested_field),
)

if nested_field.doc is not None:
field.description = nested_field.doc

return _type_from_iceberg_type(field, nested_field.field_type)


def _type_from_iceberg_type(field: Field, iceberg_type: iceberg_types.IcebergType) -> Field:
"""
Maps Iceberg data types to the Data Contract type system and updates the field.
Args:
field: The Field object to update.
iceberg_type: The Iceberg data type to map.
Returns:
Field: The updated Field object.
"""
field.type = _data_type_from_iceberg(iceberg_type)

if field.type == "array":
field.items = _type_from_iceberg_type(Field(required=iceberg_type.element_required), iceberg_type.element_type)

elif field.type == "map":
field.keys = _type_from_iceberg_type(Field(required=True), iceberg_type.key_type)
field.values = _type_from_iceberg_type(Field(required=iceberg_type.value_required), iceberg_type.value_type)

elif field.type == "object":
field.fields = {nf.name: _field_from_nested_field(nf) for nf in iceberg_type.fields}

return field


def build_field_config(iceberg_field: iceberg_types.NestedField) -> Dict[str, Any]:
config = {}

if iceberg_field.field_id > 0:
config["icebergFieldId"] = iceberg_field.field_id

if iceberg_field.initial_default is not None:
config["icebergInitialDefault"] = iceberg_field.initial_default

if iceberg_field.write_default is not None:
config["icebergWriteDefault"] = iceberg_field.write_default

return config


def _data_type_from_iceberg(iceberg_type: iceberg_types.IcebergType) -> str:
    """
    Convert an Iceberg field type to a datacontract field type.
    Args:
        iceberg_type: The Iceberg field type.
    Returns:
        str: The datacontract field type.
    """
    if isinstance(iceberg_type, iceberg_types.BooleanType):
        return "boolean"
    if isinstance(iceberg_type, iceberg_types.IntegerType):
        return "integer"
    if isinstance(iceberg_type, iceberg_types.LongType):
        return "long"
    if isinstance(iceberg_type, iceberg_types.FloatType):
        return "float"
    if isinstance(iceberg_type, iceberg_types.DoubleType):
        return "double"
    if isinstance(iceberg_type, iceberg_types.DecimalType):
        return "decimal"
    if isinstance(iceberg_type, iceberg_types.DateType):
        return "date"
    if isinstance(iceberg_type, iceberg_types.TimeType):
        # there isn't a great mapping for the Iceberg type "time", so map it to string for now
        return "string"
    if isinstance(iceberg_type, iceberg_types.TimestampType):
        return "timestamp_ntz"
    if isinstance(iceberg_type, iceberg_types.TimestamptzType):
        return "timestamp_tz"
    if isinstance(iceberg_type, iceberg_types.StringType):
        return "string"
    if isinstance(iceberg_type, iceberg_types.UUIDType):
        return "string"
    if isinstance(iceberg_type, iceberg_types.BinaryType):
        return "bytes"
    if isinstance(iceberg_type, iceberg_types.FixedType):
        return "bytes"
    if isinstance(iceberg_type, iceberg_types.MapType):
        return "map"
    if isinstance(iceberg_type, iceberg_types.ListType):
        return "array"
    if isinstance(iceberg_type, iceberg_types.StructType):
        return "object"

    raise ValueError(f"Unknown Iceberg type: {iceberg_type}")
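For a quick sense of the mapping, here is a minimal sketch that feeds an in-memory pyiceberg `Schema` through `import_iceberg`, assuming pyiceberg 0.7.x as pinned in `pyproject.toml`; the table name `orders` and the field names are arbitrary examples, not part of this change:

```python
# Minimal sketch: build an in-memory pyiceberg Schema and run it through
# import_iceberg to inspect the resulting model (names are illustrative).
from pyiceberg.schema import Schema
from pyiceberg.types import ListType, LongType, NestedField, StringType

from datacontract.imports.iceberg_importer import import_iceberg
from datacontract.model.data_contract_specification import DataContractSpecification

schema = Schema(
    NestedField(field_id=1, name="order_id", field_type=LongType(), required=True),
    NestedField(
        field_id=2,
        name="tags",
        field_type=ListType(element_id=3, element_type=StringType(), element_required=False),
        required=False,
        doc="free-form labels",
    ),
)

spec = import_iceberg(DataContractSpecification(), schema, "orders")
model = spec.models["orders"]
print(model.fields["order_id"].type)  # long
print(model.fields["tags"].type)  # array
print(model.fields["tags"].items.type)  # string
print(model.fields["tags"].description)  # free-form labels
```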
1 change: 1 addition & 0 deletions datacontract/imports/importer.py
@@ -29,6 +29,7 @@ class ImportFormat(str, Enum):
odcs = "odcs"
unity = "unity"
spark = "spark"
iceberg = "iceberg"

@classmethod
def get_supported_formats(cls):
5 changes: 5 additions & 0 deletions datacontract/imports/importer_factory.py
@@ -93,3 +93,8 @@ def load_module_class(module_path, class_name):
module_path="datacontract.imports.dbml_importer",
class_name="DBMLImporter",
)
importer_factory.register_lazy_importer(
name=ImportFormat.iceberg,
module_path="datacontract.imports.iceberg_importer",
class_name="IcebergImporter",
)
6 changes: 5 additions & 1 deletion pyproject.toml
@@ -56,6 +56,10 @@ databricks = [
"soda-core-spark[databricks]>=3.3.1,<3.4.0"
]

iceberg = [
"pyiceberg==0.7.1"
]

kafka = [
"datacontract-cli[avro]",
"soda-core-spark-df>=3.3.1,<3.4.0"
@@ -91,7 +95,7 @@ dbml = [
]

all = [
"datacontract-cli[kafka,bigquery,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml]"
"datacontract-cli[kafka,bigquery,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg]"
]

dev = [
3 changes: 3 additions & 0 deletions tests/fixtures/iceberg/invalid_schema.json
@@ -0,0 +1,3 @@
{
"fields": "not a list"
}
107 changes: 107 additions & 0 deletions tests/fixtures/iceberg/nested_schema.json
@@ -0,0 +1,107 @@
{
"type": "struct",
"fields": [
{
"id": 1,
"name": "foo",
"type": "string",
"required": false
},
{
"id": 2,
"name": "bar",
"type": "int",
"required": true
},
{
"id": 3,
"name": "baz",
"type": "boolean",
"required": false
},
{
"id": 4,
"name": "qux",
"type": {
"type": "list",
"element-id": 5,
"element": "string",
"element-required": true
},
"required": true
},
{
"id": 6,
"name": "quux",
"type": {
"type": "map",
"key-id": 7,
"key": "string",
"value-id": 8,
"value": {
"type": "map",
"key-id": 9,
"key": "string",
"value-id": 10,
"value": "int",
"value-required": true
},
"value-required": true
},
"required": true
},
{
"id": 11,
"name": "location",
"type": {
"type": "list",
"element-id": 12,
"element": {
"type": "struct",
"fields": [
{
"id": 13,
"name": "latitude",
"type": "float",
"required": false
},
{
"id": 14,
"name": "longitude",
"type": "float",
"required": false
}
]
},
"element-required": true
},
"required": true
},
{
"id": 15,
"name": "person",
"type": {
"type": "struct",
"fields": [
{
"id": 16,
"name": "name",
"type": "string",
"required": false
},
{
"id": 17,
"name": "age",
"type": "int",
"required": true
}
]
},
"required": false
}
],
"schema-id": 1,
"identifier-field-ids": [
2
]
}
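Running the importer over this fixture (e.g. with `--iceberg-table nested-table`) would yield a model roughly like the following. This is a hand-assembled sketch derived from the mapping logic in `iceberg_importer.py`; the exact key order and serialization details of the emitted YAML may differ:

```yaml
models:
  nested-table:
    type: table
    title: nested-table
    fields:
      foo:
        type: string
        title: foo
        required: false
        config:
          icebergFieldId: 1
      bar:
        type: integer
        title: bar
        required: true
        config:
          icebergFieldId: 2
      qux:
        type: array
        title: qux
        required: true
        config:
          icebergFieldId: 4
        items:
          type: string
          required: true
      quux:
        type: map
        title: quux
        required: true
        config:
          icebergFieldId: 6
        keys:
          type: string
          required: true
        values:
          type: map
          required: true
          keys:
            type: string
            required: true
          values:
            type: integer
            required: true
      # baz, location, and person follow the same pattern
```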
