Skip to content

Commit

Permalink
introduce ExtractedField class
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Trochimiak <kacper.trochimiak@eliatra.com>
  • Loading branch information
kt-eliatra committed Aug 27, 2024
1 parent 96e10ca commit 9653201
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,30 +486,32 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
|""".stripMargin)
}

protected def createStructNestedTable2(testTable: String): Unit = {
protected def createStructNestedTableWithKeysLikeNestedFields(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| unmapped STRUCT<userIdentity: STRUCT<sessioncontext: STRUCT<sessionIssuer: STRUCT<type: STRING>>>>
| )
| USING JSON
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES
| ( STRUCT(STRUCT(STRUCT(STRUCT(STRUCT("example_type1"))))) )
|""".stripMargin)
}
| CREATE TABLE $testTable
| (
| user_data
| STRUCT<
| `user.first.name`:STRING,
| `user.last.name`:STRING,
| `user.age`:INT,
| `user.home.address.street`:STRING,
| `user.home.address.city`:STRING
| >,
| user_credentials STRUCT<login:STRING, password: STRING>
| )
| USING JSON
|""".stripMargin)

protected def createStructNestedTable3(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| USING JSON
| OPTIONS (
| path "../integ-test/src/integration/scala/org/opensearch/flint/spark/unmapped.json"
| )
|""".stripMargin)
| INSERT INTO $testTable
| SELECT /*+ COALESCE(1) */ *
| FROM VALUES
| ( STRUCT("Alice", "Smith", 30, "123 Main St", "Seattle"), STRUCT("asmith", "REDACTED") ),
| ( STRUCT("Bob", "Johnson", 55, "456 Elm St", "Seattle"), STRUCT("bjohnson", "REDACTED") ),
| ( STRUCT("Charlie", "Williams", 65, "789 Pine St", "San Francisco"), STRUCT("cwilliams", "REDACTED") ),
| ( STRUCT("David", "Brown", 19, "101 Maple St", "San Francisco"), STRUCT("dbrown", "REDACTED") )
|""".stripMargin)
}

protected def createTableIssue112(testTable: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedExtractValue, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, EqualTo, GreaterThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLNestedFieldsITSuite
Expand All @@ -20,12 +18,14 @@ class FlintSparkPPLNestedFieldsITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
private val nestedTestTable = "spark_catalog.default.flint_ppl_test_nested"
private val nestedTestTableWithNestedKeys = "spark_catalog.default.flint_ppl_test_nested_with_nested_keys"

override def beforeAll(): Unit = {
super.beforeAll()

createStructNestedTable3(testTable)
createStructNestedTable(nestedTestTable)
createStructNestedTableWithKeysLikeNestedFields(nestedTestTableWithNestedKeys)
}

protected override def afterEach(): Unit = {
Expand All @@ -37,33 +37,9 @@ class FlintSparkPPLNestedFieldsITSuite
}
}

test("aaa") {
val pplFrame = sql(s"""
| source = $testTable | fields unmapped.userIdentity.sessioncontext.sessionIssuer.type
| """.stripMargin)

// Retrieve the results
val pplResults: Array[Row] = pplFrame.collect()
assert(pplResults.length == 1)
val expectedResults: Array[Row] = Array(Row("Role"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(pplResults.sorted.sameElements(expectedResults.sorted))

val sqlFrame = sql(s"""
| select unmapped['userIdentity.sessioncontext.sessionIssuer.type'] from $testTable
| """.stripMargin)

// Retrieve the results
val sqlResults: Array[Row] = sqlFrame.collect()
assert(sqlResults.length == 1)
assert(sqlResults.sorted.sameElements(expectedResults.sorted))
}


test("create ppl simple query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test_nested`"
Seq(nestedTestTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
| source = $table
| """.stripMargin)
Expand All @@ -87,15 +63,15 @@ class FlintSparkPPLNestedFieldsITSuite
val expectedPlan: LogicalPlan =
Project(
Seq(UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}
}

test("create ppl simple query with head (limit) 1 test") {
val frame = sql(s"""
| source = $testTable| head 1
| source = $nestedTestTable| head 1
| """.stripMargin)

// Retrieve the results
Expand All @@ -106,7 +82,7 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val limitPlan: LogicalPlan =
Limit(Literal(1), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
Limit(Literal(1), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))
val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan)

// Compare the two plans
Expand All @@ -115,7 +91,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query with head (limit) and sorted test") {
val frame = sql(s"""
| source = $testTable| sort int_col | head 1
| source = $nestedTestTable| sort int_col | head 1
| """.stripMargin)

