From 65cd4ffa197e766ac7cc0bcb5908d38ee43cdd7f Mon Sep 17 00:00:00 2001
From: caican00
Date: Sat, 13 Apr 2024 23:00:35 +0800
Subject: [PATCH] [#2927] Improvement(catalog-lakehouse-iceberg): Support more
 file formats in USING clause when creating Iceberg tables
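
The table format can now be chosen through the table's "provider" property,
which is what the engine's USING clause is mapped to: "parquet", "orc" and
"avro" are rewritten into Iceberg's "write.format.default" table property,
"iceberg" keeps the engine default, and any other value is rejected with an
IllegalArgumentException.

A minimal sketch of the resulting usage through the Gravitino Java API; the
catalog handle, the identifier and the columns array are illustrative
placeholders, and the four-argument createTable overload is assumed here
rather than taken from this patch:

    Map<String, String> properties = Maps.newHashMap();
    // Rewritten by IcebergTable.rebuildCreateProperties into write.format.default=orc.
    properties.put("provider", "orc");
    catalog
        .asTableCatalog()
        .createTable(
            NameIdentifier.of("metalake", "catalog", "schema", "events"),
            columns,
            "events stored as ORC data files",
            properties);
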
---
 .../lakehouse/iceberg/IcebergTable.java       |  28 ++++-
 .../integration/test/CatalogIcebergIT.java    | 114 ++++++++++++++++++
 2 files changed, 137 insertions(+), 5 deletions(-)

diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
index 4b909a652d8..4cda34fc32e 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
@@ -5,6 +5,7 @@
 package com.datastrato.gravitino.catalog.lakehouse.iceberg;

 import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;

 import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil;
 import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec;
@@ -47,12 +48,33 @@ public class IcebergTable extends BaseTable {
   /** The default provider of the table. */
   public static final String DEFAULT_ICEBERG_PROVIDER = "iceberg";

+  /** The supported file formats for Iceberg tables. */
+  public static final String ICEBERG_PARQUET_FILE_FORMAT = "parquet";
+
+  public static final String ICEBERG_ORC_FILE_FORMAT = "orc";
+  public static final String ICEBERG_AVRO_FILE_FORMAT = "avro";
+
   public static final String ICEBERG_COMMENT_FIELD_NAME = "comment";

   private String location;

   private IcebergTable() {}

+  /** Maps the "provider" property (the USING clause) to Iceberg's "write.format.default". */
+  public static Map<String, String> rebuildCreateProperties(Map<String, String> createProperties) {
+    String provider = createProperties.get(PROP_PROVIDER);
+    if (ICEBERG_PARQUET_FILE_FORMAT.equalsIgnoreCase(provider)) {
+      createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_PARQUET_FILE_FORMAT);
+    } else if (ICEBERG_AVRO_FILE_FORMAT.equalsIgnoreCase(provider)) {
+      createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_AVRO_FILE_FORMAT);
+    } else if (ICEBERG_ORC_FILE_FORMAT.equalsIgnoreCase(provider)) {
+      createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_ORC_FILE_FORMAT);
+    } else if (provider != null && !DEFAULT_ICEBERG_PROVIDER.equalsIgnoreCase(provider)) {
+      throw new IllegalArgumentException("Unsupported format in USING: " + provider);
+    }
+    return createProperties;
+  }
+
   public CreateTableRequest toCreateTableRequest() {
     Schema schema = ConvertUtil.toIcebergSchema(this);
     properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
@@ -62,7 +84,7 @@ public CreateTableRequest toCreateTableRequest() {
             .withName(name)
             .withLocation(location)
             .withSchema(schema)
-            .setProperties(properties)
+            .setProperties(rebuildCreateProperties(properties))
             .withPartitionSpec(ToIcebergPartitionSpec.toPartitionSpec(schema, partitioning))
             .withWriteOrder(ToIcebergSortOrder.toSortOrder(schema, sortOrders));
     return builder.build();
@@ -186,10 +208,6 @@ protected IcebergTable internalBuild() {
       if (null != comment) {
         icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
       }
-      String provider = icebergTable.properties.get(PROP_PROVIDER);
-      if (provider != null && !DEFAULT_ICEBERG_PROVIDER.equalsIgnoreCase(provider)) {
-        throw new IllegalArgumentException("Unsupported format in USING: " + provider);
-      }
       return icebergTable;
     }
   }
diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java
index cc9512f122d..e6a9f4d9d1a 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergIT.java
@@ -4,6 +4,7 @@
  */
 package com.datastrato.gravitino.catalog.lakehouse.iceberg.integration.test;

+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.junit.jupiter.api.Assertions.assertThrows;

 import com.datastrato.gravitino.Catalog;
@@ -1051,6 +1052,119 @@ public void testTableDistribution() {
                 "Iceberg's Distribution Mode.RANGE not support set expressions."));
   }

+  @Test
+  void testIcebergTablePropertiesWhenCreate() {
+    // Create table from Gravitino API
+    Column[] columns = createColumns();
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+
+    final SortOrder[] sortOrders =
+        new SortOrder[] {
+          SortOrders.of(
+              NamedReference.field(ICEBERG_COL_NAME2),
+              SortDirection.DESCENDING,
+              NullOrdering.NULLS_FIRST)
+        };
+
+    Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())};
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT));
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT));
+
+    // The "provider" property carries the USING clause; "iceberg" keeps Iceberg's own default
+    // file format. Drop the table before each re-creation so that reusing the identifier does
+    // not fail with TableAlreadyExistsException.
+    tableCatalog.dropTable(tableIdentifier);
+    properties.put("provider", "iceberg");
+    createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT));
+    loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT));
+
+    // "parquet", "orc" and "avro" are rewritten into Iceberg's write.format.default property.
+    tableCatalog.dropTable(tableIdentifier);
+    properties.put("provider", "parquet");
+    createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertEquals("parquet", createdTable.properties().get(DEFAULT_FILE_FORMAT));
+    loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals("parquet", loadTable.properties().get(DEFAULT_FILE_FORMAT));
+
+    tableCatalog.dropTable(tableIdentifier);
+    properties.put("provider", "orc");
+    createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertEquals("orc", createdTable.properties().get(DEFAULT_FILE_FORMAT));
+    loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals("orc", loadTable.properties().get(DEFAULT_FILE_FORMAT));
+
+    tableCatalog.dropTable(tableIdentifier);
+    properties.put("provider", "avro");
+    createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertEquals("avro", createdTable.properties().get(DEFAULT_FILE_FORMAT));
+    loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals("avro", loadTable.properties().get(DEFAULT_FILE_FORMAT));
+
+    // Any other format in the USING clause is rejected.
+    tableCatalog.dropTable(tableIdentifier);
+    properties.put("provider", "text");
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier,
+                columns,
+                table_comment,
+                properties,
+                partitioning,
+                distribution,
+                sortOrders));
+  }
+
   protected static void assertionsTableInfo(
       String tableName,
       String tableComment,