-
Notifications
You must be signed in to change notification settings - Fork 181
Support expand command with Calcite
#3745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
112d5ce
7220ed0
c537b92
ae55697
7003abb
9cb2ccb
08b5c43
b29432d
10f81b0
4587b5d
37bfac3
776630a
5f36a75
37676ff
bc7899c
6b264d8
fe11b61
98fbb12
f722ccd
5ea6d60
4c6448d
99ad92d
42f5aa3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.List; | ||
| import javax.annotation.Nullable; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.expression.Field; | ||
|
|
||
| /** AST node representing an {@code expand <field>} operation. */ | ||
| @ToString | ||
| @RequiredArgsConstructor | ||
| @EqualsAndHashCode(callSuper = false) | ||
| public class Expand extends UnresolvedPlan { | ||
|
|
||
| private UnresolvedPlan child; | ||
| @Getter private final Field field; | ||
| @Getter @Nullable private final String alias; | ||
|
Comment on lines
+25
to
+26
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, removed. |
||
|
|
||
| @Override | ||
| public Expand attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public List<UnresolvedPlan> getChild() { | ||
| return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); | ||
| } | ||
|
|
||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitExpand(this, context); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import org.apache.calcite.plan.ViewExpanders; | ||
| import org.apache.calcite.rel.RelNode; | ||
| import org.apache.calcite.rel.core.Aggregate; | ||
| import org.apache.calcite.rel.core.JoinRelType; | ||
| import org.apache.calcite.rel.type.RelDataTypeField; | ||
| import org.apache.calcite.rex.RexCall; | ||
| import org.apache.calcite.rex.RexCorrelVariable; | ||
|
|
@@ -67,6 +68,7 @@ | |
| import org.opensearch.sql.ast.tree.CloseCursor; | ||
| import org.opensearch.sql.ast.tree.Dedupe; | ||
| import org.opensearch.sql.ast.tree.Eval; | ||
| import org.opensearch.sql.ast.tree.Expand; | ||
| import org.opensearch.sql.ast.tree.FetchCursor; | ||
| import org.opensearch.sql.ast.tree.FillNull; | ||
| import org.opensearch.sql.ast.tree.Filter; | ||
|
|
@@ -196,7 +198,7 @@ public RelNode visitProject(Project node, CalcitePlanContext context) { | |
| } | ||
|
|
||
| /** See logic in {@link org.opensearch.sql.analysis.symbol.SymbolTable#lookupAllFields} */ | ||
| private void tryToRemoveNestedFields(CalcitePlanContext context) { | ||
| private static void tryToRemoveNestedFields(CalcitePlanContext context) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why static?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because it does not depend on any instance method / field. |
||
| Set<String> allFields = new HashSet<>(context.relBuilder.peek().getRowType().getFieldNames()); | ||
| List<RexNode> duplicatedNestedFields = | ||
| allFields.stream() | ||
|
|
@@ -1100,4 +1102,72 @@ private RexNode buildWmaRexNode( | |
| return context.relBuilder.call( | ||
| SqlStdOperatorTable.DIVIDE, divider, context.relBuilder.cast(divisor, SqlTypeName.DOUBLE)); | ||
| } | ||
|
|
||
| /** | ||
| * Expand command visitor to handle array field expansion. 1. Unnest 2. Join with the original | ||
| * table to get all fields | ||
| * | ||
| * <p>S = π_{field, other_fields}(R ⨝ UNNEST_field(R)) | ||
| * | ||
| * @param expand Expand command to be visited | ||
| * @param context CalcitePlanContext containing the RelBuilder and other context | ||
| * @return RelNode representing records with the expanded array field | ||
| */ | ||
| @Override | ||
| public RelNode visitExpand(Expand expand, CalcitePlanContext context) { | ||
| // 1. Visit Children | ||
| visitChildren(expand, context); | ||
|
|
||
| RelBuilder relBuilder = context.relBuilder; | ||
|
|
||
| // 2. Get the field to expand and an optional alias. | ||
| Field arrayField = expand.getField(); | ||
| RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if ppl
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will still be able to expand the column since |
||
| String alias = expand.getAlias(); | ||
|
|
||
| // 3. Capture the outer row in a CorrelationId | ||
| Holder<RexCorrelVariable> correlVariable = Holder.empty(); | ||
| relBuilder.variable(correlVariable::set); | ||
|
|
||
| // 4. Push a copy of the original table to the RelBuilder stack as right | ||
| // side of the correlate (join). | ||
| relBuilder.push(relBuilder.peek()); | ||
| RexNode correlArrayField = | ||
| relBuilder.field( | ||
| context.rexBuilder.makeCorrel(relBuilder.peek().getRowType(), correlVariable.get().id), | ||
| arrayFieldRex.getIndex()); | ||
|
|
||
| // 5. Filter rows where the array field is the same as the left side | ||
| // TODO: This is not a standard way to use correlate and uncollect together. | ||
| // A filter should not be necessary. Correct it in the future. | ||
| RexNode filterCondition = relBuilder.equals(correlArrayField, arrayFieldRex); | ||
| relBuilder.filter(filterCondition); | ||
|
|
||
| // 6. Project only the array field for the uncollect operation | ||
| relBuilder.project(List.of(correlArrayField), List.of(arrayField.getField().toString())); | ||
|
|
||
| // 7. Expand the array field using uncollect | ||
| relBuilder.uncollect(List.of(), false); | ||
|
|
||
| // 8. Perform a nested-loop join (correlate) between the original table and the expanded | ||
| // array field. | ||
| // The last parameter has to refer to the array to be expanded on the left side. It will | ||
| // be used by the right side to correlate with the left side. | ||
| // Using left join to keep the records where the array field is empty. The corresponding | ||
| // field in the result will be null. | ||
| relBuilder.correlate(JoinRelType.LEFT, correlVariable.get().id, List.of(arrayFieldRex)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Create an issue to track enforce limitation on Expand command. |
||
|
|
||
| // 9. Remove the original array field from the output. | ||
| // TODO: RFC: should we keep the original array field when alias is present? | ||
| relBuilder.projectExcept(arrayFieldRex); | ||
| if (alias != null) { | ||
| // Sub-nested fields cannot be removed after renaming the nested field. | ||
| tryToRemoveNestedFields(context); | ||
| RexInputRef expandedField = relBuilder.field(arrayField.getField().toString()); | ||
| List<String> names = new ArrayList<>(relBuilder.peek().getRowType().getFieldNames()); | ||
| names.set(expandedField.getIndex(), alias); | ||
| relBuilder.rename(names); | ||
| } | ||
| return relBuilder.peek(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| ============= | ||
| expand | ||
| ============= | ||
|
|
||
| .. rubric:: Table of contents | ||
|
|
||
| .. contents:: | ||
| :local: | ||
| :depth: 2 | ||
|
|
||
|
|
||
| Description | ||
| ============ | ||
| (From 3.1.0) | ||
|
|
||
| Use the ``expand`` command on a nested array field to transform a single | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nested array -> array? why emphasized nested?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because the boundary between array and object is blurry in OpenSearch. For example, a string field can store a string or an array of string. But there isn't an array type of string. Array only exists for nested type, where it stores an array of structs. The current implementation does not support expanding an array of string stored in a string field since it will only read the string field as string, it doesn't know whether it is an array of string or a single string at the time of generating logical plans. That's why I specifically mentioned nested array. |
||
| document into multiple documents—each containing one element from the array. | ||
| All other fields in the original document are duplicated across the resulting | ||
| documents. | ||
|
|
||
| The expand command generates one row per element in the specified array field: | ||
|
|
||
| * The specified array field is converted into individual rows. | ||
| * If an alias is provided, the expanded values appear under the alias instead | ||
| of the original field name. | ||
| * If the specified field is an empty array, the row is retained with the | ||
| expanded field set to null. | ||
|
|
||
| Version | ||
| ======= | ||
| Since 3.1.0 | ||
|
|
||
| Syntax | ||
LantaoJin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ====== | ||
|
|
||
| expand <field> [as alias] | ||
|
|
||
| * field: The field to be expanded (exploded). Currently only nested arrays are | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is not limitation of Expand command. Expand command does not aware of OpenSearch nested data type, right?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, expand command does not array of the nested type. This limitation originates from the fact that only nested fields are read as arrays in |
||
| supported. | ||
| * alias: (Optional) The name to use instead of the original field name. | ||
|
|
||
|
|
||
| Example: expand address field with an alias | ||
| =========================================== | ||
|
|
||
| Given a dataset ``migration`` with the following data: | ||
|
|
||
| .. code-block:: | ||
|
|
||
| {"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"}}]} | ||
| {"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"}},{"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"}}]} | ||
|
|
||
| The following query expand the address field and rename it to addr: | ||
|
|
||
| PPL query:: | ||
|
|
||
| PPL> source=migration | expand address as addr; | ||
| fetched rows / total rows = 3/3 | ||
| +-------+-----+-------------------------------------------------------------------------------------------+ | ||
| | name | age | addr | | ||
| |-------+-----+-------------------------------------------------------------------------------------------| | ||
| | abbas | 24 | {"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"}} | | ||
| | chen | 32 | {"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"}} | | ||
| | chen | 32 | {"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"}} | | ||
| +-------+-----+-------------------------------------------------------------------------------------------+ | ||
|
|
||
| Limitations | ||
| ============ | ||
|
|
||
| * The ``expand`` command currently only supports nested arrays. Primitive | ||
| fields storing arrays are not supported. E.g. a string field storing an array | ||
| of strings cannot be expanded with the current implementation. | ||
| * The command works only with Calcite enabled. This can be set with the | ||
| following command: | ||
|
|
||
| .. code-block:: | ||
|
|
||
| PUT /_cluster/settings | ||
| { | ||
| "persistent":{ | ||
| "plugins.calcite.enabled": true | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hashcode annotation is missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed