feat: Enable remaining Spark 3.5.1 tests #676

Merged (8 commits) · Jul 18, 2024
2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
@@ -289,7 +289,7 @@ object CometConf extends ShimCometConf {
"why a query stage cannot be executed natively. Set this to false to " +
"reduce the amount of logging.")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
Member Author: Logging was too verbose, so I changed this back to be off by default.


val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
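Since this flips the default of `spark.comet.explainFallback.enabled` to false, users who still want the fallback explanations have to opt back in. A minimal sketch, assuming a Comet-enabled Spark build (in spark-shell the session already exists as `spark`; the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Assumes a Comet-enabled Spark deployment; spark-shell would already provide `spark`.
val spark = SparkSession.builder().appName("comet-fallback-logging").getOrCreate()

// Turn the fallback explanations back on for this session
// (now off by default because the output was considered too verbose).
spark.conf.set("spark.comet.explainFallback.enabled", "true")
```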
83 changes: 5 additions & 78 deletions dev/diffs/3.5.1.diff
@@ -2003,7 +2003,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 746f289c393..bc01ffd52ea 100644
index 746f289c393..7d7a39ba82b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -25,10 +25,11 @@ import org.apache.spark.sql.catalyst.expressions
@@ -2120,29 +2120,17 @@ index 746f289c393..bc01ffd52ea 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
@@ -1013,7 +1039,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}

- test("bucket coalescing is applied when join expressions match with partitioning expressions") {
+ // https://github.com/apache/datafusion-comet/issues/617
+ test("bucket coalescing is applied when join expressions match with partitioning expressions",
+ IgnoreComet("TODO: fix Comet for this test")) {
withTable("t1", "t2", "t3") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
@@ -1029,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -1029,15 +1055,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
- val shuffles = collect(plan) { case s: ShuffleExchangeExec => s }
+ val shuffles = collect(plan) {
+ case s: ShuffleExchangeLike => s
+ }
assert(shuffles.length == expectedNumShuffles)

- val scans = collect(plan) {
+ val scans = plan.collect {
val scans = collect(plan) {
case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
}
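The updated hunk matches `ShuffleExchangeLike` rather than `ShuffleExchangeExec`, and accepts `CometScanExec` alongside `FileSourceScanExec`, so the test's counts still hold when Comet substitutes its own operators. A rough sketch of the same pattern outside the test harness, assuming a running SparkSession `spark` (the query, table names, and the use of `plan.collect` in place of the suite's `collect(plan)` helper are illustrative assumptions):

```scala
import org.apache.spark.sql.comet.CometScanExec
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike

// Hypothetical bucketed-join query; `t1` and `t2` stand in for the suite's bucketed tables.
val plan = spark.sql("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i").queryExecution.executedPlan

// Count exchanges whether Spark or Comet provides the shuffle node.
val shuffles = plan.collect { case s: ShuffleExchangeLike => s }

// Count coalesced bucketed scans from either the Spark or the Comet scan operator.
val scans = plan.collect {
  case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
  case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
}
```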
@@ -2379,67 +2367,6 @@ index b4c4ec7acbf..20579284856 100644
}

val aggregateExecsWithoutPartialAgg = allAggregateExecs.filter {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3e1bc57dfa2..662640af934 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -28,10 +28,7 @@ import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter

import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.{DataFrame, IgnoreComet, Row, SparkSession}
import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper}
import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStore, StateStoreProviderId}
import org.apache.spark.sql.functions._
@@ -594,40 +591,10 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
CheckNewAnswer((5, 10, 5, 15, 5, 25)))
}

- test("streaming join should require StatefulOpClusteredDistribution from children") {
- val input1 = MemoryStream[Int]
- val input2 = MemoryStream[Int]
-
- val df1 = input1.toDF
- .select($"value" as Symbol("a"), $"value" * 2 as Symbol("b"))
- val df2 = input2.toDF
- .select($"value" as Symbol("a"), $"value" * 2 as Symbol("b"))
- .repartition($"b")
- val joined = df1.join(df2, Seq("a", "b")).select($"a")
-
- testStream(joined)(
- AddData(input1, 1.to(1000): _*),
- AddData(input2, 1.to(1000): _*),
- CheckAnswer(1.to(1000): _*),
- Execute { query =>
- // Verify the query plan
- def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
- expressions.flatMap {
- case ref: AttributeReference => Some(ref.name)
- }
- }
-
- val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
-
- assert(query.lastExecution.executedPlan.collect {
- case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
- ShuffleExchangeExec(opA: HashPartitioning, _, _, _),
- ShuffleExchangeExec(opB: HashPartitioning, _, _, _))
- if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
- && partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
- && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
- }.size == 1)
- })
+ // https://github.com/apache/datafusion-comet/issues/617
+ test("streaming join should require StatefulOpClusteredDistribution from children",
+ IgnoreComet("TODO: fix Comet for this test")) {
+ fail("TODO fix diff")
}

test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index abe606ad9c1..2d930b64cca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -39,7 +39,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
-| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | true |
+| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
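The shuffle rows in this table note that 'spark.shuffle.manager' must be set before the application starts and cannot be changed afterwards. A hedged sketch of supplying the shuffle settings together on the session builder (config names and values are taken from the table above; the app name is an illustrative assumption):

```scala
import org.apache.spark.sql.SparkSession

// The shuffle manager cannot be changed at runtime, so set it when building the session.
val spark = SparkSession.builder()
  .appName("comet-shuffle-example")
  .config("spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.exec.shuffle.enabled", "true")
  // 'auto' lets Comet pick between native and jvm shuffle based on the query plan.
  .config("spark.comet.exec.shuffle.mode", "auto")
  .getOrCreate()
```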
2 changes: 1 addition & 1 deletion docs/source/user-guide/overview.md
@@ -44,11 +44,11 @@ Comet currently supports the following versions of Apache Spark:

- 3.3.x
- 3.4.x
+- 3.5.x

Experimental support is provided for the following versions of Apache Spark and is intended for development/testing
use only and should not be used in production yet.

-- 3.5.x
- 4.0.0-preview1

Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by