Skip to content

Commit

Permalink
add tests for ppl IT with
Browse files Browse the repository at this point in the history
 -  source = $testTable
 -  source = $testTable | fields name, age
 -  source = $testTable age=25 | fields name, age

Signed-off-by: YANGDB <yang.db.dev@gmail.com>
  • Loading branch information
YANG-DB committed Sep 7, 2023
1 parent 8bbe0d9 commit af065f7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
package org.opensearch.flint.spark

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLITSuite
Expand All @@ -17,7 +17,7 @@ class FlintSparkPPLITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "default.flint_sql_test"
private val testTable = "default.flint_ppl_tst"

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -55,19 +55,51 @@ class FlintSparkPPLITSuite
job.awaitTermination()
}
}

test("create ppl simple query test") {
test("create ppl simple query with start fields result test") {
val frame = sql(
s"""
| source = $testTable
| """.stripMargin)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.optimizedPlan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val expectedPlan: LogicalPlan = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("default","flint_ppl_tst")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}

test("create ppl simple query two with fields result test") {
val frame = sql(
s"""
| source = $testTable | fields name, age
| """.stripMargin)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val expectedPlan: LogicalPlan = Project(Seq(UnresolvedAttribute("*")), UnresolvedRelation(TableIdentifier(testTable)))
val expectedPlan: LogicalPlan = Project(Seq(UnresolvedAttribute("name"),UnresolvedAttribute("age")),
UnresolvedRelation(Seq("default","flint_ppl_tst")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}

test("create ppl simple filter query with two fields result test") {
  // Submit the PPL text through `sql` and take the plan exposed by queryExecution.logical.
  val parsedPlan: LogicalPlan = sql(
    s"""
       | source = $testTable age=25 | fields name, age
       | """.stripMargin).queryExecution.logical

  // Expected shape, built bottom-up:
  //   Project(name, age) <- Filter(age = 25) <- UnresolvedRelation(default.flint_ppl_tst)
  val relation = UnresolvedRelation(Seq("default","flint_ppl_tst"))
  val agePredicate = EqualTo(UnresolvedAttribute("age"), Literal(25))
  val expectedPlan = Project(
    Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")),
    Filter(agePredicate, relation))

  // The parsed plan must match the hand-built plan.
  assert(expectedPlan === parsedPlan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedTable;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
Expand Down Expand Up @@ -95,7 +97,7 @@ public String visitRelation(Relation node, CatalystPlanContext context) {
// todo - how to resolve the qualifiedName is its composed of a datasource + schema
// QualifiedName qualifiedName = node.getTableQualifiedName();
// Create an UnresolvedTable node for a table named "qualifiedName" in the default namespace
context.with(new UnresolvedTable(asScalaBuffer(of(t)).toSeq(), format("source=%s", t), empty()));
context.with(new UnresolvedRelation(asScalaBuffer(of(t.split("\\."))).toSeq(), CaseInsensitiveStringMap.empty(), false));
});
return format("source=%s", node.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.types.{DataType, StructType}
import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.common.antlr.SyntaxCheckException
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}

/**
Expand All @@ -56,7 +57,7 @@ class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface
context.getPlan
} catch {
// Fall back to Spark parse plan logic if flint cannot parse
case _: ParseException => sparkParser.parsePlan(sqlText)
case _: ParseException | _: SyntaxCheckException => sparkParser.parsePlan(sqlText)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,40 @@ class PPLLogicalPlanTranslatorTestSuite
val logPlan = planTrnasformer.visit(plan(pplParser, "source=table", false), context)

val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))
val expectedPlan = Project(projectList, UnresolvedTable(Seq("table"), "source=table", None))
val expectedPlan = Project(projectList, UnresolvedRelation(Seq("table")))
assertEquals(context.getPlan, expectedPlan)
assertEquals(logPlan, "source=[table] | fields + *")


}

test("test simple search with schema.table and no explicit fields (defaults to all fields)") {
  // Visit the parsed PPL statement; the visitor returns a textual form of the query
  // and the Catalyst plan is read back from the context afterwards.
  val planContext = new CatalystPlanContext
  val translated = planTrnasformer.visit(plan(pplParser, "source=schema.table", false), planContext)

  // No `fields` command was given, so the expected projection is a bare star over
  // the two-part identifier (schema, table).
  val starProjection: Seq[NamedExpression] = Seq(UnresolvedStar(None))
  val expectedPlan = Project(starProjection, UnresolvedRelation(Seq("schema", "table")))

  assertEquals(planContext.getPlan, expectedPlan)
  assertEquals(translated, "source=[schema.table] | fields + *")
}

test("test simple search with schema.table and one field projected") {
  // Visit the parsed PPL statement; the Catalyst plan is read back from the context.
  val planContext = new CatalystPlanContext
  val translated =
    planTrnasformer.visit(plan(pplParser, "source=schema.table | fields A", false), planContext)

  // Expected: a projection of the single attribute A over the (schema, table) relation.
  val relation = UnresolvedRelation(Seq("schema", "table"))
  val projection: Seq[NamedExpression] = Seq(UnresolvedAttribute("A"))
  val expectedPlan = Project(projection, relation)

  assertEquals(planContext.getPlan, expectedPlan)
  assertEquals(translated, "source=[schema.table] | fields + A")
}

test("test simple search with only one table with one field projected") {
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(plan(pplParser, "source=table | fields A", false), context)

val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A"))
val expectedPlan = Project(projectList, UnresolvedTable(Seq("table"), "source=table", None))
val expectedPlan = Project(projectList, UnresolvedRelation(Seq("table")))
assertEquals(context.getPlan, expectedPlan)
assertEquals(logPlan, "source=[table] | fields + A")
}
Expand All @@ -56,7 +78,7 @@ class PPLLogicalPlanTranslatorTestSuite
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a = 1 ", false), context)

val table = UnresolvedTable(Seq("t"), "source=t", None)
val table = UnresolvedRelation(Seq("t"))
val filterExpr = EqualTo(UnresolvedAttribute("a"), Literal(1))
val filterPlan = Filter(filterExpr, table)
val projectList = Seq(UnresolvedStar(None))
Expand All @@ -65,11 +87,11 @@ class PPLLogicalPlanTranslatorTestSuite
assertEquals(logPlan, "source=[t] | where a = 1 | fields + *")
}

test("test simple search with only one table with one field literal filtered and one field projected") {
test("test simple search with only one table with one field literal equality filtered and one field projected") {
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a = 1 | fields a", false), context)

val table = UnresolvedTable(Seq("t"), "source=t", None)
val table = UnresolvedRelation(Seq("t"))
val filterExpr = EqualTo(UnresolvedAttribute("a"), Literal(1))
val filterPlan = Filter(filterExpr, table)
val projectList = Seq(UnresolvedAttribute("a"))
Expand All @@ -78,13 +100,78 @@ class PPLLogicalPlanTranslatorTestSuite
assertEquals(logPlan, "source=[t] | where a = 1 | fields + a")
}

test("test simple search with only one table with one field greater than filtered and one field projected") {
  // Local import so the block does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.GreaterThan

  // Translate `a > 1` and verify both the Catalyst plan and the textual form.
  val context = new CatalystPlanContext
  val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a > 1 | fields a", false), context)

  val table = UnresolvedRelation(Seq("t"))
  // The query filters with `>`, so the expected predicate is GreaterThan.
  // The previous expectation used EqualTo, which does not describe this query.
  val filterExpr = GreaterThan(UnresolvedAttribute("a"), Literal(1))
  val filterPlan = Filter(filterExpr, table)
  val projectList = Seq(UnresolvedAttribute("a"))
  val expectedPlan = Project(projectList, filterPlan)
  assertEquals(context.getPlan, expectedPlan)
  assertEquals(logPlan, "source=[t] | where a > 1 | fields + a")
}

test("test simple search with only one table with one field greater than equal filtered and one field projected") {
  // Local import so the block does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual

  // Translate `a >= 1` and verify both the Catalyst plan and the textual form.
  val context = new CatalystPlanContext
  val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a >= 1 | fields a", false), context)

  val table = UnresolvedRelation(Seq("t"))
  // The query filters with `>=`, so the expected predicate is GreaterThanOrEqual.
  // The previous expectation used EqualTo, which does not describe this query.
  val filterExpr = GreaterThanOrEqual(UnresolvedAttribute("a"), Literal(1))
  val filterPlan = Filter(filterExpr, table)
  val projectList = Seq(UnresolvedAttribute("a"))
  val expectedPlan = Project(projectList, filterPlan)
  assertEquals(context.getPlan, expectedPlan)
  assertEquals(logPlan, "source=[t] | where a >= 1 | fields + a")
}

test("test simple search with only one table with one field lower than filtered and one field projected") {
  // Local import so the block does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.LessThan

  // Translate `a < 1` and verify both the Catalyst plan and the textual form.
  val context = new CatalystPlanContext
  val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a < 1 | fields a", false), context)

  val table = UnresolvedRelation(Seq("t"))
  // The query filters with `<`, so the expected predicate is LessThan.
  // The previous expectation used EqualTo, which does not describe this query.
  val filterExpr = LessThan(UnresolvedAttribute("a"), Literal(1))
  val filterPlan = Filter(filterExpr, table)
  val projectList = Seq(UnresolvedAttribute("a"))
  val expectedPlan = Project(projectList, filterPlan)
  assertEquals(context.getPlan, expectedPlan)
  assertEquals(logPlan, "source=[t] | where a < 1 | fields + a")
}

test("test simple search with only one table with one field lower than equal filtered and one field projected") {
  // Local import so the block does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual

  // Translate `a <= 1` and verify both the Catalyst plan and the textual form.
  val context = new CatalystPlanContext
  val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a <= 1 | fields a", false), context)

  val table = UnresolvedRelation(Seq("t"))
  // The query filters with `<=`, so the expected predicate is LessThanOrEqual.
  // The previous expectation used EqualTo, which does not describe this query.
  val filterExpr = LessThanOrEqual(UnresolvedAttribute("a"), Literal(1))
  val filterPlan = Filter(filterExpr, table)
  val projectList = Seq(UnresolvedAttribute("a"))
  val expectedPlan = Project(projectList, filterPlan)
  assertEquals(context.getPlan, expectedPlan)
  assertEquals(logPlan, "source=[t] | where a <= 1 | fields + a")
}

test("test simple search with only one table with one field not equal filtered and one field projected") {
  // Local import so the block does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.Not

  // Translate `a != 1` and verify both the Catalyst plan and the textual form.
  val context = new CatalystPlanContext
  val logPlan = planTrnasformer.visit(plan(pplParser, "source=t a != 1 | fields a", false), context)

  val table = UnresolvedRelation(Seq("t"))
  // The query filters with `!=`, which Catalyst models as the negation of an equality.
  // The previous expectation used a bare EqualTo, the opposite of what the query asks for.
  val filterExpr = Not(EqualTo(UnresolvedAttribute("a"), Literal(1)))
  val filterPlan = Filter(filterExpr, table)
  val projectList = Seq(UnresolvedAttribute("a"))
  val expectedPlan = Project(projectList, filterPlan)
  assertEquals(context.getPlan, expectedPlan)
  assertEquals(logPlan, "source=[t] | where a != 1 | fields + a")
}


test("test simple search with only one table with two fields projected") {
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(plan(pplParser, "source=t | fields A, B", false), context)


val table = UnresolvedTable(Seq("t"), "source=t", None)
val table = UnresolvedRelation(Seq("t"))
val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B"))
val expectedPlan = Project(projectList, table)
assertEquals(context.getPlan, expectedPlan)
Expand All @@ -97,16 +184,16 @@ class PPLLogicalPlanTranslatorTestSuite
val logPlan = planTrnasformer.visit(plan(pplParser, "search source = table1, table2 | fields A, B", false), context)


val table1 = UnresolvedTable(Seq("table1"), "source=table1", None)
val table2 = UnresolvedTable(Seq("table2"), "source=table2", None)
val table1 = UnresolvedRelation(Seq("table1"))
val table2 = UnresolvedRelation(Seq("table2"))

val allFields1 = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B"))
val allFields2 = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B"))

val projectedTable1 = Project(allFields1, table1)
val projectedTable2 = Project(allFields2, table2)

val expectedPlan = Union(Seq(projectedTable1, projectedTable2),byName = true,allowMissingCol = true)
val expectedPlan = Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true)

assertEquals(logPlan, "source=[table1, table2] | fields + A,B")
assertEquals(context.getPlan, expectedPlan)
Expand All @@ -118,16 +205,16 @@ class PPLLogicalPlanTranslatorTestSuite
val logPlan = planTrnasformer.visit(plan(pplParser, "search source = table1, table2 | ", false), context)


val table1 = UnresolvedTable(Seq("table1"), "source=table1", None)
val table2 = UnresolvedTable(Seq("table2"), "source=table2", None)
val table1 = UnresolvedRelation(Seq("table1"))
val table2 = UnresolvedRelation(Seq("table2"))

val allFields1 = UnresolvedStar(None)
val allFields2 = UnresolvedStar(None)

val projectedTable1 = Project(Seq(allFields1), table1)
val projectedTable2 = Project(Seq(allFields2), table2)

val expectedPlan = Union(Seq(projectedTable1, projectedTable2),byName = true,allowMissingCol = true)
val expectedPlan = Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true)

assertEquals(logPlan, "source=[table1, table2] | fields + *")
assertEquals(context.getPlan, expectedPlan)
Expand All @@ -137,7 +224,7 @@ class PPLLogicalPlanTranslatorTestSuite
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(plan(pplParser, "source = housing_properties | stats avg(price) by property_type", false), context)
// equivalent to SELECT property_type, AVG(price) FROM housing_properties GROUP BY property_type
val table = UnresolvedTable(Seq("housing_properties"), "source=housing_properties", None)
val table = UnresolvedRelation(Seq("housing_properties"))

val avgPrice = Alias(Average(UnresolvedAttribute("price")), "avg(price)")()
val propertyType = UnresolvedAttribute("property_type")
Expand All @@ -160,7 +247,7 @@ class PPLLogicalPlanTranslatorTestSuite
// Equivalent SQL: SELECT address, price, city FROM housing_properties WHERE state = 'CA' ORDER BY price DESC LIMIT 10

// Constructing the expected Catalyst Logical Plan
val table = UnresolvedTable(Seq("housing_properties"), "source=housing_properties", None)
val table = UnresolvedRelation(Seq("housing_properties"))
val filter = Filter(EqualTo(UnresolvedAttribute("state"), Literal("CA")), table)
val projectList = Seq(UnresolvedAttribute("address"), UnresolvedAttribute("price"), UnresolvedAttribute("city"))
val projected = Project(projectList, filter)
Expand All @@ -180,7 +267,7 @@ class PPLLogicalPlanTranslatorTestSuite
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(plan(pplParser, "source = housing_properties | where land_space > 0 | eval price_per_land_unit = price / land_space | stats avg(price_per_land_unit) by city", false), context)
// SQL: SELECT city, AVG(price / land_space) AS avg_price_per_land_unit FROM housing_properties WHERE land_space > 0 GROUP BY city
val table = UnresolvedTable(Seq("housing_properties"), "source=housing_properties", None)
val table = UnresolvedRelation(Seq("housing_properties"))
val filter = Filter(GreaterThan(UnresolvedAttribute("land_space"), Literal(0)), table)
val expression = AggregateExpression(
Average(Divide(UnresolvedAttribute("price"), UnresolvedAttribute("land_space"))),
Expand Down Expand Up @@ -210,7 +297,8 @@ class PPLLogicalPlanTranslatorTestSuite

val filter = Filter(LessThan(UnresolvedAttribute("listing_age"), Literal(30)),
Filter(GreaterThanOrEqual(UnresolvedAttribute("listing_age"), Literal(0)),
UnresolvedTable(Seq("housing_properties"), "source=housing_properties", None)))
UnresolvedRelation(Seq("housing_properties"))
))

val expression = AggregateExpression(
Count(Literal(1)),
Expand All @@ -237,9 +325,10 @@ class PPLLogicalPlanTranslatorTestSuite
UnresolvedAttribute("agency_name"),
UnresolvedAttribute("price")
)
val table = UnresolvedRelation(Seq("housing_properties"))

val filterCondition = Like(UnresolvedAttribute("agency_name"), Literal("%Compass%"), '\\')
val filter = Filter(filterCondition, UnresolvedTable(Seq("housing_properties"), "source=housing_properties", None))
val filter = Filter(filterCondition, table)

val sortOrder = Seq(SortOrder(UnresolvedAttribute("price"), Descending))
val sort = Sort(sortOrder, true, filter)
Expand Down Expand Up @@ -509,7 +598,7 @@ class PPLLogicalPlanTranslatorTestSuite
groupByAttributes,
aggregateExpressions ++ groupByAttributes,
Filter(
Like(UnresolvedAttribute("machine.os"), Literal("%win%"),'\\'),
Like(UnresolvedAttribute("machine.os"), Literal("%win%"), '\\'),
UnresolvedRelation(TableIdentifier("opensearch_dashboards_sample_data_logs"))
)
)
Expand All @@ -518,7 +607,8 @@ class PPLLogicalPlanTranslatorTestSuite
assertEquals(logPlan, "???")

}
// TODO - fix

// TODO - fix
test("Test Analyzer with Logical Plan") {
// Mock table schema and existence
val tableSchema = StructType(
Expand Down

0 comments on commit af065f7

Please sign in to comment.