
Commit 3b1674c

MaxGekk authored and dongjoon-hyun committed
[SPARK-29313][SQL] Fix failure on writing to noop in benchmarks

### What changes were proposed in this pull request?

In the PR, I propose to specify the save mode explicitly while writing to the `noop` datasource in benchmarks. I set `Overwrite` mode in the following benchmarks:
- JsonBenchmark
- CSVBenchmark
- UDFBenchmark
- MakeDateTimeBenchmark
- ExtractBenchmark
- DateTimeBenchmark
- NestedSchemaPruningBenchmark

### Why are the changes needed?

Otherwise writing to `noop` fails with:
```
[error] Exception in thread "main" org.apache.spark.sql.AnalysisException: TableProvider implementation noop cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.;
[error]   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:284)
```
most likely due to #25876

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

I generated results of `ExtractBenchmark` via the command:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.ExtractBenchmark"
```

Closes #25988 from MaxGekk/noop-overwrite-mode.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent c6938ea commit 3b1674c
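For context, the pattern this commit fixes can be sketched outside the benchmark harness. This is a minimal, hypothetical example, not part of the patch: the local `SparkSession` setup and the `NoopWriteSketch` object are assumptions for illustration. With the `noop` datasource, `DataFrameWriter`'s default `ErrorIfExists` save mode is rejected at analysis time, so the writer must opt into `Overwrite` (or `Append`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode.Overwrite

object NoopWriteSketch {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only; the real benchmarks get theirs
    // from SqlBasedBenchmark.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("noop-sketch")
      .getOrCreate()

    val df = spark.range(1000000).selectExpr("id", "id * 2 AS doubled")

    // Before the patch: no explicit mode, so the writer defaults to
    // ErrorIfExists and the noop TableProvider raises an AnalysisException.
    // df.write.format("noop").save()

    // After the patch: Overwrite is accepted, and noop still discards all
    // rows, so only the read/transform side of the query is measured.
    df.write.format("noop").mode(Overwrite).save()

    spark.stop()
  }
}
```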

File tree: 7 files changed, +55 −14 lines changed


sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala

Lines changed: 11 additions & 2 deletions
Lines changed: 11 additions & 2 deletions

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.sql.Timestamp
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -36,7 +37,12 @@ import org.apache.spark.sql.internal.SQLConf
  */
 object DateTimeBenchmark extends SqlBasedBenchmark {
   private def doBenchmark(cardinality: Int, exprs: String*): Unit = {
-    spark.range(cardinality).selectExpr(exprs: _*).write.format("noop").save()
+    spark.range(cardinality)
+      .selectExpr(exprs: _*)
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }
 
   private def run(cardinality: Int, name: String, exprs: String*): Unit = {
@@ -132,7 +138,10 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
     benchmark.addCase("From java.sql.Timestamp", numIters) { _ =>
       spark.range(rowsNum)
         .map(millis => new Timestamp(millis))
-        .write.format("noop").save()
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
     benchmark.addCase("Collect longs", numIters) { _ =>
       spark.range(0, rowsNum, 1, 1)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.time.Instant
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -44,6 +45,7 @@ object ExtractBenchmark extends SqlBasedBenchmark {
       .selectExpr(exprs: _*)
       .write
       .format("noop")
+      .mode(Overwrite)
      .save()
   }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MakeDateTimeBenchmark.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -41,6 +42,7 @@ object MakeDateTimeBenchmark extends SqlBasedBenchmark {
       .selectExpr(exprs: _*)
       .write
       .format("noop")
+      .mode(Overwrite)
      .save()
   }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -47,7 +48,11 @@ abstract class NestedSchemaPruningBenchmark extends SqlBasedBenchmark {
 
   private def addCase(benchmark: Benchmark, name: String, sql: String): Unit = {
     benchmark.addCase(name) { _ =>
-      spark.sql(sql).write.format("noop").save()
+      spark.sql(sql)
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
   }
 
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala

Lines changed: 28 additions & 9 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
@@ -42,16 +43,25 @@ object UDFBenchmark extends SqlBasedBenchmark {
     val nullableIntCol = when(
       idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType))
     val stringCol = idCol.cast(StringType)
-    spark.range(cardinality).select(
-      udf(idCol, nullableIntCol, stringCol)).write.format("noop").save()
+    spark.range(cardinality)
+      .select(udf(idCol, nullableIntCol, stringCol))
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }
 
   private def doRunBenchmarkWithPrimitiveTypes(
       udf: UserDefinedFunction, cardinality: Int): Unit = {
     val idCol = col("id")
     val nullableIntCol = when(
       idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType))
-    spark.range(cardinality).select(udf(idCol, nullableIntCol)).write.format("noop").save()
+    spark.range(cardinality)
+      .select(udf(idCol, nullableIntCol))
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
@@ -104,16 +114,25 @@ object UDFBenchmark extends SqlBasedBenchmark {
     val benchmark = new Benchmark("UDF identity overhead", cardinality, output = output)
 
     benchmark.addCase(s"Baseline", numIters = 5) { _ =>
-      spark.range(cardinality).select(
-        col("id"), col("id") * 2, col("id") * 3).write.format("noop").save()
+      spark.range(cardinality)
+        .select(col("id"), col("id") * 2, col("id") * 3)
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
 
     val identityUDF = udf { x: Long => x }
     benchmark.addCase(s"With identity UDF", numIters = 5) { _ =>
-      spark.range(cardinality).select(
-        identityUDF(col("id")),
-        identityUDF(col("id") * 2),
-        identityUDF(col("id") * 3)).write.format("noop").save()
+      spark.range(cardinality)
+        .select(
+          identityUDF(col("id")),
+          identityUDF(col("id") * 2),
+          identityUDF(col("id") * 3))
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
 
     benchmark.run()
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -21,6 +21,7 @@ import java.time.{Instant, LocalDate}
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{Column, Dataset, Row}
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -42,7 +43,9 @@ import org.apache.spark.sql.types._
 object CSVBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  private def toNoop(ds: Dataset[_]): Unit = ds.write.format("noop").save()
+  private def toNoop(ds: Dataset[_]): Unit = {
+    ds.write.format("noop").mode(Overwrite).save()
+  }
 
   private def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = output)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -21,6 +21,7 @@ import java.time.{Instant, LocalDate}
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -49,7 +50,7 @@ object JSONBenchmark extends SqlBasedBenchmark {
   }
 
   private def run(ds: Dataset[_]): Unit = {
-    ds.write.format("noop").save()
+    ds.write.format("noop").mode(Overwrite).save()
   }
 
   def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
```
