Skip to content

Commit

Permalink
add integration test for the between command covering both the straight and the NOT usage
Browse files Browse the repository at this point in the history
Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com>
  • Loading branch information
dr-lilienthal authored and salyh committed Oct 16, 2024
1 parent 04c30b7 commit 6c13c69
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.streaming.StreamTest

/**
 * Integration tests for the PPL `between` operator, covering both the
 * straight form (`x between a and b`) and the negated form
 * (`x NOT between a and b`) against a partitioned state/country test table.
 */
class FlintSparkPPLBetweenITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table and index name */
  private val testTable = "spark_catalog.default.flint_ppl_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create test table
    createPartitionedStateCountryTable(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

  test("test between should return records between two values") {
    val frame = sql(s"""
       | source = $testTable | where age between 20 and 30
       | """.stripMargin)

    val results = frame.collect()
    // The fixture table is expected to contain exactly 3 rows with age in
    // [20, 30] and 6 columns — TODO confirm against createPartitionedStateCountryTable.
    assert(results.length == 3)
    assert(frame.columns.length == 6)

    // Every returned row must fall inside the inclusive range.
    results.foreach(row => {
      val age = row.getAs[Int]("age")
      assert(age >= 20 && age <= 30, s"Age $age is not between 20 and 30")
    })
  }

  test("test between should return records NOT between two values") {
    val frame = sql(s"""
       | source = $testTable | where age NOT between 20 and 30
       | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 1)
    assert(frame.columns.length == 6)

    // Every returned row must fall outside the inclusive range.
    results.foreach(row => {
      val age = row.getAs[Int]("age")
      // Fixed assertion message: a failure here means the age IS inside the
      // range, so the previous message ("is not between") was inverted.
      assert(age < 20 || age > 30, s"Age $age is between 20 and 30")
    })
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThanOrEqual, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._

/**
 * Unit test for translating a PPL range predicate (the expansion of a
 * `between` expression) into the expected Catalyst logical plan:
 * a Project(*) over a Filter combining two inclusive bound comparisons.
 */
class PPLLogicalPlanBetweenExpressionTranslatorTestSuite
    extends SparkFunSuite
    with PlanTest
    with LogicalPlanTestUtils
    with Matchers {

  private val planTransformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

  test("test between expression") {
    // Parse the PPL query and translate it to a Catalyst logical plan.
    val context = new CatalystPlanContext
    val logPlan = planTransformer.visit(
      plan(
        pplParser,
        "source = table | where datetime_field >= '2024-09-10' and datetime_field <= '2024-09-15'"),
      context)

    // Equivalent SQL: SELECT * FROM table WHERE datetime_field BETWEEN '2024-09-10' AND '2024-09-15'
    val field = UnresolvedAttribute("datetime_field")
    val rangeCondition = And(
      GreaterThanOrEqual(field, Literal("2024-09-10")),
      LessThanOrEqual(field, Literal("2024-09-15")))
    val expectedPlan = Project(
      Seq(UnresolvedStar(None)),
      Filter(rangeCondition, UnresolvedRelation(Seq("table"))))

    comparePlans(expectedPlan, logPlan, false)
  }

}

0 comments on commit 6c13c69

Please sign in to comment.