Skip to content

Commit

Permalink
First working version of flatten command.
Browse files Browse the repository at this point in the history
Signed-off-by: Lukasz Soszynski <lukasz.soszynski@eliatra.com>
  • Loading branch information
lukasz-soszynski-eliatra committed Oct 16, 2024
1 parent 9e9fbd3 commit d566d3f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public enum BuiltinFunctionName {
WEEK_OF_YEAR(FunctionName.of("week_of_year")),
YEAR(FunctionName.of("year")),
YEARWEEK(FunctionName.of("yearweek")),
PPL_FLATTEN_FUNCTION(FunctionName.of("ppl_flatten_function")),

// `now`-like functions
NOW(FunctionName.of("now")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.GeneratorOuter;
import org.apache.spark.sql.catalyst.expressions.InSubquery$;
import org.apache.spark.sql.catalyst.expressions.Inline;
import org.apache.spark.sql.catalyst.expressions.ListQuery$;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
Expand All @@ -27,6 +27,7 @@
import org.apache.spark.sql.execution.command.ExplainCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.flint.spark.FlattenGenerator;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
Expand Down Expand Up @@ -433,10 +434,8 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
Expression field = visitExpression(flatten.getFieldToBeFlattened(), context);
Inline inline = new Inline(field);
Option<String> x = (Option) None$.MODULE$;
LogicalPlan apply = context.apply(p -> new Generate(inline, seq(), true, x, seq(),p));
return apply;
FlattenGenerator explode = new FlattenGenerator(field);
return context.apply(p -> new Generate(new GeneratorOuter(explode), seq(), true, (Option) None$.MODULE$, seq(),p));
}

private void visitFieldList(List<Field> fieldList, CatalystPlanContext context) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{CollectionGenerator, CreateArray, Expression, GenericInternalRow, Inline, UnaryExpression}
import org.apache.spark.sql.types.{ArrayType, StructType}

/**
 * Generator backing the PPL `flatten` command.
 *
 * Extends Catalyst's [[Inline]] generator so that, in addition to the
 * `array<struct<...>>` input Inline already accepts, a bare `struct<...>`
 * child is also accepted: [[withNewChildInternal]] wraps such a child in a
 * single-element array before delegating to Inline's row expansion.
 *
 * @param child the expression to be flattened; expected to evaluate to a
 *              struct or an array of structs (anything else is rejected,
 *              either by [[checkInputDataTypes]] or by the
 *              IllegalArgumentException in [[withNewChildInternal]]).
 */
class FlattenGenerator(override val child: Expression)
    extends Inline(child)
    with CollectionGenerator {

  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
    // A bare struct is valid here: withNewChildInternal wraps it into a
    // one-element array, which Inline then flattens.
    case _: StructType => TypeCheckResult.TypeCheckSuccess
    case _ => super.checkInputDataTypes()
  }

  override def elementSchema: StructType = child.dataType match {
    // For a struct child the output row schema is the struct itself;
    // otherwise fall back to Inline's schema for array-of-struct input.
    case st: StructType => st
    case _ => super.elementSchema
  }

  override protected def withNewChildInternal(newChild: Expression): FlattenGenerator = {
    newChild.dataType match {
      case ArrayType(_: StructType, _) => new FlattenGenerator(newChild)
      // Promote a bare struct to array<struct> (second arg: do not use
      // string type for empty arrays) and recurse into the array case above.
      case _: StructType => withNewChildInternal(CreateArray(Seq(newChild), false))
      case _ =>
        throw new IllegalArgumentException(s"Unexpected input type ${newChild.dataType}")
    }
  }
}

0 comments on commit d566d3f

Please sign in to comment.