2 changes: 1 addition & 1 deletion .github/actions/java-test/action.yaml
@@ -32,7 +32,7 @@ inputs:
scan_impl:
description: 'The default Parquet scan implementation'
required: false
default: 'native_comet'
default: 'auto'
upload-test-reports:
description: 'Whether to upload test results including coverage to GitHub'
required: false
4 changes: 2 additions & 2 deletions .github/workflows/pr_build_linux.yml
@@ -164,7 +164,7 @@ jobs:
- name: "Spark 3.4, JDK 11, Scala 2.12"
java_version: "11"
maven_opts: "-Pspark-3.4 -Pscala-2.12"
scan_impl: "native_comet"
scan_impl: "auto"

- name: "Spark 3.5.5, JDK 17, Scala 2.13"
java_version: "17"
@@ -174,7 +174,7 @@
- name: "Spark 3.5.6, JDK 17, Scala 2.13"
java_version: "17"
maven_opts: "-Pspark-3.5 -Dspark.version=3.5.6 -Pscala-2.13"
scan_impl: "native_comet"
scan_impl: "auto"

- name: "Spark 3.5, JDK 17, Scala 2.12"
java_version: "17"
5 changes: 1 addition & 4 deletions .github/workflows/spark_sql_test.yml
@@ -116,15 +116,12 @@ jobs:
- {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
# Test combinations:
# - auto scan: all Spark versions (3.4, 3.5, 4.0)
# - native_comet: Spark 3.4, 3.5
# - native_iceberg_compat: Spark 3.5 only
config:
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto', scan-env: ''}
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto', scan-env: ''}
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'native_comet', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_comet'}
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_comet', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_comet'}
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_iceberg_compat', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_iceberg_compat'}
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
# Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
exclude:
- config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
8 changes: 2 additions & 6 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -124,11 +124,8 @@ object CometConf extends ShimCometConf {
val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
.category(CATEGORY_SCAN)
.doc(
s"The implementation of Comet Native Scan to use. Available modes are `$SCAN_NATIVE_COMET`," +
"The implementation of Comet Native Scan to use. Available modes are " +
s"`$SCAN_NATIVE_DATAFUSION`, and `$SCAN_NATIVE_ICEBERG_COMPAT`. " +
s"`$SCAN_NATIVE_COMET` (DEPRECATED - will be removed in a future release) is for the " +
"original Comet native scan which uses a jvm based parquet file reader and native " +
"column decoding. Supports simple types only. " +
s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on " +
"DataFusion. " +
s"`$SCAN_NATIVE_ICEBERG_COMPAT` is the recommended native implementation that " +
@@ -137,8 +134,7 @@ object CometConf extends ShimCometConf {
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
.checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
.createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)

val COMET_ICEBERG_NATIVE_ENABLED: ConfigEntry[Boolean] =
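With `native_comet` removed from the accepted values, `spark.comet.scan.impl` (or the `COMET_PARQUET_SCAN_IMPL` environment variable) now accepts only `auto`, `native_datafusion`, and `native_iceberg_compat`. A minimal sketch of selecting an implementation explicitly after this change — the config keys and values come from `CometConf` above, while the session setup, app name, and Parquet path are purely illustrative assumptions:

```scala
// Illustrative only: "spark.comet.scan.impl" and "spark.comet.enabled" are the
// config entries shown above; everything else here is an assumed example setup.
import org.apache.spark.sql.SparkSession

object CometScanImplExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-scan-impl-example")
      .config("spark.comet.enabled", "true")
      // "auto" is the default after this change; "native_datafusion" and
      // "native_iceberg_compat" remain the explicit choices, and "native_comet"
      // is now rejected by the checkValues() validation.
      .config("spark.comet.scan.impl", "native_iceberg_compat")
      // Alternatively, export COMET_PARQUET_SCAN_IMPL before launch,
      // per createWithEnvVarOrDefault above.
      .getOrCreate()

    // Parquet reads then go through the selected Comet scan implementation,
    // subject to Comet's own fallback rules.
    spark.read.parquet("/tmp/example.parquet").count()
    spark.stop()
  }
}
```

In CI the same selection is made through the `COMET_PARQUET_SCAN_IMPL` environment variable, as in the workflow changes above.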
15 changes: 10 additions & 5 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -185,7 +185,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("basic data type support") {
// ignored: native_comet scan is no longer supported
ignore("basic data type support") {
// this test requires native_comet scan due to unsigned u8/u16 issue
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
Seq(true, false).foreach { dictionaryEnabled =>
@@ -216,7 +217,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("uint data type support") {
// ignored: native_comet scan is no longer supported
ignore("uint data type support") {
// this test requires native_comet scan due to unsigned u8/u16 issue
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
Seq(true, false).foreach { dictionaryEnabled =>
@@ -1503,7 +1505,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("round") {
// ignored: native_comet scan is no longer supported
ignore("round") {
// https://github.com/apache/datafusion-comet/issues/1441
assume(usingLegacyNativeCometScan)
Seq(true, false).foreach { dictionaryEnabled =>
@@ -1567,7 +1570,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("hex") {
// ignored: native_comet scan is no longer supported
ignore("hex") {
// https://github.com/apache/datafusion-comet/issues/1441
assume(usingLegacyNativeCometScan)
Seq(true, false).foreach { dictionaryEnabled =>
@@ -2781,7 +2785,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("test integral divide") {
// ignored: native_comet scan is no longer supported
ignore("test integral divide") {
// this test requires native_comet scan due to unsigned u8/u16 issue
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
Seq(true, false).foreach { dictionaryEnabled =>
@@ -382,7 +382,8 @@ class CometExecSuite extends CometTestBase {
}
}

test("ReusedExchangeExec should work on CometBroadcastExchangeExec with V2 scan") {
// ignored: native_comet scan is no longer supported
ignore("ReusedExchangeExec should work on CometBroadcastExchangeExec with V2 scan") {
withSQLConf(
CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -310,7 +310,8 @@ class CometParquetWriterSuite extends CometTestBase {
}
}

test("native write falls back when scan produces non-Arrow data") {
// ignored: native_comet scan is no longer supported
ignore("native write falls back when scan produces non-Arrow data") {
// This test verifies that when a native scan (like native_comet) doesn't support
// certain data types (complex types), the native write correctly falls back to Spark
// instead of failing at runtime with "Comet execution only takes Arrow Arrays" error.
@@ -85,7 +85,8 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}

test("unsupported Spark types") {
// ignored: native_comet scan is no longer supported
ignore("unsupported Spark types") {
// TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET
// https://github.com/apache/datafusion-comet/issues/2188
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
@@ -130,7 +131,8 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}

test("unsupported Spark schema") {
// ignored: native_comet scan is no longer supported
ignore("unsupported Spark schema") {
// TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET
// https://github.com/apache/datafusion-comet/issues/2188
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
@@ -368,7 +370,8 @@ abstract class ParquetReadSuite extends CometTestBase {
checkParquetFile(data)
}

test("test multiple pages with different sizes and nulls") {
// ignored: native_comet scan is no longer supported
ignore("test multiple pages with different sizes and nulls") {
def makeRawParquetFile(
path: Path,
dictionaryEnabled: Boolean,
@@ -1344,7 +1347,8 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}

test("scan metrics") {
// ignored: native_comet scan is no longer supported
ignore("scan metrics") {

val cometScanMetricNames = Seq(
"ParquetRowGroups",
@@ -1866,8 +1870,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {

test("Test V1 parquet scan uses respective scanner") {
Seq(
("false", CometConf.SCAN_NATIVE_COMET, "FileScan parquet"),
("true", CometConf.SCAN_NATIVE_COMET, "CometScan [native_comet] parquet"),
("false", CometConf.SCAN_NATIVE_DATAFUSION, "FileScan parquet"),
("true", CometConf.SCAN_NATIVE_DATAFUSION, "CometNativeScan"),
("true", CometConf.SCAN_NATIVE_ICEBERG_COMPAT, "CometScan [native_iceberg_compat] parquet"))
.foreach { case (cometEnabled, cometNativeScanImpl, expectedScanner) =>
@@ -2014,10 +2017,11 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {

}

// ignored: native_comet scan is no longer supported
class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*)(
super.ignore(testName, testTags: _*)(
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> "",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
@@ -2040,7 +2044,8 @@ class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
}
}

test("Test V2 parquet scan uses respective scanner") {
// ignored: native_comet scan is no longer supported
ignore("Test V2 parquet scan uses respective scanner") {
Seq(("false", "BatchScan"), ("true", "CometBatchScan")).foreach {
case (cometEnabled, expectedScanner) =>
testScanner(
@@ -101,7 +101,8 @@ class CometScanRuleSuite extends CometTestBase {
}
}

test("CometScanRule should replace V2 BatchScanExec, but only when Comet is enabled") {
// ignored: native_comet scan is no longer supported
ignore("CometScanRule should replace V2 BatchScanExec, but only when Comet is enabled") {
withTempPath { path =>
createTestDataFrame.write.parquet(path.toString)
withTempView("test_data") {
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector

import org.apache.comet.{CometConf, WithHdfsCluster}
import org.apache.comet.CometConf.{SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
import org.apache.comet.parquet.BatchReader

/**
@@ -67,14 +67,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql(s"select $query from parquetV1Table").noop()
}

sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql(s"select $query from parquetV1Table").noop()
}
}

sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -175,21 +167,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
}
}

sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
"spark.memory.offHeap.enabled" -> "true",
"spark.memory.offHeap.size" -> "10g",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET,
DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
s"footerKey: ${footerKey}, key1: ${key1}") {
spark.sql(s"select $query from parquetV1Table").noop()
}
}

sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
"spark.memory.offHeap.enabled" -> "true",
@@ -245,14 +222,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql("select sum(id) from parquetV1Table").noop()
}

sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql("select sum(id) from parquetV1Table").noop()
}
}

sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -373,14 +342,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
}

benchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
}
}

benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -431,14 +392,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql("select sum(length(id)) from parquetV1Table").noop()
}

sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql("select sum(length(id)) from parquetV1Table").noop()
}
}

sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -482,17 +435,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
.noop()
}

benchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark
.sql("select sum(length(c2)) from parquetV1Table where c1 is " +
"not NULL and c2 is not NULL")
.noop()
}
}

benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -538,14 +480,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
}

benchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
}
}

benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -589,14 +523,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}

benchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}
}

benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
@@ -640,14 +566,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}

benchmark.addCase("SQL Parquet - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}
}

benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",