Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.sql.ast.expression.WindowFunction;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
Expand Down Expand Up @@ -783,6 +784,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) {
throw getOnlyForCalciteException("Appendcol");
}

/**
 * Rejects the {@code append} command in this analyzer.
 *
 * <p>Always throws the exception produced by {@code getOnlyForCalciteException("Append")};
 * presumably the command is only available through the Calcite engine.
 *
 * @param node the append AST node (not inspected)
 * @param context the analysis context (not inspected)
 * @return never returns normally
 */
@Override
public LogicalPlan visitAppend(Append node, AnalysisContext context) {
  throw getOnlyForCalciteException("Append");
}

private LogicalSort buildSort(
LogicalPlan child, AnalysisContext context, Integer count, List<Field> sortFields) {
ExpressionReferenceOptimizer optimizer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
Expand Down Expand Up @@ -416,4 +417,8 @@ public T visitExistsSubquery(ExistsSubquery node, C context) {
/**
 * Visits an {@link AppendCol} node. The default implementation delegates to {@code
 * visitChildren}.
 *
 * @param node the node being visited
 * @param context visitor context, passed through unchanged
 * @return whatever {@code visitChildren} returns for this node
 */
public T visitAppendCol(AppendCol node, C context) {
  return visitChildren(node, context);
}

/**
 * Visits an {@link Append} node. The default implementation delegates to {@code visitChildren}.
 *
 * @param node the node being visited
 * @param context visitor context, passed through unchanged
 * @return whatever {@code visitChildren} returns for this node
 */
public T visitAppend(Append node, C context) {
  return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast;

import java.util.Collections;
import java.util.List;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;

/** AST visitor that simplifies a subsearch AST whose input is an empty source. */
public class EmptySourcePropagateVisitor extends AbstractNodeVisitor<UnresolvedPlan, Void> {

public static final UnresolvedPlan EMPTY_SOURCE = new Values(Collections.emptyList());

/**
 * A {@link Values} node is a leaf of the simplification and is returned as-is; whether it counts
 * as an empty source is decided by the caller.
 *
 * @param node the values node
 * @param context unused
 * @return {@code node} itself
 */
@Override
public UnresolvedPlan visitValues(Values node, Void context) {
  return node;
}

/**
 * A {@link Relation} is a real data source, so it is returned untouched.
 *
 * @param node the relation node
 * @param context unused
 * @return {@code node} itself
 */
@Override
public UnresolvedPlan visitRelation(Relation node, Void context) {
  return node;
}

/**
 * A {@link TableFunction} is treated as a data source and returned untouched.
 *
 * <p>Assumption: future source-like commands (for example inputLookup or makeresult) will be
 * represented by this unresolved plan node.
 *
 * @param node the table function node
 * @param context unused
 * @return {@code node} itself
 */
@Override
public UnresolvedPlan visitTableFunction(TableFunction node, Void context) {
  return node;
}

/**
 * Default handling for every plan node that has no dedicated {@code visit*} override here.
 *
 * <p>For a node with exactly one child, the child is simplified first. If the simplified child is
 * an empty source, this node is replaced by {@link #EMPTY_SOURCE}; otherwise the node itself is
 * returned unchanged. Visiting the child (rather than only inspecting it) lets emptiness found
 * deeper in a pipeline such as {@code fields | where | <empty>} propagate all the way up instead
 * of stopping after a single level.
 *
 * <p>Nodes with zero or several children fall back to the parent class implementation.
 *
 * @param node the node being visited; must be an {@link UnresolvedPlan}
 * @param context unused
 * @return {@link #EMPTY_SOURCE} when the single child collapses to an empty source, the original
 *     node when it does not, or the parent class result for any other child count
 */
@Override
public UnresolvedPlan visitChildren(Node node, Void context) {
  assert node instanceof UnresolvedPlan;
  UnresolvedPlan unresolvedPlan = (UnresolvedPlan) node;

  List<? extends Node> children = unresolvedPlan.getChild();
  if (children.size() == 1) {
    // Recurse before testing so an empty source several levels down is still detected.
    UnresolvedPlan simplifiedChild = children.get(0).accept(this, context);
    // The original node is handed back (not re-attached) to avoid mutating the input AST.
    return isEmptySource(simplifiedChild) ? EMPTY_SOURCE : unresolvedPlan;
  }
  return super.visitChildren(node, context);
}

/**
 * Rebuilds an {@link Append} node from its simplified subsearch and simplified main-query child.
 *
 * @param node the append node; it is expected to have a child attached
 * @param context unused
 * @return a new {@link Append} wrapping the simplified subsearch, attached to the simplified child
 */
@Override
public UnresolvedPlan visitAppend(Append node, Void context) {
  UnresolvedPlan simplifiedSubSearch = node.getSubSearch().accept(this, context);
  UnresolvedPlan simplifiedMain = node.getChild().get(0).accept(this, context);
  Append rebuilt = new Append(simplifiedSubSearch);
  return rebuilt.attach(simplifiedMain);
}

/**
 * Rebuilds an {@link AppendCol} node from its simplified subsearch and simplified main-query
 * child, keeping the original override flag.
 *
 * @param node the appendcol node; it is expected to have a child attached
 * @param context unused
 * @return a new {@link AppendCol} with the same override flag, attached to the simplified child
 */
@Override
public UnresolvedPlan visitAppendCol(AppendCol node, Void context) {
  UnresolvedPlan simplifiedSubSearch = node.getSubSearch().accept(this, context);
  UnresolvedPlan simplifiedMain = node.getChild().get(0).accept(this, context);
  AppendCol rebuilt = new AppendCol(node.isOverride(), simplifiedSubSearch);
  return rebuilt.attach(simplifiedMain);
}

// TODO: Revisit lookup logic here but for now we don't see use case yet
@Override
public UnresolvedPlan visitLookup(Lookup node, Void context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of tests/ITs cover the visitLookup and visitJoin you added here. Please add tests/ITs for empty source for lookup/join

Copy link
Contributor Author

@songkant-aws songkant-aws Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests. For now, our join or lookup command syntax enforces the searchCommand to be existent in the right child. So I can't cover the case of empty source of right child. Empty source for left child test cases are added.

UnresolvedPlan lookupRelation = node.getLookupRelation().accept(this, context);
UnresolvedPlan child = node.getChild().get(0).accept(this, context);
// Lookup is a LEFT join.
// If left child is expected to be 0 row, it outputs 0 row
// If right child is expected to be 0 row, the output is the left child;
if (isEmptySource(child)) {
return EMPTY_SOURCE;
}
return isEmptySource(lookupRelation) ? child : node;
}

/**
 * Simplifies a {@link Join} node when one or both of its inputs collapse to an empty source.
 *
 * <p>Row-level semantics applied per join type:
 *
 * <ul>
 *   <li>INNER, CROSS, SEMI: every output row needs a row from both sides, so either side being
 *       empty yields {@link #EMPTY_SOURCE}.
 *   <li>LEFT, ANTI: an empty left side yields {@link #EMPTY_SOURCE}; an empty right side leaves
 *       every left row in place, so the simplified left plan is returned.
 *   <li>RIGHT: mirror image of LEFT.
 *   <li>FULL: if one side is empty the other side is returned (which is itself empty when both
 *       are).
 * </ul>
 *
 * <p>When neither side is empty, or the join type is not listed, the original node is returned
 * unchanged. No query is known to reach this path yet.
 *
 * <p>NOTE(review): replacing an outer join by one of its inputs keeps the rows but drops the
 * other side's columns from the plan — confirm this is acceptable for callers.
 *
 * @param node the join node
 * @param context unused
 * @return the simplified plan as described above
 */
@Override
public UnresolvedPlan visitJoin(Join node, Void context) {
  UnresolvedPlan left = node.getLeft().accept(this, context);
  UnresolvedPlan right = node.getRight().accept(this, context);

  boolean leftEmpty = isEmptySource(left);
  boolean rightEmpty = isEmptySource(right);

  switch (node.getJoinType()) {
    case INNER:
    case CROSS:
    case SEMI:
      // SEMI keeps only left rows that have a match on the right; an empty right side
      // therefore produces no rows, exactly like INNER.
      return leftEmpty || rightEmpty ? EMPTY_SOURCE : node;
    case LEFT:
    case ANTI:
      if (leftEmpty) {
        return EMPTY_SOURCE;
      }
      // With nothing on the right, LEFT keeps all left rows and ANTI excludes none.
      return rightEmpty ? left : node;
    case RIGHT:
      if (rightEmpty) {
        return EMPTY_SOURCE;
      }
      return leftEmpty ? right : node;
    case FULL:
      if (leftEmpty) {
        return right;
      }
      return rightEmpty ? left : node;
    default:
      return node;
  }
}

/**
 * Tells whether a plan is an empty source, i.e. a {@link Values} node whose value list is {@code
 * null} or has no rows.
 *
 * @param plan the plan to test; {@code null} is treated as not empty
 * @return {@code true} only for a {@link Values} node without rows
 */
private boolean isEmptySource(UnresolvedPlan plan) {
  if (!(plan instanceof Values)) {
    return false;
  }
  List<?> rows = ((Values) plan).getValues();
  return rows == null || rows.isEmpty();
}
}
45 changes: 45 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Append.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
 * AST node of the append command, which appends the rows of a subsearch to the rows of the main
 * query (a union of both results).
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Append extends UnresolvedPlan {

private final UnresolvedPlan subSearch;

private UnresolvedPlan child;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between searchPlan and child?

Copy link
Contributor Author

@songkant-aws songkant-aws Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The child is the main query. However, the searchPlan is the first optional searchCommand node starting in subsearch.

This is for matching optional searchCommand in the head of subsearch for append or appendcols command.

For example, in future, this query is a valid query: search | append [ | inputlookup myexcel | fields myfield ]
In above query case, the searchCommand between square brackets is optional because inputlookup command is a command similar to TableFunction. Users have freedom to choose whether data comes from search or local input. The searchPlan is used for parsing empty searchCommand as a 0 row * 0 col LogicalValues[[]] RelNode.

Another reason is to use searchPlan to handle an edge case of appending empty subresults. Some other pipeline language has similar functionality to allow subsearch outputs empty result. For example, search | append [ ] and search | append [ | fields a, b, c ] are the same because subsearch start with 0 row * 0 col input. The syntax is legit to append empty result. It is equivalent to main query.

Calcite is a strong schema engine, parsing a RelNode like

Project(a = [$0], b = [$1], ..)
    LogicalValues[[]]

will throw exception when either building RelNode or at runtime. Because Project cannot find any 'a' column from the input.

I'm open to this discussion, we have two options here:

  1. Since we're using strong schema engine, we should just throw exception. This option is simple but may have inconsistent behavior with other pipeline language.
  2. We add logics to use empty values like LogicalValues[[]] to handle edge cases. A better way is probably to determine whether subsearch RelNode's leaf is empty values. But still it has the problem of building RelNode successfully.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactor the code a bit to avoid confusing name of searchPlan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I choose to throw exception for empty subsearch input for now because it needs some dirty work to preprocess ast tree to achieve a not useful use case. Not supporting it by throwing exception is elegant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I choose to throw exception for empty subsearch input for now because it needs some dirty work to preprocess ast tree to achieve a not useful use case. Not supporting it by throwing exception is elegant.

@songkant-aws I think the previous implementation looks good to me except for the confusing name searchPlan; maybe we need it back under another name, such as emptySource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed back the empty source support, and for nested case.


/**
 * Sets the child of this node and returns this node so calls can be chained.
 *
 * @param child the plan to attach as this node's child
 * @return this {@code Append} instance
 */
@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
  this.child = child;
  return this;
}

/**
 * Returns the attached child as a list.
 *
 * @return an empty list when no child has been attached, otherwise a one-element list holding the
 *     child; the subsearch is not part of this list
 */
@Override
public List<? extends Node> getChild() {
  if (this.child == null) {
    return ImmutableList.of();
  }
  return ImmutableList.of(this.child);
}

/**
 * Dispatches this node to {@code visitor.visitAppend}.
 *
 * @param visitor the visitor to dispatch to
 * @param context visitor context, passed through unchanged
 * @return the value returned by the visitor
 */
@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
  return visitor.visitAppend(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.calcite.util.Holder;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.EmptySourcePropagateVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.expression.AggregateFunction;
Expand All @@ -81,6 +83,7 @@
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
Expand Down Expand Up @@ -113,6 +116,7 @@
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.Trendline.TrendlineType;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.BinUtils;
Expand Down Expand Up @@ -1214,6 +1218,75 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
}
}

/**
 * Builds the Calcite plan for the {@code append} command: a UNION ALL of the main query and the
 * subsearch over a merged schema.
 *
 * <p>The merged schema starts with every main-query column. A subsearch column is aligned with a
 * main column only when both name and type are equal; otherwise the subsearch side is padded with
 * a typed NULL for that main column. Subsearch columns that were not aligned are appended
 * afterwards, with the main side padded with typed NULLs. Because an unaligned subsearch column
 * may share a name with a main column, the final names are uniquified.
 *
 * @param node the append node holding the subsearch and the main-query child
 * @param context planning context providing the {@code relBuilder} and {@code rexBuilder}
 * @return the union node, which is also left on top of the builder stack
 */
@Override
public RelNode visitAppend(Append node, CalcitePlanContext context) {
  // 1. Resolve main plan
  visitChildren(node, context);

  // 2. Resolve subsearch plan, after simplifying branches that start from an empty source
  UnresolvedPlan simplifiedSubSearch =
      node.getSubSearch().accept(new EmptySourcePropagateVisitor(), null);
  simplifiedSubSearch.accept(this, context);

  // 3. Merge two query schemas. The subsearch was built last, so it is taken off first.
  RelNode subsearchRel = context.relBuilder.build();
  RelNode mainRel = context.relBuilder.build();
  List<RelDataTypeField> mainColumns = mainRel.getRowType().getFieldList();
  List<RelDataTypeField> subsearchColumns = subsearchRel.getRowType().getFieldList();
  Map<String, RelDataTypeField> subsearchByName =
      subsearchColumns.stream()
          .collect(Collectors.toMap(RelDataTypeField::getName, column -> column));
  boolean[] aligned = new boolean[subsearchColumns.size()];
  List<String> mergedNames = new ArrayList<>();
  List<RexNode> mainProjections = new ArrayList<>();
  List<RexNode> subsearchProjections = new ArrayList<>();

  // 3.1 Start with main query's schema. If subsearch plan doesn't have matched column,
  // add same type column in place with NULL literal
  for (int i = 0; i < mainColumns.size(); i++) {
    RelDataTypeField mainColumn = mainColumns.get(i);
    mainProjections.add(context.rexBuilder.makeInputRef(mainRel, i));
    mergedNames.add(mainColumn.getName());

    RelDataTypeField candidate = subsearchByName.get(mainColumn.getName());
    boolean sameNameAndType =
        candidate != null && candidate.getType().equals(mainColumn.getType());
    if (!sameNameAndType) {
      subsearchProjections.add(context.rexBuilder.makeNullLiteral(mainColumn.getType()));
      continue;
    }
    int subsearchIndex = candidate.getIndex();
    subsearchProjections.add(context.rexBuilder.makeInputRef(subsearchRel, subsearchIndex));
    aligned[subsearchIndex] = true;
  }

  // 3.2 Add remaining subsearch columns to the merged schema
  for (int j = 0; j < subsearchColumns.size(); j++) {
    if (aligned[j]) {
      continue;
    }
    RelDataTypeField extraColumn = subsearchColumns.get(j);
    mainProjections.add(context.rexBuilder.makeNullLiteral(extraColumn.getType()));
    subsearchProjections.add(context.rexBuilder.makeInputRef(subsearchRel, j));
    mergedNames.add(extraColumn.getName());
  }

  // 3.3 Uniquify names in case the merged names have duplicates
  List<String> finalNames =
      SqlValidatorUtil.uniquify(mergedNames, SqlValidatorUtil.EXPR_SUGGESTER, true);

  // 4. Apply new schema over two query plans
  RelNode alignedMain =
      context.relBuilder.push(mainRel).project(mainProjections, finalNames).build();
  RelNode alignedSubsearch =
      context.relBuilder.push(subsearchRel).project(subsearchProjections, finalNames).build();

  // 5. Union all two projected plans
  context.relBuilder.push(alignedMain).push(alignedSubsearch).union(true);
  return context.relBuilder.peek();
}

/*
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
*/
Expand Down Expand Up @@ -1844,6 +1917,16 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
return context.relBuilder.peek();
}

/**
 * Translates a {@link Values} node. Only the empty form (value list {@code null} or without rows)
 * is supported: it becomes a values node built from an empty row type, i.e. one with no columns.
 *
 * @param values the values AST node
 * @param context planning context providing the {@code relBuilder}
 * @return the values node left on top of the builder stack
 * @throws CalciteUnsupportedException if the node carries explicit rows
 */
@Override
public RelNode visitValues(Values values, CalcitePlanContext context) {
  boolean hasExplicitRows = values.getValues() != null && !values.getValues().isEmpty();
  if (hasExplicitRows) {
    throw new CalciteUnsupportedException("Explicit values node is unsupported in Calcite");
  }
  context.relBuilder.values(context.relBuilder.getTypeFactory().builder().build());
  return context.relBuilder.peek();
}

private void buildParseRelNode(Parse node, CalcitePlanContext context) {
RexNode sourceField = rexVisitor.analyze(node.getSourceField(), context);
ParseMethod parseMethod = node.getParseMethod();
Expand Down
2 changes: 1 addition & 1 deletion docs/category.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"user/dql/metadata.rst"
],
"ppl_cli_calcite": [
"user/ppl/cmd/stats.rst",
"user/ppl/cmd/append.rst",
"user/ppl/cmd/fields.rst",
"user/ppl/cmd/regex.rst",
"user/ppl/cmd/stats.rst",
Expand Down
Loading
Loading