Skip to content

Commit bd04828

Browse files
authored
Support fillnull command with Calcite (#3634)
* Support fillnull command with Calcite
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* fix explain IT
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* update doc
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
---------
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 93fa20f commit bd04828

File tree

18 files changed

+449
-172
lines changed

18 files changed

+449
-172
lines changed

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -593,20 +593,22 @@ public LogicalPlan visitFillNull(final FillNull node, final AnalysisContext cont
593593

594594
ImmutableList.Builder<Pair<ReferenceExpression, Expression>> expressionsBuilder =
595595
new Builder<>();
596-
for (FillNull.NullableFieldFill fieldFill : node.getNullableFieldFills()) {
597-
Expression fieldExpr =
598-
expressionAnalyzer.analyze(fieldFill.getNullableFieldReference(), context);
596+
for (Pair<Field, UnresolvedExpression> fieldFill : node.getReplacementPairs()) {
597+
Expression fieldExpr = expressionAnalyzer.analyze(fieldFill.getLeft(), context);
599598
ReferenceExpression ref =
600-
DSL.ref(fieldFill.getNullableFieldReference().getField().toString(), fieldExpr.type());
599+
DSL.ref(fieldFill.getLeft().getField().toString(), fieldExpr.type());
601600
FunctionExpression ifNullFunction =
602-
DSL.ifnull(ref, expressionAnalyzer.analyze(fieldFill.getReplaceNullWithMe(), context));
601+
DSL.ifnull(ref, expressionAnalyzer.analyze(fieldFill.getRight(), context));
603602
expressionsBuilder.add(new ImmutablePair<>(ref, ifNullFunction));
604603
TypeEnvironment typeEnvironment = context.peek();
605604
// define the new reference in type env.
606605
typeEnvironment.define(ref);
607606
}
608-
609-
return new LogicalEval(child, expressionsBuilder.build());
607+
List<Pair<ReferenceExpression, Expression>> expressions = expressionsBuilder.build();
608+
if (expressions.isEmpty()) {
609+
throw new SemanticCheckException("At least one field is required for fillnull in V2.");
610+
}
611+
return new LogicalEval(child, expressions);
610612
}
611613

612614
/** Build {@link LogicalML} for ml command. */

core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.util.Optional;
1414
import java.util.stream.Collectors;
1515
import lombok.experimental.UtilityClass;
16-
import org.apache.commons.lang3.tuple.ImmutablePair;
1716
import org.apache.commons.lang3.tuple.Pair;
1817
import org.opensearch.sql.ast.expression.AggregateFunction;
1918
import org.opensearch.sql.ast.expression.Alias;
@@ -524,21 +523,23 @@ public static Patterns patterns(
524523
input);
525524
}
526525

527-
public static FillNull fillNull(UnresolvedExpression replaceNullWithMe, Field... fields) {
528-
return new FillNull(
529-
FillNull.ContainNullableFieldFill.ofSameValue(
530-
replaceNullWithMe, ImmutableList.copyOf(fields)));
526+
public static FillNull fillNull(UnresolvedPlan input, UnresolvedExpression replacement) {
527+
return FillNull.ofSameValue(replacement, ImmutableList.of()).attach(input);
531528
}
532529

533530
public static FillNull fillNull(
534-
List<ImmutablePair<Field, UnresolvedExpression>> fieldAndReplacements) {
535-
ImmutableList.Builder<FillNull.NullableFieldFill> replacementsBuilder = ImmutableList.builder();
536-
for (ImmutablePair<Field, UnresolvedExpression> fieldAndReplacement : fieldAndReplacements) {
531+
UnresolvedPlan input, UnresolvedExpression replacement, Field... fields) {
532+
return FillNull.ofSameValue(replacement, ImmutableList.copyOf(fields)).attach(input);
533+
}
534+
535+
public static FillNull fillNull(
536+
UnresolvedPlan input, List<Pair<Field, UnresolvedExpression>> fieldAndReplacements) {
537+
ImmutableList.Builder<Pair<Field, UnresolvedExpression>> replacementsBuilder =
538+
ImmutableList.builder();
539+
for (Pair<Field, UnresolvedExpression> fieldAndReplacement : fieldAndReplacements) {
537540
replacementsBuilder.add(
538-
new FillNull.NullableFieldFill(
539-
fieldAndReplacement.getLeft(), fieldAndReplacement.getRight()));
541+
Pair.of(fieldAndReplacement.getLeft(), fieldAndReplacement.getRight()));
540542
}
541-
return new FillNull(
542-
FillNull.ContainNullableFieldFill.ofVariousValue(replacementsBuilder.build()));
543+
return FillNull.ofVariousValue(replacementsBuilder.build()).attach(input);
543544
}
544545
}

core/src/main/java/org/opensearch/sql/ast/tree/FillNull.java

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,70 +6,53 @@
66
package org.opensearch.sql.ast.tree;
77

88
import java.util.List;
9-
import java.util.Objects;
10-
import lombok.AllArgsConstructor;
9+
import java.util.Optional;
10+
import lombok.EqualsAndHashCode;
1111
import lombok.Getter;
12-
import lombok.NonNull;
13-
import lombok.RequiredArgsConstructor;
12+
import lombok.ToString;
13+
import org.apache.commons.lang3.tuple.Pair;
1414
import org.opensearch.sql.ast.AbstractNodeVisitor;
1515
import org.opensearch.sql.ast.Node;
1616
import org.opensearch.sql.ast.expression.Field;
1717
import org.opensearch.sql.ast.expression.UnresolvedExpression;
1818

1919
/** AST node representing the FillNull operation. */
20-
@RequiredArgsConstructor
21-
@AllArgsConstructor
20+
@Getter
21+
@EqualsAndHashCode(callSuper = false)
22+
@ToString
2223
public class FillNull extends UnresolvedPlan {
2324

24-
@Getter
25-
@RequiredArgsConstructor
26-
public static class NullableFieldFill {
27-
@NonNull private final Field nullableFieldReference;
28-
@NonNull private final UnresolvedExpression replaceNullWithMe;
25+
public static FillNull ofVariousValue(List<Pair<Field, UnresolvedExpression>> replacements) {
26+
return new FillNull(replacements);
2927
}
3028

31-
public interface ContainNullableFieldFill {
32-
List<NullableFieldFill> getNullFieldFill();
33-
34-
static ContainNullableFieldFill ofVariousValue(List<NullableFieldFill> replacements) {
35-
return new VariousValueNullFill(replacements);
36-
}
37-
38-
static ContainNullableFieldFill ofSameValue(
39-
UnresolvedExpression replaceNullWithMe, List<Field> nullableFieldReferences) {
40-
return new SameValueNullFill(replaceNullWithMe, nullableFieldReferences);
29+
public static FillNull ofSameValue(UnresolvedExpression replacement, List<Field> fieldList) {
30+
List<Pair<Field, UnresolvedExpression>> replacementPairs =
31+
fieldList.stream().map(f -> Pair.of(f, replacement)).toList();
32+
FillNull instance = new FillNull(replacementPairs);
33+
if (replacementPairs.isEmpty()) {
34+
// no field specified, the replacement value will be applied to all fields.
35+
instance.replacementForAll = Optional.of(replacement);
4136
}
37+
return instance;
4238
}
4339

44-
private static class SameValueNullFill implements ContainNullableFieldFill {
45-
@Getter private final List<NullableFieldFill> nullFieldFill;
40+
private Optional<UnresolvedExpression> replacementForAll = Optional.empty();
4641

47-
public SameValueNullFill(
48-
UnresolvedExpression replaceNullWithMe, List<Field> nullableFieldReferences) {
49-
Objects.requireNonNull(replaceNullWithMe, "Null replacement is required");
50-
this.nullFieldFill =
51-
Objects.requireNonNull(nullableFieldReferences, "Nullable field reference is required")
52-
.stream()
53-
.map(nullableReference -> new NullableFieldFill(nullableReference, replaceNullWithMe))
54-
.toList();
55-
}
56-
}
42+
private final List<Pair<Field, UnresolvedExpression>> replacementPairs;
5743

58-
@RequiredArgsConstructor
59-
private static class VariousValueNullFill implements ContainNullableFieldFill {
60-
@NonNull @Getter private final List<NullableFieldFill> nullFieldFill;
44+
FillNull(List<Pair<Field, UnresolvedExpression>> replacementPairs) {
45+
this.replacementPairs = replacementPairs;
6146
}
6247

6348
private UnresolvedPlan child;
6449

65-
@NonNull private final ContainNullableFieldFill containNullableFieldFill;
66-
67-
public List<NullableFieldFill> getNullableFieldFills() {
68-
return containNullableFieldFill.getNullFieldFill();
50+
public List<Field> getFields() {
51+
return getReplacementPairs().stream().map(Pair::getLeft).toList();
6952
}
7053

7154
@Override
72-
public UnresolvedPlan attach(UnresolvedPlan child) {
55+
public FillNull attach(UnresolvedPlan child) {
7356
this.child = child;
7457
return this;
7558
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.calcite.plan.RelOptTable;
2727
import org.apache.calcite.plan.ViewExpanders;
2828
import org.apache.calcite.rel.RelNode;
29+
import org.apache.calcite.rel.type.RelDataTypeField;
2930
import org.apache.calcite.rex.RexCall;
3031
import org.apache.calcite.rex.RexCorrelVariable;
3132
import org.apache.calcite.rex.RexInputRef;
@@ -37,7 +38,7 @@
3738
import org.apache.calcite.tools.RelBuilder;
3839
import org.apache.calcite.tools.RelBuilder.AggCall;
3940
import org.apache.calcite.util.Holder;
40-
import org.apache.calcite.util.Pair;
41+
import org.apache.commons.lang3.tuple.Pair;
4142
import org.checkerframework.checker.nullness.qual.Nullable;
4243
import org.opensearch.sql.ast.AbstractNodeVisitor;
4344
import org.opensearch.sql.ast.Node;
@@ -411,16 +412,16 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
411412
// \- Scan t
412413
Pair<List<AggCall>, List<RexNode>> resolved = resolveAggCallAndGroupBy(node, context);
413414
List<RexInputRef> trimmedRefs = new ArrayList<>();
414-
trimmedRefs.addAll(PlanUtils.getInputRefs(resolved.right)); // group-by keys first
415-
trimmedRefs.addAll(PlanUtils.getInputRefsFromAggCall(resolved.left));
415+
trimmedRefs.addAll(PlanUtils.getInputRefs(resolved.getRight())); // group-by keys first
416+
trimmedRefs.addAll(PlanUtils.getInputRefsFromAggCall(resolved.getLeft()));
416417
context.relBuilder.project(trimmedRefs);
417418

418419
// Re-resolve aggCalls and group-by list based on adding trimmed Project.
419420
// We re-resolve instead of using a Calcite Mapping (see Calcite ProjectTableScanRule)
420421
// because a Mapping only works for RexNode, but we need both the AggCall and RexNode lists.
421422
Pair<List<AggCall>, List<RexNode>> reResolved = resolveAggCallAndGroupBy(node, context);
422-
List<AggCall> aggList = reResolved.left;
423-
List<RexNode> groupByList = reResolved.right;
423+
List<AggCall> aggList = reResolved.getLeft();
424+
List<RexNode> groupByList = reResolved.getRight();
424425
context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggList);
425426

426427
// schema reordering
@@ -670,6 +671,42 @@ public RelNode visitWindow(Window node, CalcitePlanContext context) {
670671
return context.relBuilder.peek();
671672
}
672673

674+
/**
 * Builds a projection for a {@link FillNull} node on top of the plan currently held by
 * {@code context.relBuilder}.
 *
 * <p>Each field of the row type on top of the builder is visited in order. A field whose name
 * matches (ignoring case) the left side of one of the node's replacement pairs is projected as
 * {@code rexBuilder.coalesce(fieldRef, replacement)} aliased back to the field's own name. When
 * the node carries a replacement-for-all value, the field is wrapped the same way with that
 * value. Otherwise the field is projected as a plain input reference.
 *
 * @param node the fillnull AST node supplying the replacement pairs / replacement-for-all value
 * @param context planning context providing {@code relBuilder} and {@code rexBuilder}
 * @return whatever is on top of {@code context.relBuilder} after {@code project(...)} is called
 * @throws IllegalArgumentException if the same field name string appears more than once in the
 *     node's field list
 */
@Override
675+
public RelNode visitFillNull(FillNull node, CalcitePlanContext context) {
676+
// Visit the children first; presumably this pushes the child plan onto relBuilder — TODO confirm.
visitChildren(node, context);
677+
// Duplicate detection: compare the field count with the number of distinct name strings.
// NOTE(review): this comparison is case-sensitive (HashSet of strings), while the matching
// below uses equalsIgnoreCase — confirm that the difference is intended.
if (node.getFields().size()
678+
!= new HashSet<>(node.getFields().stream().map(f -> f.getField().toString()).toList())
679+
.size()) {
680+
throw new IllegalArgumentException("The field list cannot be duplicated in fillnull");
681+
}
682+
List<RexNode> projects = new ArrayList<>();
683+
// Row type of whatever is currently on top of the builder.
List<RelDataTypeField> fieldsList = context.relBuilder.peek().getRowType().getFieldList();
684+
for (RelDataTypeField field : fieldsList) {
685+
RexNode fieldRef = context.rexBuilder.makeInputRef(field.getType(), field.getIndex());
686+
boolean toReplace = false;
687+
// Look for the first replacement pair that names this field (case-insensitive match).
for (Pair<Field, UnresolvedExpression> pair : node.getReplacementPairs()) {
688+
if (field.getName().equalsIgnoreCase(pair.getLeft().getField().toString())) {
689+
RexNode replacement = rexVisitor.analyze(pair.getRight(), context);
690+
RexNode coalesce = context.rexBuilder.coalesce(fieldRef, replacement);
691+
// Alias back to the original field name so the output column keeps its name.
RexNode coalesceWithAlias = context.relBuilder.alias(coalesce, field.getName());
692+
projects.add(coalesceWithAlias);
693+
toReplace = true;
694+
break;
695+
}
696+
}
697+
// No pair matched and no replacement-for-all: pass the field through unchanged.
if (!toReplace && node.getReplacementForAll().isEmpty()) {
698+
projects.add(fieldRef);
699+
// Replacement-for-all present: wrap this field with the shared replacement value.
// NOTE(review): if a pair had also matched above, this branch would add the field a second
// time; FillNull.ofSameValue only sets replacementForAll when the pair list is empty, so
// the two cases look mutually exclusive — confirm no other construction path exists.
} else if (node.getReplacementForAll().isPresent()) {
700+
RexNode replacement = rexVisitor.analyze(node.getReplacementForAll().get(), context);
701+
RexNode coalesce = context.rexBuilder.coalesce(fieldRef, replacement);
702+
RexNode coalesceWithAlias = context.relBuilder.alias(coalesce, field.getName());
703+
projects.add(coalesceWithAlias);
704+
}
705+
}
706+
context.relBuilder.project(projects);
707+
return context.relBuilder.peek();
708+
}
709+
673710
/*
674711
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
675712
*/
@@ -703,11 +740,6 @@ public RelNode visitKmeans(Kmeans node, CalcitePlanContext context) {
703740
throw new CalciteUnsupportedException("Kmeans command is unsupported in Calcite");
704741
}
705742

706-
@Override
707-
public RelNode visitFillNull(FillNull fillNull, CalcitePlanContext context) {
708-
throw new CalciteUnsupportedException("FillNull command is unsupported in Calcite");
709-
}
710-
711743
@Override
712744
public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
713745
throw new CalciteUnsupportedException("Rare and Top commands are unsupported in Calcite");

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ void populate() {
181181
registerOperator(ASIN, SqlStdOperatorTable.ASIN);
182182
registerOperator(ATAN, SqlStdOperatorTable.ATAN);
183183
registerOperator(ATAN2, SqlStdOperatorTable.ATAN2);
184+
registerOperator(CEIL, SqlStdOperatorTable.CEIL);
184185
registerOperator(CEILING, SqlStdOperatorTable.CEIL);
185186
registerOperator(COS, SqlStdOperatorTable.COS);
186187
registerOperator(COT, SqlStdOperatorTable.COT);

core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import org.opensearch.sql.ast.dsl.AstDSL;
7777
import org.opensearch.sql.ast.expression.Argument;
7878
import org.opensearch.sql.ast.expression.DataType;
79-
import org.opensearch.sql.ast.expression.Field;
8079
import org.opensearch.sql.ast.expression.HighlightFunction;
8180
import org.opensearch.sql.ast.expression.Literal;
8281
import org.opensearch.sql.ast.expression.ParseMethod;
@@ -86,7 +85,6 @@
8685
import org.opensearch.sql.ast.tree.AD;
8786
import org.opensearch.sql.ast.tree.CloseCursor;
8887
import org.opensearch.sql.ast.tree.FetchCursor;
89-
import org.opensearch.sql.ast.tree.FillNull;
9088
import org.opensearch.sql.ast.tree.Kmeans;
9189
import org.opensearch.sql.ast.tree.ML;
9290
import org.opensearch.sql.ast.tree.Paginate;
@@ -1397,14 +1395,11 @@ public void fillnull_same_value() {
13971395
ImmutablePair.of(
13981396
DSL.ref("int_null_value", INTEGER),
13991397
DSL.ifnull(DSL.ref("int_null_value", INTEGER), DSL.literal(0)))),
1400-
new FillNull(
1398+
AstDSL.fillNull(
14011399
AstDSL.relation("schema"),
1402-
FillNull.ContainNullableFieldFill.ofSameValue(
1403-
AstDSL.intLiteral(0),
1404-
ImmutableList.<Field>builder()
1405-
.add(AstDSL.field("integer_value"))
1406-
.add(AstDSL.field("int_null_value"))
1407-
.build())));
1400+
AstDSL.intLiteral(0),
1401+
AstDSL.field("integer_value"),
1402+
AstDSL.field("int_null_value")));
14081403
}
14091404

14101405
@Test
@@ -1418,14 +1413,11 @@ public void fillnull_various_values() {
14181413
ImmutablePair.of(
14191414
DSL.ref("int_null_value", INTEGER),
14201415
DSL.ifnull(DSL.ref("int_null_value", INTEGER), DSL.literal(1)))),
1421-
new FillNull(
1416+
AstDSL.fillNull(
14221417
AstDSL.relation("schema"),
1423-
FillNull.ContainNullableFieldFill.ofVariousValue(
1424-
ImmutableList.of(
1425-
new FillNull.NullableFieldFill(
1426-
AstDSL.field("integer_value"), AstDSL.intLiteral(0)),
1427-
new FillNull.NullableFieldFill(
1428-
AstDSL.field("int_null_value"), AstDSL.intLiteral(1))))));
1418+
List.of(
1419+
Pair.of(AstDSL.field("integer_value"), AstDSL.intLiteral(0)),
1420+
Pair.of(AstDSL.field("int_null_value"), AstDSL.intLiteral(1)))));
14291421
}
14301422

14311423
@Test

0 commit comments

Comments
 (0)