From d566d3f97ba2d7e67b662765450fed9ccabcf902 Mon Sep 17 00:00:00 2001 From: Lukasz Soszynski Date: Wed, 16 Oct 2024 14:44:31 +0200 Subject: [PATCH] First working version of flatten command. Signed-off-by: Lukasz Soszynski --- .../function/BuiltinFunctionName.java | 1 - .../sql/ppl/CatalystQueryPlanVisitor.java | 9 +++-- .../flint/spark/FlattenGenerator.scala | 33 +++++++++++++++++++ 3 files changed, 37 insertions(+), 6 deletions(-) create mode 100644 ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlattenGenerator.scala diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index d4a6c364c..6b549663a 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -114,7 +114,6 @@ public enum BuiltinFunctionName { WEEK_OF_YEAR(FunctionName.of("week_of_year")), YEAR(FunctionName.of("year")), YEARWEEK(FunctionName.of("yearweek")), - PPL_FLATTEN_FUNCTION(FunctionName.of("ppl_flatten_function")), // `now`-like functions NOW(FunctionName.of("now")), diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 91ab0eb11..489b14267 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -14,8 +14,8 @@ import org.apache.spark.sql.catalyst.expressions.CaseWhen; import org.apache.spark.sql.catalyst.expressions.Descending$; import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.GeneratorOuter; import org.apache.spark.sql.catalyst.expressions.InSubquery$; -import org.apache.spark.sql.catalyst.expressions.Inline; import org.apache.spark.sql.catalyst.expressions.ListQuery$; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.Predicate; @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.opensearch.flint.spark.FlattenGenerator; import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.AggregateFunction; import org.opensearch.sql.ast.expression.Alias; @@ -433,10 +434,8 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(flatten.getFieldToBeFlattened(), context); - Inline inline = new Inline(field); - Option x = (Option) None$.MODULE$; - LogicalPlan apply = context.apply(p -> new Generate(inline, seq(), true, x, seq(),p)); - return apply; + FlattenGenerator explode = new FlattenGenerator(field); + return context.apply(p -> new Generate(new GeneratorOuter(explode), seq(), true, (Option) None$.MODULE$, seq(),p)); } private void visitFieldList(List fieldList, CatalystPlanContext context) { diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlattenGenerator.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlattenGenerator.scala new file mode 100644 index 000000000..23b545826 --- /dev/null +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlattenGenerator.scala @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.flint.spark + +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.{CollectionGenerator, CreateArray, Expression, GenericInternalRow, Inline, UnaryExpression} +import org.apache.spark.sql.types.{ArrayType, StructType} + +class FlattenGenerator(override val child: Expression) + extends Inline(child) + with CollectionGenerator { + + override def checkInputDataTypes(): TypeCheckResult = child.dataType match { + case st: StructType => TypeCheckResult.TypeCheckSuccess + case _ => super.checkInputDataTypes() + } + + override def elementSchema: StructType = child.dataType match { + case st: StructType => st + case _ => super.elementSchema + } + + override protected def withNewChildInternal(newChild: Expression): FlattenGenerator = { + newChild.dataType match { + case ArrayType(st: StructType, _) => new FlattenGenerator(newChild) + case st: StructType => withNewChildInternal(CreateArray(Seq(newChild), false)) + case _ => + throw new IllegalArgumentException(s"Unexpected input type ${newChild.dataType}") + } + } +}