Commit b06794f

feat: Add manual test to calculate spark builtin functions coverage (apache#263)
1 parent 7f22f25 commit b06794f

File tree

6 files changed: +616 −27 lines

doc/spark_builtin_expr_coverage.txt

Lines changed: 419 additions & 0 deletions
Large diffs are not rendered by default.

doc/spark_builtin_expr_coverage_agg.txt

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+-------+--------------------------------------------------------+---+
|result |details                                                 |cnt|
+-------+--------------------------------------------------------+---+
|FAILED |Unsupported                                             |282|
|FAILED |Failed on native side                                   |16 |
|FAILED |Failed on something else. Check query manually          |4  |
|PASSED |OK                                                      |101|
|SKIPPED|No examples found in spark.sessionState.functionRegistry|12 |
+-------+--------------------------------------------------------+---+
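
Note: the summary above is produced by the aggregation step in the new suite. A minimal
standalone sketch of that step, runnable in spark-shell (the two entries in resultsMap
here are hypothetical samples; the SQL is the same query the suite writes out):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    case class CoverageResult(result: String, details: Seq[(String, String)])

    // Hypothetical per-function results; the suite fills one entry per builtin function.
    val resultsMap = Map(
      "cos" -> CoverageResult("PASSED", Seq(("SELECT cos(0);", "OK"))),
      "bit_count" -> CoverageResult("FAILED", Seq(("SELECT bit_count(0);", "Unsupported"))))

    resultsMap.toSeq.toDF("name", "details").createOrReplaceTempView("t")

    // One row per (status, detail) pair with the count of functions in that bucket.
    spark.sql(
      "select result, d._2 as details, count(1) cnt from " +
        "(select name, t.details.result, explode_outer(t.details.details) as d from t) " +
        "group by 1, 2 order by 1").show(1000, truncate = false)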

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -893,6 +893,7 @@ under the License.
             <exclude>tpcds-kit/**</exclude>
             <exclude>tpcds-sf-1/**</exclude>
             <exclude>tpch/**</exclude>
+            <exclude>doc/*.txt</exclude>
           </excludes>
         </configuration>
       </plugin>

spark/src/test/scala/org/apache/comet/CometExpressionCoverageSuite.scala

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import scala.collection.mutable

import org.scalatest.Ignore
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

/**
 * Manual test to calculate the coverage of Spark builtin expressions supported by Comet.
 *
 * The test updates the files doc/spark_builtin_expr_coverage.txt and
 * doc/spark_builtin_expr_coverage_agg.txt.
 */
@Ignore
class CometExpressionCoverageSuite extends CometTestBase with AdaptiveSparkPlanHelper {

  import testImplicits._

  private val rawCoverageFilePath = "doc/spark_builtin_expr_coverage.txt"
  private val aggCoverageFilePath = "doc/spark_builtin_expr_coverage_agg.txt"

  test("Test Spark builtin expressions coverage") {
    val queryPattern = """(?i)SELECT (.+?);""".r
    val valuesPattern = """(?i)FROM VALUES(.+?);""".r
    val selectPattern = """(?i)SELECT(.+?)FROM""".r
    val builtinExamplesMap = spark.sessionState.functionRegistry
      .listFunction()
      .map(spark.sessionState.catalog.lookupFunctionInfo(_))
      .filter(_.getSource.toLowerCase == "built-in")
      // Exclude Spark streaming functions; Comet has no plans to support streaming in the near future
      .filter(f =>
        !List("window", "session_window", "window_time").contains(f.getName.toLowerCase))
      .map(f => {
        val selectRows = queryPattern.findAllMatchIn(f.getExamples).map(_.group(0)).toList
        (f.getName, selectRows.filter(_.nonEmpty))
      })
      .toMap

    // key - function name
    // value - result showing whether the function is supported by Comet
    val resultsMap = new mutable.HashMap[String, CoverageResult]()

    builtinExamplesMap.foreach {
      case (funcName, q :: _) =>
        val queryResult =
          try {
            // Example with predefined values,
            // e.g. SELECT bit_xor(col) FROM VALUES (3), (5) AS tab(col).
            // A better option would be to parse the query and iterate through its
            // expressions, but this is an ad hoc coverage test.
            if (q.toLowerCase.contains(" from values")) {
              val select = selectPattern.findFirstMatchIn(q).map(_.group(0))
              val values = valuesPattern.findFirstMatchIn(q).map(_.group(0))
              (select, values) match {
                case (Some(s), Some(v)) =>
                  testSingleLineQuery(s"select * $v", s"$s tbl")
                  CoverageResult(CoverageResultStatus.Passed.toString, Seq((q, "OK")))

                case _ =>
                  CoverageResult(
                    CoverageResultStatus.Failed.toString,
                    Seq((q, "Cannot parse properly")))
              }
            } else {
              // Process a simple example like `SELECT cos(0);`.
              //
              // ConstantFolding is an operator optimization rule in Catalyst that replaces
              // expressions that can be statically evaluated with their equivalent literal
              // values. The execution disables it: left enabled, it would precompute the
              // expression and select the value as a literal, which leads to false positives.
              testSingleLineQuery(
                "select 'dummy' x",
                s"${q.dropRight(1)}, x from tbl",
                excludedOptimizerRules =
                  Some("org.apache.spark.sql.catalyst.optimizer.ConstantFolding"))
              CoverageResult(CoverageResultStatus.Passed.toString, Seq((q, "OK")))
            }
          } catch {
            case e: TestFailedException
                if e.message.getOrElse("").contains("Expected only Comet native operators") =>
              CoverageResult(CoverageResultStatus.Failed.toString, Seq((q, "Unsupported")))
            case e if e.getMessage.contains("CometNativeException") =>
              CoverageResult(
                CoverageResultStatus.Failed.toString,
                Seq((q, "Failed on native side")))
            case _ =>
              CoverageResult(
                CoverageResultStatus.Failed.toString,
                Seq((q, "Failed on something else. Check query manually")))
          }
        resultsMap.put(funcName, queryResult)

      case (funcName, List()) =>
        resultsMap.put(
          funcName,
          CoverageResult(
            CoverageResultStatus.Skipped.toString,
            Seq(("", "No examples found in spark.sessionState.functionRegistry"))))
    }

    // TODO: convert results into HTML
    resultsMap.toSeq.toDF("name", "details").createOrReplaceTempView("t")
    val str_agg = showString(
      spark.sql(
        "select result, d._2 as details, count(1) cnt from (select name, t.details.result, explode_outer(t.details.details) as d from t) group by 1, 2 order by 1"),
      1000,
      0)
    Files.write(Paths.get(aggCoverageFilePath), str_agg.getBytes(StandardCharsets.UTF_8))

    val str = showString(spark.sql("select * from t order by 1"), 1000, 0)
    Files.write(Paths.get(rawCoverageFilePath), str.getBytes(StandardCharsets.UTF_8))
  }
}

case class CoverageResult(result: String, details: Seq[(String, String)])

object CoverageResultStatus extends Enumeration {
  type CoverageResultStatus = Value

  val Failed: Value = Value("FAILED")
  val Passed: Value = Value("PASSED")
  val Skipped: Value = Value("SKIPPED")
}
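
Note: the parsing step above deliberately avoids a SQL parser; it pulls example queries out
of each function's FunctionInfo.getExamples text with three regexes and splits FROM VALUES
examples into a prepare query and a test query. A small standalone sketch of that extraction
(the examples string is a made-up registry entry mirroring the usual format):

    // The three regexes from the suite above.
    val queryPattern = """(?i)SELECT (.+?);""".r
    val valuesPattern = """(?i)FROM VALUES(.+?);""".r
    val selectPattern = """(?i)SELECT(.+?)FROM""".r

    // Hypothetical FunctionInfo.getExamples text.
    val examples =
      """Examples:
        |  > SELECT bit_xor(col) FROM VALUES (3), (5) AS tab(col);
        |   6
        |""".stripMargin

    val selectRows = queryPattern.findAllMatchIn(examples).map(_.group(0)).toList
    // List(SELECT bit_xor(col) FROM VALUES (3), (5) AS tab(col);)

    val q = selectRows.head
    val select = selectPattern.findFirstMatchIn(q).map(_.group(0))
    // Some(SELECT bit_xor(col) FROM)
    val values = valuesPattern.findFirstMatchIn(q).map(_.group(0))
    // Some(FROM VALUES (3), (5) AS tab(col);)
    // The suite then runs testSingleLineQuery(s"select * $v", s"$s tbl")
    // with v and s being the unwrapped values above.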

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 1 addition & 27 deletions
@@ -19,15 +19,13 @@
 
 package org.apache.comet
 
-import java.util
-
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
-import org.apache.spark.sql.types.{Decimal, DecimalType, StructType}
+import org.apache.spark.sql.types.{Decimal, DecimalType}
 
 import org.apache.comet.CometSparkSessionExtensions.{isSpark32, isSpark33Plus, isSpark34Plus}
 
@@ -1291,30 +1289,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  // tests one liner query without necessity to create external table
-  def testSingleLineQuery(
-      prepareQuery: String,
-      testQuery: String,
-      testName: String = "test",
-      tableName: String = "tbl"): Unit = {
-
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, testName).toUri.toString
-      var data: java.util.List[Row] = new util.ArrayList()
-      var schema: StructType = null
-
-      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
-        val df = spark.sql(prepareQuery)
-        data = df.collectAsList()
-        schema = df.schema
-      }
-
-      spark.createDataFrame(data, schema).repartition(1).write.parquet(path)
-      readParquetFile(path, Some(schema)) { df => df.createOrReplaceTempView(tableName) }
-      checkSparkAnswerAndOperator(testQuery)
-    }
-  }
-
   test("Decimal random number tests") {
     val rand = scala.util.Random
     def makeNum(p: Int, s: Int): String = {

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 38 additions & 0 deletions
@@ -729,4 +729,42 @@ abstract class CometTestBase
       Seq.empty
     }
   }
+
+  // Tests a one-liner query without the need to create an external table
+  def testSingleLineQuery(
+      prepareQuery: String,
+      testQuery: String,
+      testName: String = "test",
+      tableName: String = "tbl",
+      excludedOptimizerRules: Option[String] = None): Unit = {
+
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, testName).toUri.toString
+      var data: java.util.List[Row] = new java.util.ArrayList()
+      var schema: StructType = null
+
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val df = spark.sql(prepareQuery)
+        data = df.collectAsList()
+        schema = df.schema
+      }
+
+      spark.createDataFrame(data, schema).repartition(1).write.parquet(path)
+      readParquetFile(path, Some(schema)) { df => df.createOrReplaceTempView(tableName) }
+
+      withSQLConf(
+        "spark.sql.optimizer.excludedRules" -> excludedOptimizerRules.getOrElse(""),
+        "spark.sql.adaptive.optimizer.excludedRules" -> excludedOptimizerRules.getOrElse("")) {
+        checkSparkAnswerAndOperator(sql(testQuery))
+      }
+    }
+  }
+
+  def showString[T](
+      df: Dataset[T],
+      _numRows: Int,
+      truncate: Int = 20,
+      vertical: Boolean = false): String = {
+    df.showString(_numRows, truncate, vertical)
+  }
 }
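
Note: a usage sketch of the relocated helper, as the coverage suite drives it from any suite
extending CometTestBase (the calls mirror the two paths in the coverage test above):

    // FROM VALUES example: prepareQuery materializes the rows into Parquet, and
    // testQuery runs against the temp view `tbl` that the helper registers over it.
    testSingleLineQuery(
      "select * FROM VALUES (3), (5) AS tab(col)",
      "SELECT bit_xor(col) FROM tbl")

    // Constant-only example like `SELECT cos(0)`: ConstantFolding is excluded so that
    // Catalyst does not pre-evaluate the expression into a literal, which would let the
    // check pass without ever exercising Comet's native operators.
    testSingleLineQuery(
      "select 'dummy' x",
      "SELECT cos(0), x from tbl",
      excludedOptimizerRules =
        Some("org.apache.spark.sql.catalyst.optimizer.ConstantFolding"))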
