spark compile version -> runtime version
cfmcgrady committed Feb 24, 2023
1 parent a8e7b74 commit 3c9e300
Showing 5 changed files with 40 additions and 38 deletions.
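
In a nutshell: several test suites gated version-dependent assertions on SPARK_COMPILE_VERSION, the Spark version Kyuubi was compiled against, but the connected engine may run a different Spark. This commit switches those gates to the engine's runtime version, fetched over JDBC with SELECT version(). A minimal sketch of the distinction, with hypothetical version values:

// --- illustrative sketch, not part of the commit ---
import org.apache.kyuubi.SPARK_COMPILE_VERSION
import org.apache.kyuubi.engine.SemanticVersion

val compiled = SemanticVersion(SPARK_COMPILE_VERSION) // e.g. 3.3, fixed at build time
val runtime = SemanticVersion("3.2.4")                // hypothetical SELECT version() result
assert(compiled >= "3.3")    // compile-time gate passes...
assert(!(runtime >= "3.3"))  // ...while the engine actually lacks 3.3 behavior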
SparkOperationSuite.scala
@@ -39,7 +39,6 @@ import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.{HiveMetadataTests, SparkQueryTests}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.util.KyuubiHadoopUtils
-import org.apache.kyuubi.util.SparkVersionUtil.isSparkVersionAtLeast

class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with SparkQueryTests {

@@ -93,12 +92,12 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
.add("c17", "struct<X: string>", nullable = true, "17")

// since spark3.3.0
-if (SPARK_ENGINE_VERSION >= "3.3") {
+if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") {
schema = schema.add("c18", "interval day", nullable = true, "18")
.add("c19", "interval year", nullable = true, "19")
}
// since spark3.4.0
-if (SPARK_ENGINE_VERSION >= "3.4") {
+if (SPARK_ENGINE_RUNTIME_VERSION >= "3.4") {
schema = schema.add("c20", "timestamp_ntz", nullable = true, "20")
}

@@ -511,7 +510,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
val status = tOpenSessionResp.getStatus
val errorMessage = status.getErrorMessage
assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
-if (isSparkVersionAtLeast("3.4")) {
+if (SPARK_ENGINE_RUNTIME_VERSION >= "3.4") {
assert(errorMessage.contains("[SCHEMA_NOT_FOUND]"))
assert(errorMessage.contains(s"The schema `$dbName` cannot be found."))
} else {
IcebergMetadataTests.scala
@@ -17,11 +17,11 @@

package org.apache.kyuubi.operation

-import org.apache.kyuubi.IcebergSuiteMixin
+import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
-import org.apache.kyuubi.util.SparkVersionUtil.isSparkVersionAtLeast
+import org.apache.kyuubi.util.SparkVersionUtil

-trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin {
+trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin with SparkVersionUtil {

test("get catalogs") {
withJdbcStatement() { statement =>
@@ -153,11 +153,11 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin {
"date",
"timestamp",
// SPARK-37931
-if (isSparkVersionAtLeast("3.3")) "struct<X: bigint, Y: double>"
+if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") "struct<X: bigint, Y: double>"
else "struct<`X`: bigint, `Y`: double>",
"binary",
// SPARK-37931
-if (isSparkVersionAtLeast("3.3")) "struct<X: string>" else "struct<`X`: string>")
+if (SPARK_COMPILE_VERSION >= "3.3") "struct<X: string>" else "struct<`X`: string>")
val cols = dataTypes.zipWithIndex.map { case (dt, idx) => s"c$idx" -> dt }
val (colNames, _) = cols.unzip

SparkDataTypeTests.scala
@@ -20,14 +20,15 @@ package org.apache.kyuubi.operation
import java.sql.{Date, Timestamp}

import org.apache.kyuubi.engine.SemanticVersion
+import org.apache.kyuubi.util.SparkVersionUtil

-trait SparkDataTypeTests extends HiveJDBCTestHelper {
-protected lazy val SPARK_ENGINE_VERSION = sparkEngineMajorMinorVersion
+trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {

def resultFormat: String = "thrift"

test("execute statement - select null") {
-assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
+assume(
+resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SELECT NULL AS col")
assert(resultSet.next())
Expand Down Expand Up @@ -199,7 +200,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}

test("execute statement - select timestamp_ntz") {
-assume(SPARK_ENGINE_VERSION >= "3.4")
+assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.4")
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.8888) AS col")
@@ -213,7 +214,8 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}

test("execute statement - select daytime interval") {
-assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.3"))
+assume(
+resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.3"))
withJdbcStatement() { statement =>
Map(
"interval 1 day 1 hour -60 minutes 30 seconds" ->
@@ -242,7 +244,7 @@
assert(resultSet.next())
val result = resultSet.getString("col")
val metaData = resultSet.getMetaData
-if (SPARK_ENGINE_VERSION < "3.2") {
+if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
// for spark 3.1 and backwards
assert(result === kv._2._2)
assert(metaData.getPrecision(1) === Int.MaxValue)
@@ -258,7 +260,8 @@
}

test("execute statement - select year/month interval") {
-assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.3"))
+assume(
+resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.3"))
withJdbcStatement() { statement =>
Map(
"INTERVAL 2022 YEAR" -> Tuple2("2022-0", "2022 years"),
@@ -271,7 +274,7 @@
assert(resultSet.next())
val result = resultSet.getString("col")
val metaData = resultSet.getMetaData
-if (SPARK_ENGINE_VERSION < "3.2") {
+if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
// for spark 3.1 and backwards
assert(result === kv._2._2)
assert(metaData.getPrecision(1) === Int.MaxValue)
@@ -287,7 +290,8 @@
}

test("execute statement - select array") {
-assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
+assume(
+resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT array() AS col1, array(1) AS col2, array(null) AS col3")
@@ -305,7 +309,8 @@
}

test("execute statement - select map") {
-assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
+assume(
+resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT map() AS col1, map(1, 2, 3, 4) AS col2, map(1, null) AS col3")
@@ -323,7 +328,8 @@
}

test("execute statement - select struct") {
-assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
+assume(
+resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_RUNTIME_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT struct('1', '2') AS col1," +
@@ -342,15 +348,4 @@
assert(metaData.getScale(2) == 0)
}
}

-def sparkEngineMajorMinorVersion: SemanticVersion = {
-var sparkRuntimeVer = ""
-withJdbcStatement() { stmt =>
-val result = stmt.executeQuery("SELECT version()")
-assert(result.next())
-sparkRuntimeVer = result.getString(1)
-assert(!result.next())
-}
-SemanticVersion(sparkRuntimeVer)
-}
}
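
For context on the >= and < guards above: they compare a SemanticVersion against a version string, so major/minor parts are compared numerically rather than lexicographically. A simplified stand-in for Kyuubi's SemanticVersion, for illustration only:

// --- illustrative sketch, not part of the commit ---
case class MajorMinor(major: Int, minor: Int) {
  def >=(target: String): Boolean = {
    val Array(tMajor, tMinor) = target.split('.').take(2).map(_.toInt)
    major > tMajor || (major == tMajor && minor >= tMinor)
  }
}
assert(MajorMinor(3, 10) >= "3.2") // numeric: 3.10 is newer than 3.2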
SparkQueryTests.scala
@@ -28,7 +28,6 @@ import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsRe

import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.util.SparkVersionUtil.isSparkVersionAtLeast

trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {

@@ -187,7 +186,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
withJdbcStatement("t") { statement =>
try {
val assertTableOrViewNotfound: (Exception, String) => Unit = (e, tableName) => {
-if (isSparkVersionAtLeast("3.4")) {
+if (SPARK_ENGINE_RUNTIME_VERSION >= "3.4") {
assert(e.getMessage.contains("[TABLE_OR_VIEW_NOT_FOUND]"))
assert(e.getMessage.contains(s"The table or view `$tableName` cannot be found."))
} else {
SparkVersionUtil.scala
@@ -17,13 +17,22 @@

package org.apache.kyuubi.util

-import org.apache.kyuubi.SPARK_COMPILE_VERSION
import org.apache.kyuubi.engine.SemanticVersion
+import org.apache.kyuubi.operation.HiveJDBCTestHelper

-object SparkVersionUtil {
-lazy val sparkSemanticVersion: SemanticVersion = SemanticVersion(SPARK_COMPILE_VERSION)
+trait SparkVersionUtil {
+this: HiveJDBCTestHelper =>

-def isSparkVersionAtLeast(ver: String): Boolean = {
-sparkSemanticVersion.isVersionAtLeast(ver)
+protected lazy val SPARK_ENGINE_RUNTIME_VERSION = sparkEngineMajorMinorVersion
+
+def sparkEngineMajorMinorVersion: SemanticVersion = {
+var sparkRuntimeVer = ""
+withJdbcStatement() { stmt =>
+val result = stmt.executeQuery("SELECT version()")
+assert(result.next())
+sparkRuntimeVer = result.getString(1)
+assert(!result.next())
+}
+SemanticVersion(sparkRuntimeVer)
+}
}
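
Usage, roughly: the self-type annotation (this: HiveJDBCTestHelper =>) means the trait can only be mixed into suites that provide withJdbcStatement, and SPARK_ENGINE_RUNTIME_VERSION is resolved lazily, costing one SELECT version() round trip on first use. A hypothetical suite, with the class name and JDBC URL assumed:

// --- illustrative sketch, not part of the commit ---
class MyRuntimeGatedSuite extends HiveJDBCTestHelper with SparkVersionUtil {
  override protected def jdbcUrl: String = "jdbc:hive2://localhost:10009/" // assumed abstract member
  test("runtime-gated feature") {
    // Skipped (canceled), not failed, when the engine runs Spark older than 3.4.
    assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.4")
    withJdbcStatement() { stmt =>
      assert(stmt.executeQuery("SELECT version()").next())
    }
  }
}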
