Skip to content

Commit

Permalink
[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and…
Browse files Browse the repository at this point in the history
… truncate functions in Gravitino SortOrder (#2928)

### What changes were proposed in this pull request?
Add width param to bucket and truncate functions in Gravitino SortOrder.
for example:
change `bucket([id]) `to `bucket(10, id)`
change `truncate([name]) `to `truncate(2, name)`


### Why are the changes needed? 

SortOrder in Iceberg supports `FunctionExpression`, such as `year,
month, bucket, truncate, etc`.
`truncate` and `bucket` functions both have two parameters, such as
`bucket(10, col1), truncate(2, col2)`.

However, in gravitino, when converting an iceberg sortorder with
`bucket` or `truncate` to gravitino sortOrder, there is only one
parameter in `bucket` and `truncate` functions.

This picture shows the details of the parameters of the `bucket` and
`truncate` functions in gravitino sortorder, we can test it in
`com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.TestFromIcebergSortOrder#testFromSortOrder`


![image](https://github.com/datastrato/gravitino/assets/94670132/06fd65f5-7d33-4197-8871-dda02fd70a26)

And if we want to convert the gravitino sortOrder with `bucket` or
`truncate` to iceberg sortOrder, we will get the following error as the
first param is missing.
```
java.lang.IllegalArgumentException: Bucket sort should have 2 arguments

	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:143)
	at com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ToIcebergSortOrder.toSortOrder(ToIcebergSortOrder.java:56)
	at com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.TestFromIcebergSortOrder.testFromSortOrder(TestFromIcebergSortOrder.java:86)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
```


Fix: #2921

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UTs and ITs.
  • Loading branch information
caican00 authored and web-flow committed May 22, 2024
1 parent 13cc345 commit d7ab66b
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
Expand Down Expand Up @@ -38,13 +39,13 @@ public SortOrder field(String sourceName, int id, SortDirection direction, NullO
@Override
public SortOrder bucket(
String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return functionSortOrder("bucket", id, direction, nullOrder);
return functionSortOrder("bucket", width, id, direction, nullOrder);
}

@Override
public SortOrder truncate(
String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return functionSortOrder("truncate", id, direction, nullOrder);
return functionSortOrder("truncate", width, id, direction, nullOrder);
}

@Override
Expand Down Expand Up @@ -80,6 +81,15 @@ private SortOrder functionSortOrder(
toGravitino(nullOrder));
}

private SortOrder functionSortOrder(
String name, int width, int id, SortDirection direction, NullOrder nullOrder) {
return SortOrders.of(
FunctionExpression.of(
name, Literals.integerLiteral(width), NamedReference.field(idToName.get(id))),
toGravitino(direction),
toGravitino(nullOrder));
}

private com.datastrato.gravitino.rel.expressions.sorts.SortDirection toGravitino(
SortDirection direction) {
return direction == SortDirection.ASC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import java.util.Locale;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -58,9 +59,10 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[

Expression firstArg = sortFunc.arguments()[0];
Preconditions.checkArgument(
firstArg instanceof Literal && ((Literal<?>) firstArg).value() instanceof Integer,
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType() instanceof Types.IntegerType,
"Bucket sort's first argument must be a integer literal");
int numBuckets = (Integer) ((Literal<?>) firstArg).value();
int numBuckets = Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value()));

Expression secondArg = sortFunc.arguments()[1];
Preconditions.checkArgument(
Expand All @@ -77,9 +79,10 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[

firstArg = sortFunc.arguments()[0];
Preconditions.checkArgument(
firstArg instanceof Literal && ((Literal<?>) firstArg).value() instanceof Integer,
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType() instanceof Types.IntegerType,
"Truncate sort's first argument must be a integer literal");
int width = (Integer) ((Literal<?>) firstArg).value();
int width = Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value()));

secondArg = sortFunc.arguments()[1];
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
Expand Down Expand Up @@ -113,7 +115,17 @@ private static CatalogEntity createDefaultCatalogEntity() {
private SortOrder[] createSortOrder() {
return new SortOrder[] {
SortOrders.of(
NamedReference.field("col_2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST)
NamedReference.field("col_2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"bucket", Literals.integerLiteral(10), NamedReference.field("col_1")),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"truncate", Literals.integerLiteral(1), NamedReference.field("col_1")),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST)
};
}

Expand Down Expand Up @@ -194,6 +206,8 @@ public void testCreateIcebergTable() {
Assertions.assertEquals(sortOrders.length, loadedTable.sortOrder().length);
for (int i = 0; i < loadedTable.sortOrder().length; i++) {
Assertions.assertEquals(sortOrders[i].direction(), loadedTable.sortOrder()[i].direction());
Assertions.assertEquals(
(sortOrders[i]).nullOrdering(), loadedTable.sortOrder()[i].nullOrdering());
Assertions.assertEquals(
(sortOrders[i]).expression(), loadedTable.sortOrder()[i].expression());
}
Expand Down Expand Up @@ -432,7 +446,17 @@ public void testAlterIcebergTable() {
Assertions.assertFalse(alteredTable.properties().containsKey("key1"));
Assertions.assertEquals("val2_new", alteredTable.properties().get("key2"));

sortOrders[0] =
SortOrders.of(
NamedReference.field("col_2_new"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST);
Assertions.assertEquals(sortOrders.length, alteredTable.sortOrder().length);
for (int i = 0; i < alteredTable.sortOrder().length; i++) {
Assertions.assertEquals(sortOrders[i].direction(), alteredTable.sortOrder()[i].direction());
Assertions.assertEquals(
(sortOrders[i]).nullOrdering(), alteredTable.sortOrder()[i].nullOrdering());
Assertions.assertEquals(
(sortOrders[i]).expression(), alteredTable.sortOrder()[i].expression());
}

Column[] expected =
new Column[] {
Expand All @@ -455,17 +479,15 @@ public void testAlterIcebergTable() {
Assertions.assertArrayEquals(expected, alteredTable.columns());

// test delete column change
icebergCatalog
.asTableCatalog()
.alterTable(
NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"),
TableChange.deleteColumn(new String[] {"col_1"}, false));
icebergCatalogOperations.alterTable(
NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"),
TableChange.deleteColumn(new String[] {"col_3"}, false));
Table alteredTable1 =
icebergCatalog
.asTableCatalog()
.loadTable(NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"));
expected =
Arrays.stream(expected).filter(c -> !"col_1".equals(c.name())).toArray(Column[]::new);
Arrays.stream(expected).filter(c -> !"col_3".equals(c.name())).toArray(Column[]::new);
Assertions.assertArrayEquals(expected, alteredTable1.columns());

Assertions.assertNotNull(alteredTable.partitioning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.literals.Literal;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.types.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortField;
import org.apache.iceberg.types.Types;

/** Provide some basic usage methods and test classes for basic fields. */
Expand Down Expand Up @@ -98,13 +105,20 @@ protected static SortOrder[] createSortOrder(String... colNames) {
return results.toArray(new SortOrder[0]);
}

protected static SortOrder createFunctionSortOrder(String name, String colName) {
protected static SortOrder createSortOrder(String name, String colName) {
return SortOrders.of(
FunctionExpression.of(name, field(colName)),
RandomUtils.nextBoolean() ? SortDirection.DESCENDING : SortDirection.ASCENDING,
RandomUtils.nextBoolean() ? NullOrdering.NULLS_FIRST : NullOrdering.NULLS_LAST);
}

protected static SortOrder createSortOrder(String name, int width, String colName) {
return SortOrders.of(
FunctionExpression.of(name, Literals.integerLiteral(width), field(colName)),
RandomUtils.nextBoolean() ? SortDirection.DESCENDING : SortDirection.ASCENDING,
RandomUtils.nextBoolean() ? NullOrdering.NULLS_FIRST : NullOrdering.NULLS_LAST);
}

protected static Types.NestedField createNestedField(
int id, String name, org.apache.iceberg.types.Type type) {
return Types.NestedField.optional(id, name, type, TEST_COMMENT);
Expand All @@ -120,6 +134,66 @@ protected static Types.NestedField[] createNestedField(String... colNames) {
return results.toArray(new Types.NestedField[0]);
}

// Iceberg supports function expressions as SortOrder expressions, the function expression is used
// to evaluate the input value and return a result.
// And in Iceberg, these function expressions are represented by
// `org.apache.iceberg.transforms.Transform`, such as a Bucket(10, column) Transform.
protected static String getIcebergTransfromString(SortField sortField, Schema schema) {
String transform = sortField.transform().toString();
Map<Integer, String> idToName = schema.idToName();
if (transform.startsWith("year")
|| transform.startsWith("month")
|| transform.startsWith("day")
|| transform.startsWith("hour")
|| transform.startsWith("identity")) {
return String.format("%s(%s)", transform, idToName.get(sortField.sourceId()));
} else if (transform.startsWith("truncate") || transform.startsWith("bucket")) {
return String.format(
"%s, %s)",
transform.replace("[", "(").replace("]", ""), idToName.get(sortField.sourceId()));
} else {
throw new RuntimeException("Unsupported Iceberg transform type");
}
}

protected static String getGravitinoSortOrderExpressionString(Expression sortOrderExpression) {
if (sortOrderExpression instanceof NamedReference.FieldReference) {
NamedReference.FieldReference fieldReference =
(NamedReference.FieldReference) sortOrderExpression;
return String.format("identity(%s)", fieldReference.fieldName()[0]);
} else if (sortOrderExpression instanceof FunctionExpression) {
FunctionExpression functionExpression = (FunctionExpression) sortOrderExpression;
String functionName = functionExpression.functionName();
Expression[] arguments = functionExpression.arguments();
if (arguments.length == 1) {
return String.format(
"%s(%s)", functionName, ((NamedReference.FieldReference) arguments[0]).fieldName()[0]);
} else if (arguments.length == 2) {
Expression firstArg = arguments[0];
Preconditions.checkArgument(
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType()
instanceof com.datastrato.gravitino.rel.types.Types.IntegerType,
"The first argument must be a integer literal");
return String.format(
"%s(%s, %s)",
functionName,
Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value())),
((NamedReference.FieldReference) arguments[1]).fieldName()[0]);
} else {
throw new IllegalArgumentException(
String.format(
"Iceberg FunctionExpression in Gravitino should have 1 or 2 arguments, but got %d arguments",
arguments.length));
}
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported Gravitino expression type: %s",
sortOrderExpression.getClass().getName()));
}
}

private static Type getRandomGravitinoType() {
Collection<Type> values = GRAVITINO_TYPE.values();
return values.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
Expand Down Expand Up @@ -72,9 +73,13 @@ public void testFromSortOrder() {
return ((NamedReference.FieldReference) sortOrder.expression())
.fieldName()[0];
} else if (sortOrder.expression() instanceof FunctionExpression) {
return ((NamedReference.FieldReference)
((FunctionExpression) sortOrder.expression()).arguments()[0])
.fieldName()[0];
Expression[] arguments =
((FunctionExpression) sortOrder.expression()).arguments();
if (arguments.length == 1) {
return ((NamedReference.FieldReference) arguments[0]).fieldName()[0];
} else {
return ((NamedReference.FieldReference) arguments[1]).fieldName()[0];
}
}
throw new RuntimeException("Unsupported sort expression type");
},
Expand All @@ -96,6 +101,10 @@ public void testFromSortOrder() {
? NullOrdering.NULLS_FIRST
: NullOrdering.NULLS_LAST,
sortOrder.nullOrdering());
String icebergSortOrderString = getIcebergTransfromString(sortField, schema);
String gravitinoSortOrderString =
getGravitinoSortOrderExpressionString(sortOrder.expression());
Assertions.assertEquals(icebergSortOrderString, gravitinoSortOrderString);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ public class TestToIcebergSortOrder extends TestBaseConvert {
@Test
public void testToSortOrder() {
SortOrder[] sortOrders = createSortOrder("col_1", "col_2", "col_3", "col_4", "col_5");
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("day", "col_6"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("hour", "col_7"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("month", "col_8"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("year", "col_9"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("day", "col_6"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("hour", "col_7"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("month", "col_8"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("year", "col_9"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("bucket", 10, "col_10"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("truncate", 2, "col_11"));

Types.NestedField[] nestedFields =
createNestedField("col_1", "col_2", "col_3", "col_4", "col_5");
Expand All @@ -40,6 +42,10 @@ public void testToSortOrder() {
ArrayUtils.add(nestedFields, createNestedField(8, "col_8", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(9, "col_9", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(10, "col_10", Types.IntegerType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(11, "col_11", Types.StringType.get()));
Schema schema = new Schema(nestedFields);
org.apache.iceberg.SortOrder icebergSortOrder =
ToIcebergSortOrder.toSortOrder(schema, sortOrders);
Expand All @@ -58,14 +64,6 @@ public void testToSortOrder() {
String colName = idToName.get(sortField.sourceId());
Assertions.assertTrue(sortOrderByName.containsKey(colName));
SortOrder sortOrder = sortOrderByName.get(colName);
if (colName.equals("col_6")
|| colName.equals("col_7")
|| colName.equals("col_8")
|| colName.equals("col_9")) {
Assertions.assertFalse(sortField.transform().isIdentity());
} else {
Assertions.assertTrue(sortField.transform().isIdentity());
}
Assertions.assertEquals(
sortOrder.direction() == SortDirection.ASCENDING
? org.apache.iceberg.SortDirection.ASC
Expand All @@ -76,6 +74,10 @@ public void testToSortOrder() {
? NullOrder.NULLS_FIRST
: NullOrder.NULLS_LAST,
sortField.nullOrder());
String icebergSortOrderString = getIcebergTransfromString(sortField, schema);
String gravitinoSortOrderString =
getGravitinoSortOrderExpressionString(sortOrder.expression());
Assertions.assertEquals(icebergSortOrderString, gravitinoSortOrderString);
}
}
}
Loading

0 comments on commit d7ab66b

Please sign in to comment.