[SPARK-30409][SPARK-29173][SQL][TESTS] Use NoOp datasource in SQL benchmarks #27078
Looks reasonable to me if that's the best way to execute without doing anything else
Test build #116042 has finished for PR 27078 at commit
`NoOp` datasource just materializes rows and doesn't do anything else.

Should we update the benchmark results too?
I can run all SQL benchmarks locally on Mac OS, or I can launch an EC2 instance with Linux (not sure how much it will cost me). My concerns are only:
Here is the TPCDS result, @MaxGekk.
Test build #116511 has finished for PR 27078 at commit
@dongjoon-hyun oh, sorry I missed your second PR.
Thank you. No problem, @MaxGekk. What I want to say is that this
I think it makes sense to open a separate JIRA for each case, as we did before?
```
Join w long wholestage off     4685   4814   182    4.5   223.4    1.0X
Join w long wholestage on       440    524   102   47.7    21.0   10.7X
Join w long wholestage off     4531   4557    37    4.6   216.1    1.0X
Join w long wholestage on      1214   1310    95   17.3    57.9    3.7X
```
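As a reading aid for the benchmark excerpts in this thread: the `Relative` column is the baseline row's best time divided by the given row's best time, with the `1.0X` row as the baseline. A small illustrative sketch using the numbers from the rows above (this is just arithmetic, not the benchmark harness itself):

```scala
// "Relative" = (baseline best time) / (this row's best time).
// The printed times are rounded, so a ratio recomputed from them can
// differ in the last digit from the printed value (10.7X here).
val baselineBestMs = 4685.0 // Join w long wholestage off (1.0X)
val codegenBestMs = 440.0   // Join w long wholestage on
val relative = baselineBestMs / codegenBestMs
println(f"$relative%.1fX") // prints 10.6X from the rounded inputs
```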
@MaxGekk. For example, this case slowed down ~3x on both JDK8 and JDK11. It's also mostly consistent across the other cases in this suite. It's a huge regression which is orthogonal to this PR.
Could you file a JIRA issue for this `JoinBenchmark`?
Sure, I will do that tomorrow.
Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-30563
+1, LGTM. Merged to master. Thank you, @MaxGekk and all.
…dk11-results.txt

### What changes were proposed in this pull request?
This PR removes a dangling test result, `JSONBenchmark-jdk11-results.txt`. This causes a case-sensitivity issue on Mac.

```
$ git clone https://gitbox.apache.org/repos/asf/spark.git spark-gitbox
Cloning into 'spark-gitbox'...
remote: Counting objects: 671717, done.
remote: Compressing objects: 100% (258021/258021), done.
remote: Total 671717 (delta 329181), reused 560390 (delta 228097)
Receiving objects: 100% (671717/671717), 149.69 MiB | 950.00 KiB/s, done.
Resolving deltas: 100% (329181/329181), done.
Updating files: 100% (16090/16090), done.
warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:

  'sql/core/benchmarks/JSONBenchmark-jdk11-results.txt'
  'sql/core/benchmarks/JsonBenchmark-jdk11-results.txt'
```

### Why are the changes needed?
Previously, since the file name didn't match `object JSONBenchmark`, it caused confusion when we ran the benchmark. So, 4e0e4e5 renamed `JSONBenchmark` to `JsonBenchmark`. However, in the same time frame, #26003 regenerated this file. Recently, #27078 regenerated the results with the correct file name, `JsonBenchmark-jdk11-results.txt`. So, we can remove the old one.

### Does this PR introduce any user-facing change?
No. This is a test result.

### How was this patch tested?
Manually check the following correctly generated files in the master, and check that this PR removes the dangling one.
- https://github.com/apache/spark/blob/master/sql/core/benchmarks/JsonBenchmark-results.txt
- https://github.com/apache/spark/blob/master/sql/core/benchmarks/JsonBenchmark-jdk11-results.txt

Closes #27180 from dongjoon-hyun/SPARK-REMOVE.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Test build #116569 has finished for PR 27078 at commit
```
10 units w/ interval    3041   3060   16   0.3   3041.3   0.2X
10 units w/o interval   3031   3043   15   0.3   3031.2   0.2X
11 units w/ interval    3270   3280    9   0.3   3269.9   0.2X
11 units w/o interval   3273   3280    7   0.3   3272.6   0.2X
```
Regression by 66%. Here is the JIRA for investigation: https://issues.apache.org/jira/browse/SPARK-30562
Hm, a regression versus what, exactly? The change should have made the benchmarks faster, but they were also re-run. Is it the new instance type?
There are a few commits to the interval implementation between the two re-runs. The regression may be related to those commits, but not to my changes.

> Is it the new instance type?

It is the same.
```
Parsing with UTF-8        120085   121975   1705   0.4   2401.7   0.1X
Text read                  10249    10314     77   4.9    205.0   1.0X
Schema inferring           35403    35436     40   1.4    708.1   0.3X
Parsing without charset    32875    32879      4   1.5    657.5   0.3X
```
This became almost 3 times faster.
```
------------------------------------------------------------------------
1 select expressions        5    13   8   0.0     5370143.0   1.0X
100 select expressions     12    16   6   0.0    11995425.0   0.4X
2500 select expressions   211   214   4   0.0   210927791.0   0.0X
```
Here there is a ~2x slowdown. I opened a JIRA for investigation: https://issues.apache.org/jira/browse/SPARK-30564
```
100 deep x 1000 rows (exec in-mem)      1283   1283   0   0.1   12830.5   0.0X
100 deep x 1000 rows (read parquet)     1201   1205   7   0.1   12005.2   0.0X
100 deep x 1000 rows (write parquet)     436    443   9   0.2    4361.4   0.1X
250 deep x 400 rows (read in-mem)       1882   1883   1   0.1   18819.9   0.0X
```
A ~8x regression: https://issues.apache.org/jira/browse/SPARK-30564
```
Hive built-in ORC         520    531    8   2.0    495.8   0.6X
Native ORC MR             250    264   15   4.2    238.1   1.0X
Native ORC Vectorized     121    138   24   8.7    115.5   2.1X
Hive built-in ORC        1761   1792   43   0.6   1679.3   0.1X
```
Here there is a ~3x regression: https://issues.apache.org/jira/browse/SPARK-30565
Thank you for the follow-up, @MaxGekk.
### What changes were proposed in this pull request?
`BroadcastNestedLoopJoinExec` does not have code-gen, and we can potentially boost the CPU performance for this operator if we add code-gen for it. https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html also showed the evidence in one fork.

The codegen for `BroadcastNestedLoopJoinExec` shares some code with `HashJoin`, and the interface `JoinCodegenSupport` is created to hold that common logic. This PR supports only inner and cross joins. Other join types will be added later in follow-up PRs.

Example query and generated code:

```
val df1 = spark.range(4).select($"id".as("k1"))
val df2 = spark.range(3).select($"id".as("k2"))
df1.join(df2, $"k1" + 1 =!= $"k2").explain("codegen")
```

```
== Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:203(0.31% used); numInnerClasses:0) ==
*(2) BroadcastNestedLoopJoin BuildRight, Inner, NOT ((k1#2L + 1) = k2#6L)
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4, step=1, splits=2)
+- BroadcastExchange IdentityBroadcastMode, [id=#22]
   +- *(1) Project [id#4L AS k2#6L]
      +- *(1) Range (0, 3, step=1, splits=2)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private InternalRow[] bnlj_buildRowArray_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */     bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value();
/* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException {
/* 037 */     for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) {
/* 038 */       UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0];
/* 039 */
/* 040 */       long bnlj_value_1 = bnlj_buildRow_0.getLong(0);
/* 041 */
/* 042 */       long bnlj_value_4 = -1L;
/* 043 */
/* 044 */       bnlj_value_4 = bnlj_expr_0_0 + 1L;
/* 045 */
/* 046 */       boolean bnlj_value_3 = false;
/* 047 */       bnlj_value_3 = bnlj_value_4 == bnlj_value_1;
/* 048 */       boolean bnlj_value_2 = false;
/* 049 */       bnlj_value_2 = !(bnlj_value_3);
/* 050 */       if (!(false || !bnlj_value_2))
/* 051 */       {
/* 052 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 053 */
/* 054 */         range_mutableStateArray_0[3].reset();
/* 055 */
/* 056 */         range_mutableStateArray_0[3].write(0, bnlj_expr_0_0);
/* 057 */
/* 058 */         range_mutableStateArray_0[3].write(1, bnlj_value_1);
/* 059 */         append((range_mutableStateArray_0[3].getRow()).copy());
/* 060 */
/* 061 */       }
/* 062 */     }
/* 063 */
/* 064 */   }
/* 065 */
/* 066 */   private void initRange(int idx) {
/* 067 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 068 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 069 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L);
/* 070 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 071 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 072 */     long partitionEnd;
/* 073 */
/* 074 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 075 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 076 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 077 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 078 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 079 */     } else {
/* 080 */       range_nextIndex_0 = st.longValue();
/* 081 */     }
/* 082 */     range_batchEnd_0 = range_nextIndex_0;
/* 083 */
/* 084 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 085 */     .multiply(step).add(start);
/* 086 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 087 */       partitionEnd = Long.MAX_VALUE;
/* 088 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 089 */       partitionEnd = Long.MIN_VALUE;
/* 090 */     } else {
/* 091 */       partitionEnd = end.longValue();
/* 092 */     }
/* 093 */
/* 094 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 095 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 096 */     range_numElementsTodo_0 = startToEnd.divide(step).longValue();
/* 097 */     if (range_numElementsTodo_0 < 0) {
/* 098 */       range_numElementsTodo_0 = 0;
/* 099 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 100 */       range_numElementsTodo_0++;
/* 101 */     }
/* 102 */   }
/* 103 */
/* 104 */   protected void processNext() throws java.io.IOException {
/* 105 */     // initialize Range
/* 106 */     if (!range_initRange_0) {
/* 107 */       range_initRange_0 = true;
/* 108 */       initRange(partitionIndex);
/* 109 */     }
/* 110 */
/* 111 */     while (true) {
/* 112 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 113 */         long range_nextBatchTodo_0;
/* 114 */         if (range_numElementsTodo_0 > 1000L) {
/* 115 */           range_nextBatchTodo_0 = 1000L;
/* 116 */           range_numElementsTodo_0 -= 1000L;
/* 117 */         } else {
/* 118 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 119 */           range_numElementsTodo_0 = 0;
/* 120 */           if (range_nextBatchTodo_0 == 0) break;
/* 121 */         }
/* 122 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 123 */       }
/* 124 */
/* 125 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 126 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 127 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 128 */
/* 129 */         // common sub-expressions
/* 130 */
/* 131 */         bnlj_doConsume_0(range_value_0);
/* 132 */
/* 133 */         if (shouldStop()) {
/* 134 */           range_nextIndex_0 = range_value_0 + 1L;
/* 135 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 136 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 137 */           return;
/* 138 */         }
/* 139 */
/* 140 */       }
/* 141 */       range_nextIndex_0 = range_batchEnd_0;
/* 142 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 143 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 144 */       range_taskContext_0.killTaskIfInterrupted();
/* 145 */     }
/* 146 */   }
/* 147 */
/* 148 */ }
```

### Why are the changes needed?
Improve query CPU performance. Added a micro benchmark query in `JoinBenchmark.scala`. Saw 1x of run time improvement:

```
OpenJDK 64-Bit Server VM 11.0.9+11-LTS on Linux 4.14.219-161.340.amzn2.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 2.50GHz
broadcast nested loop join:                Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
broadcast nested loop join wholestage off          62922          63052         184         0.3        3000.3       1.0X
broadcast nested loop join wholestage on           30946          30972          26         0.7        1475.6       2.0X
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
* Added unit test in `WholeStageCodegenSuite.scala`, and existing unit tests for `BroadcastNestedLoopJoinExec`.
* Updated golden files for several TPCDS query plans, as whole stage code-gen for `BroadcastNestedLoopJoinExec` is triggered.
* Updated `JoinBenchmark-jdk11-results.txt` and `JoinBenchmark-results.txt` with new benchmark results. Followed previous benchmark PRs #27078 and #26003 to use the same type of machine:

```
Amazon AWS EC2
type: r3.xlarge
region: us-west-2 (Oregon)
OS: Linux
```

Closes #31736 from c21/nested-join-exec.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
In the PR, I propose to replace `.collect()`, `.count()` and `.foreach(_ => ())` in SQL benchmarks and use the `NoOp` datasource instead. I added an implicit class to `SqlBasedBenchmark` with the `.noop()` method. It can be used in a benchmark like `ds.noop()`. The latter unfolds to `ds.write.format("noop").mode(Overwrite).save()`.

### Why are the changes needed?
To avoid the additional overhead that `collect()` (and other actions) have. For example, `.collect()` has to convert values according to external types and pull data to the driver. This can hide actual performance regressions or improvements of benchmarked operations.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Re-run all modified benchmarks using Amazon EC2.
- `TPCDSQueryBenchmark` using instructions from the PR [SPARK-25668][SQL][TESTS] Refactor TPCDSQueryBenchmark to use main method #26049
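The implicit `.noop()` described above could look roughly like the following sketch. It is an assumption-laden illustration of the PR description, not the actual `SqlBasedBenchmark` code; the trait name and member names come from the description, everything else may differ in the real implementation:

```scala
// Hedged sketch of the implicit described above. The real code in
// SqlBasedBenchmark may differ; only the shape of .noop() is taken
// from the PR description.
import org.apache.spark.sql.{Dataset, SaveMode}

trait SqlBasedBenchmark {
  implicit class DatasetToBenchmark(ds: Dataset[_]) {
    // Materializes every row through the `noop` datasource, which
    // discards them, so the benchmark measures only the query itself
    // and skips driver-side conversion to external types.
    def noop(): Unit = {
      ds.write.format("noop").mode(SaveMode.Overwrite).save()
    }
  }
}
```

A benchmark body can then call, for example, `spark.range(N).select(expr).noop()` instead of `.collect()`, so timings are not skewed by pulling results to the driver.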