Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and truncate functions in Gravitino SortOrder #2928

Merged
merged 28 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
72502e0
[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and…
caican00 Apr 13, 2024
4937572
Merge branch 'main' into iceberg-properties
caican00 Apr 13, 2024
3e52bc0
[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and…
caican00 Apr 13, 2024
b6d55cc
Merge remote-tracking branch 'upstream/main' into iceberg-properties
caican00 Apr 13, 2024
6277724
Merge branch 'iceberg-properties' of github.com:caican00/gravitino in…
caican00 Apr 13, 2024
65cd4ff
[#2927] Improvement(catalog-lakehouse-iceberg): Support more file for…
caican00 Apr 13, 2024
6e8af10
[#2927] Improvement(catalog-lakehouse-iceberg): Support more file for…
caican00 Apr 13, 2024
b9bc926
update
caican00 Apr 15, 2024
2d021b9
update
caican00 Apr 15, 2024
855eaf2
Merge branch 'main' into iceberg-properties
caican00 Apr 15, 2024
fc09aef
update
caican00 Apr 15, 2024
99a339a
update
caican00 Apr 15, 2024
69c7321
update
caican00 Apr 15, 2024
d51903a
update
caican00 Apr 15, 2024
3b7d2ea
Merge remote-tracking branch 'origin/main' into iceberg-properties
caican00 Apr 15, 2024
b16d4ee
Merge branch 'iceberg-properties' of github.com:caican00/gravitino in…
caican00 Apr 15, 2024
17990ac
Merge remote-tracking branch 'upstream_dev/iceberg-create-properties'…
caican00 Apr 15, 2024
e2c2a02
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 15, 2024
845fe0e
update
caican00 Apr 15, 2024
040870d
Merge branch 'main' into iceberg-properties
caican00 Apr 18, 2024
6b7c28a
Merge branch 'main' into iceberg-properties
caican00 Apr 18, 2024
60d11b2
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 19, 2024
0538bda
update
caican00 Apr 19, 2024
633341c
Merge branch 'iceberg-properties' of github.com:caican00/gravitino in…
caican00 Apr 19, 2024
f7f644d
Merge branch 'main' into iceberg-properties
caican00 Apr 19, 2024
c94eb48
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 May 17, 2024
98f85ce
update
caican00 May 22, 2024
28dfe4b
Merge remote-tracking branch 'origin/main' into iceberg-properties
caican00 May 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.rel.expressions;

import com.datastrato.gravitino.annotation.Evolving;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import java.util.Arrays;
import java.util.Objects;

Expand Down Expand Up @@ -73,7 +74,17 @@ public String toString() {
if (arguments.length == 0) {
return functionName + "()";
}
return functionName + "(" + String.join(", ", Arrays.toString(arguments)) + ")";
String[] functionArguments =
caican00 marked this conversation as resolved.
Show resolved Hide resolved
Arrays.stream(this.arguments)
.map(
expression -> {
if (expression instanceof Literals.LiteralImpl) {
return ((Literals.LiteralImpl<?>) expression).value().toString();
}
return expression.toString();
})
.toArray(String[]::new);
return functionName + "(" + String.join(", ", functionArguments) + ")";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
Expand Down Expand Up @@ -38,13 +39,13 @@ public SortOrder field(String sourceName, int id, SortDirection direction, NullO
@Override
public SortOrder bucket(
String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return functionSortOrder("bucket", id, direction, nullOrder);
return functionSortOrder("bucket", width, id, direction, nullOrder);
}

@Override
public SortOrder truncate(
String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return functionSortOrder("truncate", id, direction, nullOrder);
return functionSortOrder("truncate", width, id, direction, nullOrder);
}

@Override
Expand Down Expand Up @@ -80,6 +81,15 @@ private SortOrder functionSortOrder(
toGravitino(nullOrder));
}

private SortOrder functionSortOrder(
String name, int width, int id, SortDirection direction, NullOrder nullOrder) {
return SortOrders.of(
FunctionExpression.of(
name, Literals.integerLiteral(width), NamedReference.field(idToName.get(id))),
toGravitino(direction),
toGravitino(nullOrder));
}

private com.datastrato.gravitino.rel.expressions.sorts.SortDirection toGravitino(
SortDirection direction) {
return direction == SortDirection.ASC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import java.util.Locale;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -58,9 +59,10 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[

Expression firstArg = sortFunc.arguments()[0];
Preconditions.checkArgument(
firstArg instanceof Literal && ((Literal<?>) firstArg).value() instanceof Integer,
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType() instanceof Types.IntegerType,
"Bucket sort's first argument must be a integer literal");
int numBuckets = (Integer) ((Literal<?>) firstArg).value();
int numBuckets = Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value()));
caican00 marked this conversation as resolved.
Show resolved Hide resolved
caican00 marked this conversation as resolved.
Show resolved Hide resolved

Expression secondArg = sortFunc.arguments()[1];
Preconditions.checkArgument(
Expand All @@ -77,9 +79,10 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[

firstArg = sortFunc.arguments()[0];
Preconditions.checkArgument(
firstArg instanceof Literal && ((Literal<?>) firstArg).value() instanceof Integer,
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType() instanceof Types.IntegerType,
"Truncate sort's first argument must be a integer literal");
int width = (Integer) ((Literal<?>) firstArg).value();
int width = Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value()));

secondArg = sortFunc.arguments()[1];
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
Expand Down Expand Up @@ -113,7 +115,17 @@ private static CatalogEntity createDefaultCatalogEntity() {
private SortOrder[] createSortOrder() {
return new SortOrder[] {
SortOrders.of(
NamedReference.field("col_2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST)
NamedReference.field("col_2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"bucket", Literals.integerLiteral(10), NamedReference.field("col_1")),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"truncate", Literals.integerLiteral(1), NamedReference.field("col_1")),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST)
};
}

Expand Down Expand Up @@ -194,6 +206,8 @@ public void testCreateIcebergTable() {
Assertions.assertEquals(sortOrders.length, loadedTable.sortOrder().length);
for (int i = 0; i < loadedTable.sortOrder().length; i++) {
Assertions.assertEquals(sortOrders[i].direction(), loadedTable.sortOrder()[i].direction());
Assertions.assertEquals(
(sortOrders[i]).nullOrdering(), loadedTable.sortOrder()[i].nullOrdering());
Assertions.assertEquals(
(sortOrders[i]).expression(), loadedTable.sortOrder()[i].expression());
}
Expand Down Expand Up @@ -432,7 +446,17 @@ public void testAlterIcebergTable() {
Assertions.assertFalse(alteredTable.properties().containsKey("key1"));
Assertions.assertEquals("val2_new", alteredTable.properties().get("key2"));

sortOrders[0] =
SortOrders.of(
NamedReference.field("col_2_new"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST);
Assertions.assertEquals(sortOrders.length, alteredTable.sortOrder().length);
for (int i = 0; i < alteredTable.sortOrder().length; i++) {
Assertions.assertEquals(sortOrders[i].direction(), alteredTable.sortOrder()[i].direction());
Assertions.assertEquals(
(sortOrders[i]).nullOrdering(), alteredTable.sortOrder()[i].nullOrdering());
Assertions.assertEquals(
(sortOrders[i]).expression(), alteredTable.sortOrder()[i].expression());
}

Column[] expected =
new Column[] {
Expand All @@ -459,13 +483,13 @@ public void testAlterIcebergTable() {
.asTableCatalog()
.alterTable(
NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"),
TableChange.deleteColumn(new String[] {"col_1"}, false));
TableChange.deleteColumn(new String[] {"col_3"}, false));
caican00 marked this conversation as resolved.
Show resolved Hide resolved
Table alteredTable1 =
icebergCatalog
.asTableCatalog()
.loadTable(NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"));
expected =
Arrays.stream(expected).filter(c -> !"col_1".equals(c.name())).toArray(Column[]::new);
Arrays.stream(expected).filter(c -> !"col_3".equals(c.name())).toArray(Column[]::new);
Assertions.assertArrayEquals(expected, alteredTable1.columns());

Assertions.assertNotNull(alteredTable.partitioning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
Expand Down Expand Up @@ -105,6 +106,13 @@ protected static SortOrder createFunctionSortOrder(String name, String colName)
RandomUtils.nextBoolean() ? NullOrdering.NULLS_FIRST : NullOrdering.NULLS_LAST);
}

protected static SortOrder createFunctionSortOrder(String name, int width, String colName) {
caican00 marked this conversation as resolved.
Show resolved Hide resolved
return SortOrders.of(
FunctionExpression.of(name, Literals.integerLiteral(width), field(colName)),
RandomUtils.nextBoolean() ? SortDirection.DESCENDING : SortDirection.ASCENDING,
RandomUtils.nextBoolean() ? NullOrdering.NULLS_FIRST : NullOrdering.NULLS_LAST);
}

protected static Types.NestedField createNestedField(
int id, String name, org.apache.iceberg.types.Type type) {
return Types.NestedField.optional(id, name, type, TEST_COMMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -72,9 +74,13 @@ public void testFromSortOrder() {
return ((NamedReference.FieldReference) sortOrder.expression())
.fieldName()[0];
} else if (sortOrder.expression() instanceof FunctionExpression) {
return ((NamedReference.FieldReference)
((FunctionExpression) sortOrder.expression()).arguments()[0])
.fieldName()[0];
Expression[] arguments =
((FunctionExpression) sortOrder.expression()).arguments();
if (arguments.length == 1) {
return ((NamedReference.FieldReference) arguments[0]).fieldName()[0];
} else {
return ((NamedReference.FieldReference) arguments[1]).fieldName()[0];
}
}
throw new RuntimeException("Unsupported sort expression type");
},
Expand All @@ -96,6 +102,45 @@ public void testFromSortOrder() {
? NullOrdering.NULLS_FIRST
: NullOrdering.NULLS_LAST,
sortOrder.nullOrdering());
String icebergNullOrder =
sortField.nullOrder() == NullOrder.NULLS_FIRST ? "nulls_first" : "nulls_last";
String icebergSortOrderString =
convertIcebergTransform(sortField)
+ idToName.get(sortField.sourceId())
+ ") "
+ sortField.direction().toString().toLowerCase(Locale.ROOT)
+ " "
+ icebergNullOrder;
String gravitinoSortOrderString =
sortOrder.expression() instanceof NamedReference.FieldReference
? "identity("
+ sortOrder.expression().toString()
+ ") "
+ sortOrder.direction()
+ " "
+ sortOrder.nullOrdering()
: sortOrder.expression().toString()
+ " "
+ sortOrder.direction()
+ " "
+ sortOrder.nullOrdering();
Assertions.assertEquals(icebergSortOrderString, gravitinoSortOrderString);
}
}

private static String convertIcebergTransform(SortField sortField) {
String transform = sortField.transform().toString();
if (transform.startsWith("year")
|| transform.startsWith("month")
|| transform.startsWith("day")
|| transform.startsWith("hour")) {
return transform + "(";
} else if (transform.startsWith("truncate") || transform.startsWith("bucket")) {
return transform.replace("[", "(").replace("]", ", ");
} else if (transform.startsWith("identity")) {
return "identity(";
} else {
throw new RuntimeException("Unsupported Iceberg transform type");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public void testToSortOrder() {
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("hour", "col_7"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("month", "col_8"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("year", "col_9"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("bucket", 10, "col_10"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("truncate", 2, "col_11"));

Types.NestedField[] nestedFields =
createNestedField("col_1", "col_2", "col_3", "col_4", "col_5");
Expand All @@ -40,6 +42,10 @@ public void testToSortOrder() {
ArrayUtils.add(nestedFields, createNestedField(8, "col_8", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(9, "col_9", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(10, "col_10", Types.IntegerType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(11, "col_11", Types.StringType.get()));
Schema schema = new Schema(nestedFields);
org.apache.iceberg.SortOrder icebergSortOrder =
ToIcebergSortOrder.toSortOrder(schema, sortOrders);
Expand All @@ -61,7 +67,9 @@ public void testToSortOrder() {
if (colName.equals("col_6")
|| colName.equals("col_7")
|| colName.equals("col_8")
|| colName.equals("col_9")) {
|| colName.equals("col_9")
caican00 marked this conversation as resolved.
Show resolved Hide resolved
|| colName.equals("col_10")
|| colName.equals("col_11")) {
Assertions.assertFalse(sortField.transform().isIdentity());
} else {
Assertions.assertTrue(sortField.transform().isIdentity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
Expand Down Expand Up @@ -1157,6 +1158,80 @@ private static void checkIcebergTableFileFormat(
Assertions.assertEquals(expectedFileFormat, loadTable.properties().get(DEFAULT_FILE_FORMAT));
}

@Test
public void testTableSortOrder() {
Column[] columns = createColumns();

NameIdentifier tableIdentifier =
NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
Distribution distribution = Distributions.NONE;

final SortOrder[] sortOrders =
new SortOrder[] {
SortOrders.of(
NamedReference.field(ICEBERG_COL_NAME2),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"bucket", Literals.integerLiteral(10), NamedReference.field(ICEBERG_COL_NAME1)),
SortDirection.ASCENDING,
NullOrdering.NULLS_LAST),
SortOrders.of(
FunctionExpression.of(
"truncate", Literals.integerLiteral(2), NamedReference.field(ICEBERG_COL_NAME3)),
SortDirection.ASCENDING,
NullOrdering.NULLS_LAST),
};
final String[] sortOrderString =
caican00 marked this conversation as resolved.
Show resolved Hide resolved
new String[] {
"iceberg_col_name2 desc nulls_first",
"bucket(10, iceberg_col_name1) asc nulls_last",
"truncate(2, iceberg_col_name3) asc nulls_last"
};

Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())};

Map<String, String> properties = createProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
// Create a data table for Distributions.NONE
tableCatalog.createTable(
tableIdentifier,
columns,
table_comment,
properties,
partitioning,
distribution,
sortOrders);

Table loadTable = tableCatalog.loadTable(tableIdentifier);

// check table
assertionsTableInfo(
tableName,
table_comment,
Arrays.asList(columns),
properties,
distribution,
sortOrders,
partitioning,
loadTable);

SortOrder[] loadedSortOrders = loadTable.sortOrder();
Assertions.assertEquals(sortOrders.length, loadedSortOrders.length);
for (int i = 0; i < sortOrders.length; i++) {
caican00 marked this conversation as resolved.
Show resolved Hide resolved
Assertions.assertEquals(sortOrders[i].direction(), loadedSortOrders[i].direction());
Assertions.assertEquals(sortOrders[i].nullOrdering(), loadedSortOrders[i].nullOrdering());
Assertions.assertEquals(
sortOrderString[i],
String.format(
"%s %s %s",
sortOrders[i].expression(), sortOrders[i].direction(), sortOrders[i].nullOrdering()));
}

Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier));
}

protected static void assertionsTableInfo(
String tableName,
String tableComment,
Expand Down
Loading