12 changes: 12 additions & 0 deletions mkdocs/docs/configuration.md
@@ -232,6 +232,18 @@ catalog:
s3.secret-access-key: password
```

When using a Hive 2.x metastore, enable the compatibility flag:

```yaml
catalog:
default:
uri: thrift://localhost:9083
hive.hive2-compatible: true
s3.endpoint: http://localhost:9000
s3.access-key-id: admin
s3.secret-access-key: password
```
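
The same properties can also be passed programmatically. A minimal sketch using `load_catalog` (the catalog name and endpoints are illustrative, mirroring the YAML above):

```python
from pyiceberg.catalog import load_catalog

# Assumes a Hive 2.x metastore on localhost:9083 and an S3-compatible endpoint on localhost:9000.
catalog = load_catalog(
    "default",
    **{
        "type": "hive",
        "uri": "thrift://localhost:9083",
        "hive.hive2-compatible": "true",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)
```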

## Glue Catalog

Your AWS credentials can be passed directly through the Python API.
3 changes: 2 additions & 1 deletion pyiceberg/catalog/glue.py
@@ -65,6 +65,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
PropertyUtil,
Table,
update_table_metadata,
)
@@ -344,7 +345,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
self.glue.update_table(
DatabaseName=database_name,
TableInput=table_input,
SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
SkipArchive=PropertyUtil.property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
VersionId=version_id,
)
except self.glue.exceptions.EntityNotFoundException as e:
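
A hedged illustration of why this change matters, assuming `GLUE_SKIP_ARCHIVE` resolves to the `glue.skip-archive` property with a boolean default: catalog properties arrive as strings, so the raw `properties.get(...)` could hand the client the string `"false"` where a boolean is expected, while `property_as_bool` parses it first:

```python
from pyiceberg.table import PropertyUtil

props = {"glue.skip-archive": "false"}

props.get("glue.skip-archive", True)                              # "false" — a truthy string
PropertyUtil.property_as_bool(props, "glue.skip-archive", True)   # False — an actual bool for SkipArchive
```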
29 changes: 24 additions & 5 deletions pyiceberg/catalog/hive.py
@@ -74,7 +74,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -107,6 +107,10 @@
COMMENT = "comment"
OWNER = "owner"

# If set to true, HiveCatalog will operate in Hive2 compatibility mode
HIVE2_COMPATIBLE = "hive.hive2-compatible"
HIVE2_COMPATIBLE_DEFAULT = False


class _HiveClient:
"""Helper class to nicely open and close the transport."""
@@ -136,10 +140,15 @@ def __exit__(
self._transport.close()


def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) -> StorageDescriptor:
def _construct_hive_storage_descriptor(
schema: Schema, location: Optional[str], hive2_compatible: bool = False
) -> StorageDescriptor:
ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
return StorageDescriptor(
[FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter()), field.doc) for field in schema.fields],
[
FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter(hive2_compatible)), field.doc)
for field in schema.fields
],
location,
"org.apache.hadoop.mapred.FileInputFormat",
"org.apache.hadoop.mapred.FileOutputFormat",
@@ -184,7 +193,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD
DateType: "date",
TimeType: "string",
TimestampType: "timestamp",
TimestamptzType: "timestamp",
TimestamptzType: "timestamp with local time zone",
StringType: "string",
UUIDType: "string",
BinaryType: "binary",
@@ -193,6 +202,11 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD


class SchemaToHiveConverter(SchemaVisitor[str]):
hive2_compatible: bool

def __init__(self, hive2_compatible: bool):
self.hive2_compatible = hive2_compatible

def schema(self, schema: Schema, struct_result: str) -> str:
return struct_result

@@ -212,6 +226,9 @@ def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
def primitive(self, primitive: PrimitiveType) -> str:
if isinstance(primitive, DecimalType):
return f"decimal({primitive.precision},{primitive.scale})"
elif self.hive2_compatible and isinstance(primitive, TimestamptzType):
# Hive2 doesn't support timestamp with local time zone
return "timestamp"
else:
return HIVE_PRIMITIVE_TYPES[type(primitive)]
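
A small sketch of what the converter now emits for the affected type; `visit` dispatches a primitive type straight to `primitive()`, mirroring how `_construct_hive_storage_descriptor` invokes it per field:

```python
from pyiceberg.catalog.hive import SchemaToHiveConverter
from pyiceberg.schema import visit
from pyiceberg.types import TimestamptzType

# Hive 3 has a dedicated type; Hive 2 only knows plain timestamp.
visit(TimestamptzType(), SchemaToHiveConverter(hive2_compatible=False))  # 'timestamp with local time zone'
visit(TimestamptzType(), SchemaToHiveConverter(hive2_compatible=True))   # 'timestamp'
```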

@@ -300,7 +317,9 @@ def create_table(
owner=properties[OWNER] if properties and OWNER in properties else getpass.getuser(),
createTime=current_time_millis // 1000,
lastAccessTime=current_time_millis // 1000,
sd=_construct_hive_storage_descriptor(schema, location),
sd=_construct_hive_storage_descriptor(
schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT)
),
tableType=EXTERNAL_TABLE,
parameters=_construct_parameters(metadata_location),
)
6 changes: 6 additions & 0 deletions pyiceberg/table/__init__.py
@@ -249,6 +249,12 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt
else:
return default

@staticmethod
def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
if value := properties.get(property_name):
return value.lower() == "true"
return default
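
A quick sketch of the helper's behaviour, using the new Hive property as the example key:

```python
from pyiceberg.table import PropertyUtil

PropertyUtil.property_as_bool({"hive.hive2-compatible": "true"}, "hive.hive2-compatible", False)  # True
PropertyUtil.property_as_bool({"hive.hive2-compatible": "TRUE"}, "hive.hive2-compatible", False)  # True — comparison is case-insensitive
PropertyUtil.property_as_bool({}, "hive.hive2-compatible", False)                                 # False — missing key falls back to the default
```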


class Transaction:
_table: Table
13 changes: 11 additions & 2 deletions tests/catalog/test_hive.py
@@ -194,9 +194,14 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None:
catalog.create_table("table", schema=table_schema_simple)


@pytest.mark.parametrize("hive2_compatible", [True, False])
@patch("time.time", MagicMock(return_value=12345))
def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None:
def test_create_table(
table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable, hive2_compatible: bool
) -> None:
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
if hive2_compatible:
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"})

catalog._client = MagicMock()
catalog._client.__enter__().create_table.return_value = None
@@ -229,7 +234,11 @@ def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDa
FieldSchema(name='date', type='date', comment=None),
FieldSchema(name='time', type='string', comment=None),
FieldSchema(name='timestamp', type='timestamp', comment=None),
FieldSchema(name='timestamptz', type='timestamp', comment=None),
FieldSchema(
name='timestamptz',
type='timestamp' if hive2_compatible else 'timestamp with local time zone',
comment=None,
),
FieldSchema(name='string', type='string', comment=None),
FieldSchema(name='uuid', type='string', comment=None),
FieldSchema(name='fixed', type='binary', comment=None),
21 changes: 21 additions & 0 deletions tests/conftest.py
@@ -2006,6 +2006,20 @@ def session_catalog() -> Catalog:
)


@pytest.fixture(scope="session")
def session_catalog_hive() -> Catalog:
return load_catalog(
"local",
**{
"type": "hive",
"uri": "http://localhost:9083",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)


@pytest.fixture(scope="session")
def spark() -> "SparkSession":
import importlib.metadata
@@ -2037,6 +2051,13 @@ def spark() -> "SparkSession":
.config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
.config("spark.sql.catalog.integration.s3.path-style-access", "true")
.config("spark.sql.defaultCatalog", "integration")
.config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive.type", "hive")
.config("spark.sql.catalog.hive.uri", "http://localhost:9083")
.config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
.config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
.config("spark.sql.catalog.hive.s3.path-style-access", "true")
.getOrCreate()
)

20 changes: 20 additions & 0 deletions tests/integration/test_writes.py
@@ -34,6 +34,7 @@
from pytest_mock.plugin import MockerFixture

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
@@ -832,3 +833,22 @@ def get_metadata_entries_count(identifier: str) -> int:
tbl.transaction().set_properties({"test": "2"}).commit_transaction()
tbl.append(arrow_table_with_null)
assert get_metadata_entries_count(identifier) == 4


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_hive_catalog_storage_descriptor(
session_catalog_hive: HiveCatalog,
pa_schema: pa.Schema,
arrow_table_with_null: pa.Table,
spark: SparkSession,
format_version: int,
) -> None:
tbl = _create_table(
session_catalog_hive, "default.test_storage_descriptor", {"format-version": format_version}, [arrow_table_with_null]
)

# check if pyiceberg can read the table
assert len(tbl.scan().to_arrow()) == 3
# check if spark can read the table
assert spark.sql("SELECT * FROM hive.default.test_storage_descriptor").count() == 3