[SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expressions #23291
Conversation
Thank you for making the benchmark, @aokolnychyi. I'll take a look.
Test build #99987 has finished for PR 23291 at commit
df.createOrReplaceTempView("t")

def testClosure(): Unit = {
  val df = spark.sql(s"SELECT * FROM t WHERE id IN (${values mkString ","})")
values mkString "," -> values.mkString(",")
Yep, will update that.
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
  val largeNumRows = 10000000
  val smallNumRows = 1000000
Is smallNumRows used simply in order to reduce the benchmark time?
Yes, exactly, smallNumRows is used with data types that take longer to process.
intBenchmark(numItems = 10, largeNumRows, minNumIters).run()
intBenchmark(numItems = 50, largeNumRows, minNumIters).run()
intBenchmark(numItems = 250, largeNumRows, minNumIters).run()
Ur, can we use a consistent set of numItems if there is no other reason? Here, we already used three different sets: byte -> (10, 50), short -> (10, 100), int -> (10, 50, 250).
What about (10, 50) for bytes/shorts and (10, 50, 250) for ints/longs?
Since the default value of OPTIMIZER_INSET_CONVERSION_THRESHOLD is 10, what about (10, 25, 50, 100, 200) for all types? We had better collect more data points (if possible) because this is a benchmark and not a part of UTs.
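For reference, that threshold is what decides whether a literal IN list is rewritten to InSet, so a benchmark case can pin either code path by moving it. A rough sketch using the existing config key (not code from this PR):

// Roughly: literal lists longer than the threshold get rewritten to InSet, shorter ones stay as In.
spark.conf.set(SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key, "1")                   // prefer InSet
spark.conf.set(SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key, Int.MaxValue.toString) // keep In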
If there are no objections w.r.t. increasing the execution time, it would make sense to have more data points, I agree.
What if we follow the pattern (10, 25, 50, 100, 200) but stop as soon as InSet starts to outperform In? For example, InSet starts to match the performance of In on 50 bytes. If we run the test on 100 bytes, InSet will be faster than In. So, I propose to drop the case with 200 elements as the outcome is clear and it would just increase the benchmarking time.
> stop as soon as InSet starts to outperform In

Non-determinism is not good for a benchmark because everything (Spark/JVM/HW) will change. BTW, at the final step, I will run this benchmark on an Amazon EC2 machine, too.
val name = s"$numItems ints"
val values = 1 to numItems
val df = spark.range(1, numRows).select($"id".cast(IntegerType))
benchmark(name, df, values, numRows, minNumIters)
nit. Shall we invoke run here instead of intBenchmark(...).run()? It seems to be repeated multiple times.
- benchmark(name, df, values, numRows, minNumIters)
+ benchmark(name, df, values, numRows, minNumIters).run()
We can do this. Shall we rename intBenchmark to runIntBenchmark then? There is no consistency in existing benchmarks, unfortunately.
+1 for renaming. Yep. We overlooked the naming consistency in the previous benchmarks.
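For illustration, the renamed helper could then look roughly like this (a sketch combining both suggestions; not necessarily the exact code that ended up in the PR):

private def runIntBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
  val name = s"$numItems ints"
  val values = 1 to numItems
  val df = spark.range(1, numRows).select($"id".cast(IntegerType))
  // build and run the benchmark here, so call sites no longer repeat .run()
  benchmark(name, df, values, numRows, minNumIters).run()
}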
import spark.implicits._

def byteBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
def -> private def
Test build #99993 has finished for PR 23291 at commit
987bea4 to 1cbf93d
Test build #100016 has finished for PR 23291 at commit
Test build #100019 has finished for PR 23291 at commit
I believe the last failure is not related to this PR as other PRs failed with the same error.
Right. Currently, we are retrying this in Jenkins, #23294 (comment).
Retest this please.
Test build #100027 has finished for PR 23291 at commit
In this case, let's ignore the irrelevant SparkR failure for a while.
Retest this please
Test build #100099 has started for PR 23291 at commit
Retest this please
Test build #100113 has started for PR 23291 at commit
Retest this please
Test build #100153 has finished for PR 23291 at commit
@dongjoon-hyun could you, please, do another review round?
runByteBenchmark(numItems = 10, largeNumRows, minNumIters)
runByteBenchmark(numItems = 25, largeNumRows, minNumIters)
runByteBenchmark(numItems = 50, largeNumRows, minNumIters)
runByteBenchmark(numItems = 100, largeNumRows, minNumIters)
It seems that we missed 200 only here.
I skipped this intentionally to avoid hitting overflow for byte values. We can change the way we generate bytes: (Byte.MinValue to Byte.MinValue + numItems).map(v => s"${v}Y"). It just looks a bit more involved and I wanted to keep it consistent with other cases. I do not mind changing, though. Let me know your opinion.
Oh, I got it. And, +1 for the new way starting from MinValue.
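A sketch of that MinValue-based generation (written here with an Int range for simplicity), which keeps even 200 items inside the byte range:

// numItems distinct tinyint literals starting at Byte.MinValue (-128Y, -127Y, ...),
// so there is no overflow as long as numItems <= 256
val byteValues = (0 until numItems).map(i => s"${Byte.MinValue + i}Y")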
private def runDoubleBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
  val name = s"$numItems doubles"
  val values = 1.0 to numItems by 1.0
FractionalProxy.to is deprecated in Scala 2.12 and is already removed in Scala 2.13. Shall we avoid this syntax?
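One way to avoid the deprecated fractional range while keeping the same values (just a sketch of the suggestion, not necessarily what the PR ended up with):

// Instead of: val values = 1.0 to numItems by 1.0
val values = (1 to numItems).map(_.toDouble)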
200 shorts:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
In expression                               259 / 265         38.6          25.9       1.0X
InSet expression                            134 / 139         74.5          13.4       1.9X
It's good to compare In and InSet, but it seems to be difficult to see the trend. Let me think about this.
private def runByteBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
  val name = s"$numItems bytes"
  val values = (1 to numItems).map(v => s"CAST($v AS tinyint)")
Like line 64, shall we use a literal? Y is the postfix for tinyint.
private def runShortBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
  val name = s"$numItems shorts"
  val values = (1 to numItems).map(v => s"CAST($v AS smallint)")
Ditto. S is the postfix for smallint literals.
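A sketch of the postfix-literal style suggested in these two comments (Y for tinyint, S for smallint):

// literal postfixes instead of CAST($v AS tinyint) / CAST($v AS smallint)
val byteValues  = (1 to numItems).map(v => s"${v}Y")
val shortValues = (1 to numItems).map(v => s"${v}S")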
val name = s"$numItems doubles"
val values = 1.0 to numItems by 1.0
val df = spark.range(1, numRows).select($"id".cast(DoubleType))
runBenchmark(name, df, values, numRows, minNumIters)
Also, this will generate decimal. Please note that 1.0 is decimal(2,1). For double literals, please use 1.0D.
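So the double case could generate typed double literals, roughly like this (a sketch of the suggested fix):

// "1.0" alone parses as decimal(2,1); the D suffix keeps the literal a double
val values = (1 to numItems).map(v => s"$v.0D")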
Hi, @aokolnychyi. I left a few comments, and runDoubleBenchmark must be fixed.
runShortBenchmark(numItems = 25, largeNumRows, minNumIters)
runShortBenchmark(numItems = 50, largeNumRows, minNumIters)
runShortBenchmark(numItems = 100, largeNumRows, minNumIters)
runShortBenchmark(numItems = 200, largeNumRows, minNumIters)
And, shorter is better.
Seq(5, 10, 25, 50, 100, 200).foreach { numItems =>
  runShortBenchmark(numItems, largeNumRows, minNumIters)
}
What about this?
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
  val largeNumRows = 10000000
  val smallNumRows = 1000000
  val minNumIters = 5
  runBenchmark("In Expression Benchmark") {
    Seq(5, 10, 25, 50, 100, 200).foreach { numItems =>
      runByteBenchmark(numItems, largeNumRows, minNumIters)
      runShortBenchmark(numItems, largeNumRows, minNumIters)
      runIntBenchmark(numItems, largeNumRows, minNumIters)
      runLongBenchmark(numItems, largeNumRows, minNumIters)
      runFloatBenchmark(numItems, largeNumRows, minNumIters)
      runDoubleBenchmark(numItems, largeNumRows, minNumIters)
      runSmallDecimalBenchmark(numItems, smallNumRows, minNumIters)
      runLargeDecimalBenchmark(numItems, smallNumRows, minNumIters)
      runStringBenchmark(numItems, smallNumRows, minNumIters)
      runTimestampBenchmark(numItems, largeNumRows, minNumIters)
      runDateBenchmark(numItems, largeNumRows, minNumIters)
      runArrayBenchmark(numItems, smallNumRows, minNumIters)
      runStructBenchmark(numItems, smallNumRows, minNumIters)
    }
  }
}
It'll be difficult to see the result. You need to go back and forth to see the single-type trends.
Test build #100395 has finished for PR 23291 at commit
@dongjoon-hyun could you, please, do another pass?
LGTM for me. Waiting for @dongjoon-hyun for another pass.
Retest this please.
Hi, @aokolnychyi. I rebased this PR and refreshed the result on an EC2 instance. I made a PR to you. Could you review and merge aokolnychyi#1? For the other things, it looks good to me.
Test build #101116 has finished for PR 23291 at commit
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, struct}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, ShortType, StringType, TimestampType}
Just org.apache.spark.sql.types._?
Done
  runLargeDecimalBenchmark(numItems, smallNumRows, minNumIters)
}
numItemsSeq.foreach { numItems =>
  runStringBenchmark(numItems, smallNumRows, minNumIters)
Why do some benchmarks use large num rows while others use small num rows?
This is done to shorten the execution time for benchmarks that take longer. The ratio stays identical.
private def runStructBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
  val name = s"$numItems structs"
  val values = (1 to numItems).map(i => s"struct($i)")
  val df = spark.range(0, numRows).select(struct($"id".as("col1")).as("id"))
The range of numItems (max: 200) is much smaller than numRows (max: 10000000). So most of the time, the In/InSet expressions can't find the value in the given list. I think this is naturally bad for the In expression's performance since it compares all values in the list.
Is this a fair performance comparison between In and InSet expressions?
I think we are interested in the worst case performance. I believe this benchmark is close to a real use case as most of the time we search for a relatively small set of values in a huge distributed data set.
Can you add this description to the comment at the beginning of the benchmark code? It is better to explain that this is for worst-case performance comparisons.
I am still not sure here. The main idea behind converting In to InSet is to avoid iterating through all elements. This benchmark mostly tests when this optimization works and when it makes things even worse. I tend to think that having an In expression on a real data set that exits after a few branches is rather an edge case. I can be convinced, but if there is no strong community opinion on this, I would keep it as it is.
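To make the trade-off concrete, here is a plain-Scala illustration of the two evaluation strategies being compared; a conceptual sketch, not Spark's generated code:

val items: Seq[Int] = 1 to 200

// In-style evaluation: a linear scan that can exit as soon as a match is found,
// which is cheap when the probed value sits among the first few items
def inStyle(value: Int): Boolean = items.exists(_ == value)

// InSet-style evaluation: the items are materialized into a set once, and every
// row then pays a constant-cost lookup (plus boxing of the primitive value)
val itemSet: Set[Any] = items.toSet
def inSetStyle(value: Int): Boolean = itemSet.contains(value)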
@viirya what about having this?
/**
* A benchmark that compares the performance of different ways to evaluate SQL IN expressions.
*
* Specifically, this class compares the if-based approach, which might iterate through all items
* inside the IN value list, to other options with better worst-case time complexities (e.g., sets).
*
* To run this benchmark:
* {{{
* 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/InExpressionBenchmark-results.txt".
* }}}
*/
LGTM, thanks @aokolnychyi
EC2 result on Jan. 11th
@dongjoon-hyun thanks for taking the time to run this on EC2.
Test build #101173 has finished for PR 23291 at commit
Test build #101171 has finished for PR 23291 at commit
Thank you, @aokolnychyi.
+1, LGTM. Merged to master.
Test build #101266 has finished for PR 23291 at commit
Closes apache#23291 from aokolnychyi/spark-26203.
Lead-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
This PR contains benchmarks for In and InSet expressions. They cover literals of different data types and will help us to decide where to integrate the switch-based logic for bytes/shorts/ints.
As discussed in PR-23171, one potential approach is to convert In to InSet if all elements are literals, independently of data types and the number of elements. According to the results of this PR, we might want to keep the threshold for the number of elements. The if-else approach might be faster for some data types on a small number of elements (structs? arrays? small decimals?).
byte / short / int / long
Unless the number of items is really big, InSet is slower than In because of autoboxing.
Interestingly, In scales worse on bytes/shorts than on ints/longs. For example, InSet starts to match the performance on around 50 bytes/shorts, while this does not happen on the same number of ints/longs. This is a bit strange as shorts/bytes (e.g., (byte) 1, (short) 2) are represented as ints in the bytecode.
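As a rough illustration of the boxing cost (this assumes InSet keeps its items in a Set[Any], which is a simplification of Spark's internals):

val hset: Set[Any] = (1 to 100).map(_.toByte).toSet             // items are stored boxed
def inSetContains(value: Byte): Boolean = hset.contains(value)  // value is boxed on every row
def inContains(value: Byte): Boolean =
  value == 1.toByte || value == 2.toByte                        // if-else chain on primitives, no boxing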
float / double
Use cases on floats/doubles also suffer from autoboxing. Therefore, In outperforms InSet on 10 elements.
Similarly to shorts/bytes, In scales worse on floats/doubles than on ints/longs because the equality condition is more complicated (e.g., java.lang.Float.isNaN(filter_valueArg_0) && java.lang.Float.isNaN(9.0F)) || filter_valueArg_0 == 9.0F).
decimal
The reason why we have separate benchmarks for small and large decimals is that Spark might use longs to represent decimals in some cases.
If this optimization happens, then equals will be nothing more than comparing longs. If this does not happen, Spark will create an instance of scala.BigDecimal and use it for comparisons. The latter is more expensive.
Decimal$hashCode will always use scala.BigDecimal$hashCode even if the number is small enough to fit into a long variable. As a consequence, we see that use cases on small decimals are faster with In as they are using long comparisons under the hood. Large decimal values are always faster with InSet.
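The hashCode point can be illustrated with a simplified stand-in for Spark's Decimal (an illustration only, not the real class):

// a long-backed decimal: equality can stay on the primitive fast path,
// but hashing still materializes a BigDecimal, as described above
final class TinyDecimal(val unscaled: Long, val scale: Int) {
  def fastEquals(other: TinyDecimal): Boolean =
    scale == other.scale && unscaled == other.unscaled
  override def hashCode(): Int =
    java.math.BigDecimal.valueOf(unscaled, scale).hashCode()  // allocates on every call
}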
string
UTF8String$equals is not cheap. Therefore, In does not really outperform InSet as in previous use cases.
timestamp / date
Under the hood, timestamp/date values will be represented as long/int values. So, In allows us to avoid autoboxing.
array
Arrays are working as expected. In is faster on 5 elements, while InSet is faster on 15 elements. The benchmarks are using UnsafeArrayData.
struct
InSet is always faster than In for structs. These benchmarks use GenericInternalRow.