Skip to content

Commit

Permalink
[apache#2927] Improvement(catalog-lakehouse-iceberg): Support more fi…
Browse files Browse the repository at this point in the history
…le formats in using clause when create iceberg tables
  • Loading branch information
caican00 committed Apr 13, 2024
1 parent 08f9cc4 commit 65cd4ff
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog.lakehouse.iceberg;

import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec;
Expand Down Expand Up @@ -47,12 +48,32 @@ public class IcebergTable extends BaseTable {
/** The default provider of the table. */
public static final String DEFAULT_ICEBERG_PROVIDER = "iceberg";

/** The supported file formats for Iceberg tables. */
public static final String ICEBERG_PARQUET_FILE_FORMAT = "parquet";

public static final String ICEBERG_ORC_FILE_FORMAT = "orc";
public static final String ICEBERG_AVRO_FILE_FORMAT = "avro";

public static final String ICEBERG_COMMENT_FIELD_NAME = "comment";

private String location;

private IcebergTable() {}

public static Map<String, String> rebuildCreateProperties(Map<String, String> createProperties) {
String provider = createProperties.get(PROP_PROVIDER);
if (ICEBERG_PARQUET_FILE_FORMAT.equalsIgnoreCase(provider)) {
createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_PARQUET_FILE_FORMAT);
} else if (ICEBERG_AVRO_FILE_FORMAT.equalsIgnoreCase(provider)) {
createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_AVRO_FILE_FORMAT);
} else if (ICEBERG_ORC_FILE_FORMAT.equalsIgnoreCase(provider)) {
createProperties.put(DEFAULT_FILE_FORMAT, ICEBERG_ORC_FILE_FORMAT);
} else if (provider != null && !DEFAULT_ICEBERG_PROVIDER.equalsIgnoreCase(provider)) {
throw new IllegalArgumentException("Unsupported format in USING: " + provider);
}
return createProperties;
}

public CreateTableRequest toCreateTableRequest() {
Schema schema = ConvertUtil.toIcebergSchema(this);
properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
Expand All @@ -62,7 +83,7 @@ public CreateTableRequest toCreateTableRequest() {
.withName(name)
.withLocation(location)
.withSchema(schema)
.setProperties(properties)
.setProperties(rebuildCreateProperties(properties))
.withPartitionSpec(ToIcebergPartitionSpec.toPartitionSpec(schema, partitioning))
.withWriteOrder(ToIcebergSortOrder.toSortOrder(schema, sortOrders));
return builder.build();
Expand Down Expand Up @@ -186,10 +207,6 @@ protected IcebergTable internalBuild() {
if (null != comment) {
icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
}
String provider = icebergTable.properties.get(PROP_PROVIDER);
if (provider != null && !DEFAULT_ICEBERG_PROVIDER.equalsIgnoreCase(provider)) {
throw new IllegalArgumentException("Unsupported format in USING: " + provider);
}
return icebergTable;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.integration.test;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.datastrato.gravitino.Catalog;
Expand Down Expand Up @@ -1051,6 +1052,109 @@ public void testTableDistribution() {
"Iceberg's Distribution Mode.RANGE not support set expressions."));
}

@Test
void testIcebergTablePropertiesWhenCreate() {
// Create table from Gravitino API
Column[] columns = createColumns();

NameIdentifier tableIdentifier =
NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
Distribution distribution = Distributions.NONE;

final SortOrder[] sortOrders =
new SortOrder[] {
SortOrders.of(
NamedReference.field(ICEBERG_COL_NAME2),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST)
};

Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())};
Map<String, String> properties = createProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
Table createdTable =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT));
Table loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT));

properties.put(DEFAULT_FILE_FORMAT, "iceberg");
createdTable =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertFalse(createdTable.properties().containsKey(DEFAULT_FILE_FORMAT));
loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertFalse(loadTable.properties().containsKey(DEFAULT_FILE_FORMAT));

properties.put(DEFAULT_FILE_FORMAT, "parquet");
createdTable =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertEquals("parquet", createdTable.properties().get(DEFAULT_FILE_FORMAT));
loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals("parquet", loadTable.properties().get(DEFAULT_FILE_FORMAT));

properties.put(DEFAULT_FILE_FORMAT, "orc");
createdTable =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertEquals("orc", createdTable.properties().get(DEFAULT_FILE_FORMAT));
loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals("orc", loadTable.properties().get(DEFAULT_FILE_FORMAT));

properties.put(DEFAULT_FILE_FORMAT, "avro");
createdTable =
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);
Assertions.assertEquals("avro", createdTable.properties().get(DEFAULT_FILE_FORMAT));
loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals("avro", loadTable.properties().get(DEFAULT_FILE_FORMAT));

properties.put(DEFAULT_FILE_FORMAT, "text");
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders));
}

protected static void assertionsTableInfo(
String tableName,
String tableComment,
Expand Down

0 comments on commit 65cd4ff

Please sign in to comment.