Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and truncate functions in Gravitino SortOrder #2928

Merged
merged 28 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
72502e0
[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and…
caican00 Apr 13, 2024
4937572
Merge branch 'main' into iceberg-properties
caican00 Apr 13, 2024
3e52bc0
[#2921] fix(catalog-lakehouse-iceberg): Add width param to bucket and…
caican00 Apr 13, 2024
b6d55cc
Merge remote-tracking branch 'upstream/main' into iceberg-properties
caican00 Apr 13, 2024
6277724
Merge branch 'iceberg-properties' of github.com:caican00/gravitino in…
caican00 Apr 13, 2024
65cd4ff
[#2927] Improvement(catalog-lakehouse-iceberg): Support more file for…
caican00 Apr 13, 2024
6e8af10
[#2927] Improvement(catalog-lakehouse-iceberg): Support more file for…
caican00 Apr 13, 2024
b9bc926
update
caican00 Apr 15, 2024
2d021b9
update
caican00 Apr 15, 2024
855eaf2
Merge branch 'main' into iceberg-properties
caican00 Apr 15, 2024
fc09aef
update
caican00 Apr 15, 2024
99a339a
update
caican00 Apr 15, 2024
69c7321
update
caican00 Apr 15, 2024
d51903a
update
caican00 Apr 15, 2024
3b7d2ea
Merge remote-tracking branch 'origin/main' into iceberg-properties
caican00 Apr 15, 2024
b16d4ee
Merge branch 'iceberg-properties' of github.com:caican00/gravitino in…
caican00 Apr 15, 2024
17990ac
Merge remote-tracking branch 'upstream_dev/iceberg-create-properties'…
caican00 Apr 15, 2024
e2c2a02
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 15, 2024
845fe0e
update
caican00 Apr 15, 2024
040870d
Merge branch 'main' into iceberg-properties
caican00 Apr 18, 2024
6b7c28a
Merge branch 'main' into iceberg-properties
caican00 Apr 18, 2024
60d11b2
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 19, 2024
0538bda
update
caican00 Apr 19, 2024
633341c
Merge branch 'iceberg-properties' of github.com:caican00/gravitino in…
caican00 Apr 19, 2024
f7f644d
Merge branch 'main' into iceberg-properties
caican00 Apr 19, 2024
c94eb48
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 May 17, 2024
98f85ce
update
caican00 May 22, 2024
28dfe4b
Merge remote-tracking branch 'origin/main' into iceberg-properties
caican00 May 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
Expand Down Expand Up @@ -38,13 +39,13 @@ public SortOrder field(String sourceName, int id, SortDirection direction, NullO
@Override
public SortOrder bucket(
String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return functionSortOrder("bucket", id, direction, nullOrder);
return functionSortOrder("bucket", width, id, direction, nullOrder);
}

@Override
public SortOrder truncate(
String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
return functionSortOrder("truncate", id, direction, nullOrder);
return functionSortOrder("truncate", width, id, direction, nullOrder);
}

@Override
Expand Down Expand Up @@ -80,6 +81,15 @@ private SortOrder functionSortOrder(
toGravitino(nullOrder));
}

/**
 * Builds a Gravitino sort order whose expression is a two-argument function call of the form
 * {@code name(width, column)}.
 *
 * @param name the function name to emit
 * @param width the integer passed as the function's first (literal) argument
 * @param id the field id; the column name is looked up in {@code idToName}
 * @param direction the sort direction, converted through {@code toGravitino}
 * @param nullOrder the null ordering, converted through {@code toGravitino}
 * @return the resulting Gravitino sort order
 */
private SortOrder functionSortOrder(
    String name, int width, int id, SortDirection direction, NullOrder nullOrder) {
  // The width literal comes first and the column reference second.
  FunctionExpression widthFunction =
      FunctionExpression.of(
          name, Literals.integerLiteral(width), NamedReference.field(idToName.get(id)));
  return SortOrders.of(widthFunction, toGravitino(direction), toGravitino(nullOrder));
}

private com.datastrato.gravitino.rel.expressions.sorts.SortDirection toGravitino(
SortDirection direction) {
return direction == SortDirection.ASC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import java.util.Locale;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -58,9 +59,10 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[

Expression firstArg = sortFunc.arguments()[0];
Preconditions.checkArgument(
firstArg instanceof Literal && ((Literal<?>) firstArg).value() instanceof Integer,
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType() instanceof Types.IntegerType,
"Bucket sort's first argument must be a integer literal");
int numBuckets = (Integer) ((Literal<?>) firstArg).value();
int numBuckets = Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value()));
caican00 marked this conversation as resolved.
Show resolved Hide resolved
caican00 marked this conversation as resolved.
Show resolved Hide resolved

Expression secondArg = sortFunc.arguments()[1];
Preconditions.checkArgument(
Expand All @@ -77,9 +79,10 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[

firstArg = sortFunc.arguments()[0];
Preconditions.checkArgument(
firstArg instanceof Literal && ((Literal<?>) firstArg).value() instanceof Integer,
firstArg instanceof Literal
&& ((Literal<?>) firstArg).dataType() instanceof Types.IntegerType,
"Truncate sort's first argument must be a integer literal");
int width = (Integer) ((Literal<?>) firstArg).value();
int width = Integer.parseInt(String.valueOf(((Literal<?>) firstArg).value()));

secondArg = sortFunc.arguments()[1];
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
Expand Down Expand Up @@ -112,7 +114,17 @@ private static CatalogEntity createDefaultCatalogEntity() {
private SortOrder[] createSortOrder() {
return new SortOrder[] {
SortOrders.of(
NamedReference.field("col_2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST)
NamedReference.field("col_2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"bucket", Literals.integerLiteral(10), NamedReference.field("col_1")),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST),
SortOrders.of(
FunctionExpression.of(
"truncate", Literals.integerLiteral(1), NamedReference.field("col_1")),
SortDirection.DESCENDING,
NullOrdering.NULLS_FIRST)
};
}

Expand Down Expand Up @@ -190,6 +202,8 @@ public void testCreateIcebergTable() {
Assertions.assertEquals(sortOrders.length, loadedTable.sortOrder().length);
for (int i = 0; i < loadedTable.sortOrder().length; i++) {
Assertions.assertEquals(sortOrders[i].direction(), loadedTable.sortOrder()[i].direction());
Assertions.assertEquals(
(sortOrders[i]).nullOrdering(), loadedTable.sortOrder()[i].nullOrdering());
Assertions.assertEquals(
(sortOrders[i]).expression(), loadedTable.sortOrder()[i].expression());
}
Expand Down Expand Up @@ -416,7 +430,17 @@ public void testAlterIcebergTable() {
Assertions.assertFalse(alteredTable.properties().containsKey("key1"));
Assertions.assertEquals("val2_new", alteredTable.properties().get("key2"));

sortOrders[0] =
SortOrders.of(
NamedReference.field("col_2_new"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST);
Assertions.assertEquals(sortOrders.length, alteredTable.sortOrder().length);
for (int i = 0; i < alteredTable.sortOrder().length; i++) {
Assertions.assertEquals(sortOrders[i].direction(), alteredTable.sortOrder()[i].direction());
Assertions.assertEquals(
(sortOrders[i]).nullOrdering(), alteredTable.sortOrder()[i].nullOrdering());
Assertions.assertEquals(
(sortOrders[i]).expression(), alteredTable.sortOrder()[i].expression());
}

Column[] expected =
new Column[] {
Expand All @@ -441,12 +465,12 @@ public void testAlterIcebergTable() {
// test delete column change
icebergCatalogOperations.alterTable(
NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"),
TableChange.deleteColumn(new String[] {"col_1"}, false));
TableChange.deleteColumn(new String[] {"col_3"}, false));
Table alteredTable1 =
icebergCatalogOperations.loadTable(
NameIdentifier.of(tableIdentifier.namespace(), "test_iceberg_table_new"));
expected =
Arrays.stream(expected).filter(c -> !"col_1".equals(c.name())).toArray(Column[]::new);
Arrays.stream(expected).filter(c -> !"col_3".equals(c.name())).toArray(Column[]::new);
Assertions.assertArrayEquals(expected, alteredTable1.columns());

Assertions.assertNotNull(alteredTable.partitioning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.literals.Literal;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.types.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortField;
import org.apache.iceberg.types.Types;

/** Provide some basic usage methods and test classes for basic fields. */
Expand Down Expand Up @@ -98,13 +105,20 @@ protected static SortOrder[] createSortOrder(String... colNames) {
return results.toArray(new SortOrder[0]);
}

protected static SortOrder createFunctionSortOrder(String name, String colName) {
protected static SortOrder createSortOrder(String name, String colName) {
return SortOrders.of(
FunctionExpression.of(name, field(colName)),
RandomUtils.nextBoolean() ? SortDirection.DESCENDING : SortDirection.ASCENDING,
RandomUtils.nextBoolean() ? NullOrdering.NULLS_FIRST : NullOrdering.NULLS_LAST);
}

/**
 * Creates a sort order over a two-argument function expression {@code name(width, colName)} with
 * a randomly chosen direction and null ordering.
 *
 * @param name the function name
 * @param width the integer literal used as the function's first argument
 * @param colName the column referenced as the function's second argument
 * @return a sort order wrapping the function expression
 */
protected static SortOrder createSortOrder(String name, int width, String colName) {
  FunctionExpression widthFunction =
      FunctionExpression.of(name, Literals.integerLiteral(width), field(colName));
  // Direction is drawn first, null ordering second.
  SortDirection randomDirection =
      RandomUtils.nextBoolean() ? SortDirection.DESCENDING : SortDirection.ASCENDING;
  NullOrdering randomNullOrdering =
      RandomUtils.nextBoolean() ? NullOrdering.NULLS_FIRST : NullOrdering.NULLS_LAST;
  return SortOrders.of(widthFunction, randomDirection, randomNullOrdering);
}

protected static Types.NestedField createNestedField(
int id, String name, org.apache.iceberg.types.Type type) {
return Types.NestedField.optional(id, name, type, TEST_COMMENT);
Expand All @@ -120,6 +134,66 @@ protected static Types.NestedField[] createNestedField(String... colNames) {
return results.toArray(new Types.NestedField[0]);
}

// Iceberg allows a sort field to carry a transform (an
// `org.apache.iceberg.transforms.Transform`, e.g. Bucket(10, column)) that is applied to the
// source column. This helper renders such a sort field as a call-like string:
//   - year / month / day / hour / identity  ->  "<transform>(<column>)"
//   - truncate / bucket                      ->  "<transform>(<width>, <column>)", produced by
//     turning the "[" in the transform's string form into "(" and dropping the "]"
// Any other transform string results in a RuntimeException.
protected static String getIcebergTransfromString(SortField sortField, Schema schema) {
  String transformText = sortField.transform().toString();
  Map<Integer, String> columnNamesById = schema.idToName();

  boolean takesOnlyColumn = false;
  for (String prefix : new String[] {"year", "month", "day", "hour", "identity"}) {
    if (transformText.startsWith(prefix)) {
      takesOnlyColumn = true;
      break;
    }
  }
  if (takesOnlyColumn) {
    return String.format(
        "%s(%s)", transformText, columnNamesById.get(sortField.sourceId()));
  }

  if (transformText.startsWith("truncate") || transformText.startsWith("bucket")) {
    String openedCall = transformText.replace("[", "(").replace("]", "");
    return String.format("%s, %s)", openedCall, columnNamesById.get(sortField.sourceId()));
  }

  throw new RuntimeException("Unsupported Iceberg transform type");
}

/**
 * Renders a Gravitino sort-order expression as a call-like string.
 *
 * <ul>
 *   <li>A field reference becomes {@code identity(<column>)}.
 *   <li>A one-argument function becomes {@code <function>(<column>)}.
 *   <li>A two-argument function becomes {@code <function>(<width>, <column>)}; its first
 *       argument must be a literal whose data type is Gravitino's integer type.
 * </ul>
 *
 * Only the first element of a field reference's name array is used.
 *
 * @param sortOrderExpression the expression to render
 * @return the rendered string
 * @throws IllegalArgumentException if a function has neither 1 nor 2 arguments, or if the first
 *     of two arguments is not an integer literal
 * @throws UnsupportedOperationException if the expression is neither a field reference nor a
 *     function expression
 */
protected static String getGravitinoSortOrderExpressionString(Expression sortOrderExpression) {
  if (sortOrderExpression instanceof NamedReference.FieldReference) {
    String column = ((NamedReference.FieldReference) sortOrderExpression).fieldName()[0];
    return String.format("identity(%s)", column);
  }

  if (!(sortOrderExpression instanceof FunctionExpression)) {
    throw new UnsupportedOperationException(
        String.format(
            "Unsupported Gravitino expression type: %s",
            sortOrderExpression.getClass().getName()));
  }

  FunctionExpression function = (FunctionExpression) sortOrderExpression;
  String functionName = function.functionName();
  Expression[] args = function.arguments();

  switch (args.length) {
    case 1:
      {
        String column = ((NamedReference.FieldReference) args[0]).fieldName()[0];
        return String.format("%s(%s)", functionName, column);
      }
    case 2:
      {
        Expression widthArg = args[0];
        boolean isIntegerLiteral =
            widthArg instanceof Literal
                && ((Literal<?>) widthArg).dataType()
                    instanceof com.datastrato.gravitino.rel.types.Types.IntegerType;
        Preconditions.checkArgument(
            isIntegerLiteral, "The first argument must be a integer literal");
        // The literal's value is converted via its string form, not by a direct cast.
        int width = Integer.parseInt(String.valueOf(((Literal<?>) widthArg).value()));
        String column = ((NamedReference.FieldReference) args[1]).fieldName()[0];
        return String.format("%s(%s, %s)", functionName, width, column);
      }
    default:
      throw new IllegalArgumentException(
          String.format(
              "Iceberg FunctionExpression in Gravitino should have 1 or 2 arguments, but got %d arguments",
              args.length));
  }
}

private static Type getRandomGravitinoType() {
Collection<Type> values = GRAVITINO_TYPE.values();
return values.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.FunctionExpression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
Expand Down Expand Up @@ -72,9 +73,13 @@ public void testFromSortOrder() {
return ((NamedReference.FieldReference) sortOrder.expression())
.fieldName()[0];
} else if (sortOrder.expression() instanceof FunctionExpression) {
return ((NamedReference.FieldReference)
((FunctionExpression) sortOrder.expression()).arguments()[0])
.fieldName()[0];
Expression[] arguments =
((FunctionExpression) sortOrder.expression()).arguments();
if (arguments.length == 1) {
return ((NamedReference.FieldReference) arguments[0]).fieldName()[0];
} else {
return ((NamedReference.FieldReference) arguments[1]).fieldName()[0];
}
}
throw new RuntimeException("Unsupported sort expression type");
},
Expand All @@ -96,6 +101,10 @@ public void testFromSortOrder() {
? NullOrdering.NULLS_FIRST
: NullOrdering.NULLS_LAST,
sortOrder.nullOrdering());
String icebergSortOrderString = getIcebergTransfromString(sortField, schema);
String gravitinoSortOrderString =
getGravitinoSortOrderExpressionString(sortOrder.expression());
Assertions.assertEquals(icebergSortOrderString, gravitinoSortOrderString);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ public class TestToIcebergSortOrder extends TestBaseConvert {
@Test
public void testToSortOrder() {
SortOrder[] sortOrders = createSortOrder("col_1", "col_2", "col_3", "col_4", "col_5");
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("day", "col_6"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("hour", "col_7"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("month", "col_8"));
sortOrders = ArrayUtils.add(sortOrders, createFunctionSortOrder("year", "col_9"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("day", "col_6"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("hour", "col_7"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("month", "col_8"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("year", "col_9"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("bucket", 10, "col_10"));
sortOrders = ArrayUtils.add(sortOrders, createSortOrder("truncate", 2, "col_11"));

Types.NestedField[] nestedFields =
createNestedField("col_1", "col_2", "col_3", "col_4", "col_5");
Expand All @@ -40,6 +42,10 @@ public void testToSortOrder() {
ArrayUtils.add(nestedFields, createNestedField(8, "col_8", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(9, "col_9", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(10, "col_10", Types.IntegerType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(11, "col_11", Types.StringType.get()));
Schema schema = new Schema(nestedFields);
org.apache.iceberg.SortOrder icebergSortOrder =
ToIcebergSortOrder.toSortOrder(schema, sortOrders);
Expand All @@ -58,14 +64,6 @@ public void testToSortOrder() {
String colName = idToName.get(sortField.sourceId());
Assertions.assertTrue(sortOrderByName.containsKey(colName));
SortOrder sortOrder = sortOrderByName.get(colName);
if (colName.equals("col_6")
|| colName.equals("col_7")
|| colName.equals("col_8")
|| colName.equals("col_9")) {
Assertions.assertFalse(sortField.transform().isIdentity());
} else {
Assertions.assertTrue(sortField.transform().isIdentity());
}
Assertions.assertEquals(
sortOrder.direction() == SortDirection.ASCENDING
? org.apache.iceberg.SortDirection.ASC
Expand All @@ -76,6 +74,10 @@ public void testToSortOrder() {
? NullOrder.NULLS_FIRST
: NullOrder.NULLS_LAST,
sortField.nullOrder());
String icebergSortOrderString = getIcebergTransfromString(sortField, schema);
String gravitinoSortOrderString =
getGravitinoSortOrderExpressionString(sortOrder.expression());
Assertions.assertEquals(icebergSortOrderString, gravitinoSortOrderString);
}
}
}
Loading
Loading