// Retrieve the results
Expand All @@ -129,7 +105,7 @@ class FlintSparkPPLNestedFieldsITSuite
Sort(
Seq(SortOrder(UnresolvedAttribute("int_col"), Ascending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Expand All @@ -141,7 +117,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query with head (limit) and nested column sorted test") {
val frame = sql(s"""
| source = $testTable| sort struct_col.field1 | head 1
| source = $nestedTestTable| sort struct_col.field1 | head 1
| """.stripMargin)

// Retrieve the results
Expand All @@ -155,7 +131,7 @@ class FlintSparkPPLNestedFieldsITSuite
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col.field1"), Ascending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Expand All @@ -167,7 +143,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query two with fields result test") {
val frame = sql(s"""
| source = $testTable| fields int_col, struct_col.field2, struct_col2.field2
| source = $nestedTestTable| fields int_col, struct_col.field2, struct_col2.field2
| """.stripMargin)

// Retrieve the results
Expand All @@ -192,14 +168,14 @@ class FlintSparkPPLNestedFieldsITSuite
UnresolvedAttribute("int_col"),
UnresolvedAttribute("struct_col.field2"),
UnresolvedAttribute("struct_col2.field2")),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}

test("create ppl simple sorted query two with fields result test sorted") {
val frame = sql(s"""
| source = $testTable| sort - struct_col2.field2 | fields int_col, struct_col2.field2
| source = $nestedTestTable| sort - struct_col2.field2 | fields int_col, struct_col2.field2
| """.stripMargin)

// Retrieve the results
Expand All @@ -216,7 +192,7 @@ class FlintSparkPPLNestedFieldsITSuite
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col2.field2"), Descending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Expand All @@ -230,7 +206,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple sorted by nested field query with two with fields result test ") {
val frame = sql(s"""
| source = $testTable| sort - struct_col.field2 , - int_col | fields int_col, struct_col.field2
| source = $nestedTestTable| sort - struct_col.field2 , - int_col | fields int_col, struct_col.field2
| """.stripMargin)

// Retrieve the results
Expand All @@ -249,7 +225,7 @@ class FlintSparkPPLNestedFieldsITSuite
SortOrder(UnresolvedAttribute("struct_col.field2"), Descending),
SortOrder(UnresolvedAttribute("int_col"), Descending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Expand All @@ -263,7 +239,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query with nested field 1 range filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2
| source = $nestedTestTable| where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2
| """.stripMargin)

// Retrieve the results
Expand All @@ -277,7 +253,7 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan =
Filter(GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)), table)
Expand All @@ -297,7 +273,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query with nested field 2 range filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col2.field2 > 50 | sort - struct_col2.field2 | fields int_col, struct_col2.field2
| source = $nestedTestTable| where struct_col2.field2 > 50 | sort - struct_col2.field2 | fields int_col, struct_col2.field2
| """.stripMargin)

// Retrieve the results
Expand All @@ -311,7 +287,7 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan =
Filter(GreaterThan(UnresolvedAttribute("struct_col2.field2"), Literal(50)), table)
Expand All @@ -331,7 +307,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query with nested field string match test") {
val frame = sql(s"""
| source = $testTable| where struct_col.field1.subfield = 'value1' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| source = $nestedTestTable| where struct_col.field1.subfield = 'value1' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| """.stripMargin)

// Retrieve the results
Expand All @@ -345,7 +321,7 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan =
Filter(EqualTo(UnresolvedAttribute("struct_col.field1.subfield"), Literal("value1")), table)
Expand All @@ -365,7 +341,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query with nested field string filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| source = $nestedTestTable| where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| """.stripMargin)

// Retrieve the results
Expand All @@ -383,7 +359,7 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan = Filter(
GreaterThan(UnresolvedAttribute("struct_col2.field1.subfield"), Literal("valueA")),
Expand All @@ -401,4 +377,60 @@ class FlintSparkPPLNestedFieldsITSuite
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("create ppl simple query nested column with dots in name test") {
val frame = sql(s"""
| source = $nestedTestTableWithNestedKeys | fields user_data[user.first.name]
| """.stripMargin)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row("Alice"), Row("Bob"), Row("Charlie"), Row("David"))
assert(results.length == 4)
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val pplLogicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested_with_nested_keys"))
val expectedPlan = Project(Seq(
UnresolvedAlias(UnresolvedExtractValue(UnresolvedAttribute("user_data"), Literal("user.first.name"))),
), table)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(pplLogicalPlan))
}

test("create ppl simple query nested column with dots in name filter test") {
val frame = sql(s"""
| source = $nestedTestTableWithNestedKeys | where user_data[user.home.address.city] = 'Seattle'
| """.stripMargin)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(Row("Alice", "Smith", 30, "123 Main St", "Seattle"), Row("asmith", "REDACTED")),
Row(Row("Bob", "Johnson", 55, "456 Elm St", "Seattle"), Row("bjohnson", "REDACTED"))
)
assert(results.length == 2)
// Compare the results
assert(results === expectedResults)

// Retrieve the logical plan
val pplLogicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested_with_nested_keys"))
val cityEqualTo = EqualTo(UnresolvedExtractValue(UnresolvedAttribute("user_data"), Literal("user.home.address.city")), Literal("Seattle"))
val filter = Filter(cityEqualTo, table)
val expectedPlan = Project(Seq(
UnresolvedStar(Option.empty),
), filter)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(pplLogicalPlan))
}
}
5 changes: 5 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ sortFieldExpression

fieldExpression
: qualifiedName
| extractedName
;

wcFieldExpression
Expand Down Expand Up @@ -784,6 +785,10 @@ qualifiedName
: ident (DOT ident)* # identsAsQualifiedName
;

extractedName
: rootName = qualifiedName LT_SQR_PRTHS extractPath = qualifiedName RT_SQR_PRTHS
;

tableQualifiedName
: tableIdent (DOT ident)* # identsAsTableQualifiedName
;
Expand Down
Loading

0 comments on commit 9653201

Please sign in to comment.